.. _program_listing_file_BackendLibfabric_ConnectionManager.hpp:

Program Listing for File ConnectionManager.hpp
==============================================

|exhale_lsh| :ref:`Return to documentation for file ` (``BackendLibfabric/ConnectionManager.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
   #define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP

   // NOTE(review): the next three #include directives carry no header name in
   // this listing, and several templates below (std::enable_shared_from_this,
   // std::shared_ptr, std::invocable, std::declval, std::optional,
   // tbb::concurrent_queue, std::unique_ptr, std::map, std::multimap,
   // ThreadSafeMap, and the bare `template` keyword) carry no argument list.
   // Presumably every <...> span was lost when this listing was extracted;
   // confirm against the real header before relying on this text.
   #include
   #include
   #include

   #include "CqReactor.hpp"
   #include "DomainManager.hpp"
   #include "Helpers.hpp"
   #include "ListenSocket.hpp"
   #include "ReceiveSocket.hpp"
   #include "SendSocketBuffered.hpp"
   #include "SendSocketZeroCopy.hpp"
   #include "netio3-backend/EventLoop/EventSignalHandle.hpp"
   #include "netio3-backend/Netio3Backend.hpp"
   #include "netio3-backend/EventLoop/BaseEventLoop.hpp"
   #include "netio3-backend/utility/ThreadSafeMap.hpp"

   namespace netio3::libfabric {
     // Declares the interface for opening and closing listen and send endpoints
     // and for running a callable against a stored send socket, together with
     // the private event handlers and containers that back those operations.
     //
     // The class derives from std::enable_shared_from_this and offers a static
     // create() returning a std::shared_ptr; copy and move are deleted.
     // Only the two apply_to_send_socket_* templates are defined in this
     // header; every other member function is declared here and defined
     // elsewhere, so its behaviour cannot be read from this listing.
     class ConnectionManager : public std::enable_shared_from_this {
     public:
       // Takes a non-owning pointer to the event loop and the network
       // configuration by value.
       explicit ConnectionManager(BaseEventLoop* event_loop, NetworkConfig config);
       ~ConnectionManager();

       // Neither copyable nor movable.
       ConnectionManager(const ConnectionManager&) = delete;
       ConnectionManager(ConnectionManager&&) = delete;
       ConnectionManager& operator=(const ConnectionManager&) = delete;
       ConnectionManager& operator=(ConnectionManager&&) = delete;

       // Factory with the same parameters as the constructor, returning a
       // shared pointer.
       // NOTE(review): presumably callers must use this instead of the public
       // constructor so that shared_from_this() is valid, and init() is
       // presumably a separate second-phase step — confirm in the .cpp.
       static std::shared_ptr create(BaseEventLoop* event_loop, NetworkConfig config);

       void init();

       // Returns an EndPointAddress; the result must not be discarded.
       // NOTE(review): presumably the address actually bound, which may differ
       // from the requested one — confirm in the definition.
       [[nodiscard]] EndPointAddress open_listen_endpoint(const EndPointAddress& address,
                                                          ConnectionParametersRecv conn_params);

       // Two variants of opening a send endpoint, one per send-socket flavour
       // (buffered / zero-copy).
       void open_send_endpoint_buffered(const EndPointAddress& address,
                                        ConnectionParameters conn_params);

       void open_send_endpoint_zero_copy(const EndPointAddress& address,
                                         ConnectionParameters conn_params);

       void close_listen_endpoint(const EndPointAddress& address);

       void close_send_endpoint(const EndPointAddress& address);

       [[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);

       // Passes `address` and `func` to m_ssockets_buffered.apply() and returns
       // whatever that call returns (the return type is that of invoking
       // `func`).
       //
       // Throws UnknownSendEndpoint, built from address.address() and
       // address.port(), when the apply() call raises std::out_of_range.
       // NOTE(review): presumably apply() throws std::out_of_range when no
       // entry exists for `address`. The try block encloses the whole apply()
       // call, so a std::out_of_range originating inside `func` would
       // presumably be reported as an unknown endpoint as well — confirm that
       // this is intended.
       [[nodiscard]] auto apply_to_send_socket_buffered(
         const EndPointAddress& address,
         const std::invocable auto& func) -> decltype(func(std::declval()))
       {
         try {
           return m_ssockets_buffered.apply(address, func);
         } catch (const std::out_of_range&) {
           throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port());
         }
       }

       // Same contract as apply_to_send_socket_buffered, but the lookup goes
       // through m_ssockets_zero_copy.apply().
       //
       // Throws UnknownSendEndpoint when the apply() call raises
       // std::out_of_range (same caveat about exceptions from `func`).
       [[nodiscard]] auto apply_to_send_socket_zero_copy(
         const EndPointAddress& address,
         const std::invocable auto& func) -> decltype(func(std::declval()))
       {
         try {
           return m_ssockets_zero_copy.apply(address, func);
         } catch (const std::out_of_range&) {
           throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port());
         }
       }

     private:
       // Value returned by read_cm_event(): an event code, an fi_eq_err_entry
       // and an owning FiInfoUniquePtr.
       // NOTE(review): "cm" presumably stands for libfabric connection
       // management, with `event` holding the event code read from the event
       // queue — confirm in read_cm_event().
       struct CmEvent {
         long event;
         fi_eq_err_entry err_entry;
         FiInfoUniquePtr info;
       };

       void check_and_init(const EndPointAddress& address, std::uint64_t flags);

       // Static helper taking an fid_eq* and returning a CmEvent by value.
       [[nodiscard]] static CmEvent read_cm_event(fid_eq* eq);

       // Event handlers, one per socket kind. The listen and receive variants
       // take the socket by reference; the send variant is a template and takes
       // the endpoint address instead.
       void on_listen_socket_cm_event(ListenSocket& lsocket);

       void on_recv_socket_cm_event(ReceiveSocket& rsocket);

       // NOTE(review): the template parameter list is missing in this listing.
       template
       void on_send_socket_cm_event(const EndPointAddress& address);

       // Takes `info` as an rvalue reference to the owning pointer type.
       void handle_connection_request(ListenSocket& lsocket, FiInfoUniquePtr&& info);

       void unregister_endpoint(const CqCmFds& fds);

       // NOTE(review): presumably drains m_close_queue and dispatches each item
       // to do_close_listen_endpoint / do_close_send_endpoint by its type —
       // confirm in the definition.
       void handle_close_requests();

       void do_close_listen_endpoint(const EndPointAddress& address);

       void do_close_send_endpoint(const EndPointAddress& address);

       void unregister_all();

       // Pairs an endpoint address with a tag saying whether it names a listen
       // or a send endpoint. `type` is value-initialised, which for this enum
       // yields Type::listen (the first enumerator).
       struct CloseQueueItem {
         EndPointAddress address;
         enum class Type {
           listen,
           send,
         } type{}; // cppcheck-suppress internalAstError
       };

       NetworkConfig m_config;
       BaseEventLoop* m_event_loop;  // non-owning raw pointer, as passed to the constructor
       // mutable, so it can be locked from const member functions.
       // NOTE(review): presumably guards m_rsockets — confirm.
       mutable std::mutex m_rsocket_mutex;
       std::optional m_close_signal;
       tbb::concurrent_queue m_close_queue;
       std::unique_ptr m_domain_manager{nullptr};
       std::unique_ptr m_cq_reactor{nullptr};
       std::map m_lsockets;
       std::multimap m_rsockets;
       // Containers used by apply_to_send_socket_buffered / _zero_copy above.
       ThreadSafeMap m_ssockets_buffered;
       ThreadSafeMap m_ssockets_zero_copy;
     };
   } // namespace netio3::libfabric

   #endif // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP