.. _program_listing_file_BackendLibfabric_ConnectionlessEndpointManager.hpp: Program Listing for File ConnectionlessEndpointManager.hpp ========================================================== |exhale_lsh| :ref:`Return to documentation for file ` (``BackendLibfabric/ConnectionlessEndpointManager.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP #define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP #include #include #include "ActiveEndpoint.hpp" #include "AddressVectorManager.hpp" #include "CqReactor.hpp" #include "DomainManager.hpp" #include "ReceiveEndpoint.hpp" #include "SendEndpointBuffered.hpp" #include "SharedBufferManager.hpp" #include "netio3-backend/Netio3Backend.hpp" #include "netio3-backend/utility/ThreadSafeMap.hpp" namespace netio3::libfabric { class ConnectionlessEndpointManager : public std::enable_shared_from_this { public: ConnectionlessEndpointManager(NetworkConfig config, const EndPointAddress& address, BaseEventLoop* evloop, EndpointRole role); static std::shared_ptr create(const NetworkConfig& config, EndPointAddress address, BaseEventLoop* evloop, EndpointRole role); void open_send_endpoint(const EndPointAddress& address, const ConnectionParameters& connection_params); void open_receive_endpoint(const EndPointAddress& address, const ConnectionParameters& connection_params); void close_send_endpoint(const EndPointAddress& address); void close_receive_endpoint(const EndPointAddress& address); [[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address); [[nodiscard]] auto apply_to_send_endpoint_buffered( const EndPointAddress& address, const std::invocable auto& func) -> decltype(func(std::declval(), std::declval())) { try { return m_send_endpoint_buffered_addresses.apply(address, [this, &func](fi_addr_t addr) { return func(*m_send_endpoint_buffered, addr); }); } catch (const std::out_of_range&) { throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port()); } } [[nodiscard]] auto apply_to_send_endpoint_zero_copy( const EndPointAddress& address, const std::invocable auto& func) -> decltype(func(std::declval(), std::declval())) { try { return m_send_endpoint_zero_copy_addresses.apply(address, [this, &func](fi_addr_t addr) { return func(*m_send_endpoint_zero_copy, addr); }); } catch (const std::out_of_range&) { throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port()); } } private: void init(); void open_send_endpoint_buffered(const EndPointAddress& address); void open_send_endpoint_zero_copy(const EndPointAddress& address); void init_buffered_send_endpoint(const EndPointAddress& address); void init_zero_copy_send_endpoint(const EndPointAddress& address); void init_receive_endpoint(const EndPointAddress& address, const ConnectionParametersRecv& connection_params); void handle_close_requests(); void do_close_endpoint(const EndPointAddress& address); void handle_open_requests(); void validate_capabilities(const ConnectionParameters& connection_params) const; struct OpenQueueItem { EndPointAddress address; EndpointCapabilities capabilities; }; NetworkConfig m_config; BaseEventLoop* m_event_loop; DomainManager m_domain_manager; CqReactor m_cq_reactor; SharedBufferManager m_shared_buffer_manager; std::optional m_close_signal; tbb::concurrent_queue m_close_queue; std::optional m_open_signal; tbb::concurrent_queue m_open_queue; libfabric::AddressVectorManager m_address_vector_manager; std::unordered_map m_active_endpoint_receive; std::unique_ptr m_active_endpoint_send_buffered; std::unique_ptr m_active_endpoint_send_zero_copy; std::unordered_map m_receive_endpoints; std::unique_ptr m_send_endpoint_buffered; std::unique_ptr m_send_endpoint_zero_copy; ThreadSafeMap m_send_endpoint_buffered_addresses; ThreadSafeMap m_send_endpoint_zero_copy_addresses; std::mutex m_mutex; }; } // namespace netio3::libfabric #endif // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP