Program Listing for File ConnectionManager.hpp
↰ Return to documentation for file (BackendLibfabric/ConnectionManager.hpp)
#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <stdexcept>
#include <utility>

#include <rdma/fi_eq.h>

#include "ActiveEndpoint.hpp"
#include "CqReactor.hpp"
#include "DomainManager.hpp"
#include "Helpers.hpp"
#include "ListenSocket.hpp"
#include "ReceiveEndpoint.hpp"
#include "SendEndpointBuffered.hpp"
#include "SendEndpointZeroCopy.hpp"
#include "SharedBufferManager.hpp"
#include "SyncSignalWrapper.hpp"
#include "netio3-backend/EventLoop/EventSignalHandle.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
#include "netio3-backend/utility/ThreadSafeMap.hpp"
namespace netio3::libfabric {
class ConnectionManager
{
public:
explicit ConnectionManager(BaseEventLoop* event_loop, NetworkConfig config);
~ConnectionManager();
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager(ConnectionManager&&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
ConnectionManager& operator=(ConnectionManager&&) = delete;
[[nodiscard]] EndPointAddress open_listen_endpoint(const EndPointAddress& address,
ConnectionParameters conn_params);
void open_active_endpoint(const EndPointAddress& address,
const ConnectionParameters& connection_params);
void close_listen_endpoint(const EndPointAddress& address);
void close_active_endpoint(const EndPointAddress& address);
[[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);
[[nodiscard]] auto apply_to_send_endpoint_buffered(
const EndPointAddress& address,
const std::invocable<std::unique_ptr<SendEndpointBuffered>&> auto& func)
-> decltype(func(std::declval<std::unique_ptr<SendEndpointBuffered>&>()))
{
try {
return m_send_sendpoints_buffered.apply(address, func);
} catch (const std::out_of_range&) {
throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
}
}
[[nodiscard]] auto apply_to_send_endpoint_zero_copy(
const EndPointAddress& address,
const std::invocable<std::unique_ptr<SendEndpointZeroCopy>&> auto& func)
-> decltype(func(std::declval<std::unique_ptr<SendEndpointZeroCopy>&>()))
{
try {
return m_send_endpoints_zero_copy.apply(address, func);
} catch (const std::out_of_range&) {
throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
}
}
private:
struct CmEvent {
long event;
fi_eq_err_entry err_entry;
FiInfoUniquePtr info;
};
void check_and_init(const EndPointAddress& address, std::uint64_t flags);
void validate_capabilities(const ConnectionParameters& connection_params) const;
void enable_capabilities(const EndPointAddress& address,
const ConnectionParameters& connection_params);
void create_active_endpoint(const EndPointAddress& address,
const ConnectionParameters& connection_params);
void enable_buffered_sending(const EndPointAddress& address, ConnectionParametersSendBuffered conn_params);
void enable_zero_copy_sending(const EndPointAddress& address, ConnectionParametersSendZeroCopy conn_params);
void enable_receiving(const EndPointAddress& address, ConnectionParametersRecv conn_params);
[[nodiscard]] static CmEvent read_cm_event(fid_eq* eq);
void on_listen_endpoint_cm_event(ListenSocket& lendpoint);
void on_active_endpoint_cm_event(int eqfd);
void handle_connection_request(ListenSocket& lendpoint, FiInfoUniquePtr&& info);
[[nodiscard]] ActiveEndpoint& handle_pending_active_endpoint(int eqfd);
void unregister_endpoint(const CqCmFds& fds);
void handle_close_requests();
void do_close_listen_endpoint(const EndPointAddress& address);
void do_close_active_endpoint(const EndPointAddress& address);
void unregister_all();
struct CloseQueueItem {
EndPointAddress address;
enum class Type {
listen,
active,
} type{}; // cppcheck-suppress internalAstError
};
struct PendingActiveEndpoint {
ActiveEndpoint endpoint;
ConnectionParameters connection_params;
};
NetworkConfig m_config;
BaseEventLoop* m_event_loop;
mutable std::mutex m_receive_endpoint_mutex;
mutable std::mutex m_active_endpoint_by_passive_mutex;
mutable std::mutex m_listen_endpoint_mutex;
mutable std::mutex m_active_endpoint_mutex;
tbb::concurrent_queue<CloseQueueItem> m_close_queue;
SyncSignalWrapper m_close_signal;
std::unique_ptr<DomainManager> m_domain_manager{nullptr};
std::unique_ptr<CqReactor> m_cq_reactor{nullptr};
std::unique_ptr<SharedBufferManager> m_shared_buffer_manager{nullptr};
std::map<EndPointAddress, ActiveEndpoint> m_active_endpoints;
std::map<EndPointAddress, ListenSocket> m_listen_endpoints;
std::multimap<EndPointAddress, ActiveEndpoint&> m_active_endpoints_by_passive_address;
std::map<EndPointAddress, ReceiveEndpoint> m_receive_endpoints;
ThreadSafeMap<EndPointAddress, std::unique_ptr<SendEndpointBuffered>> m_send_sendpoints_buffered;
ThreadSafeMap<EndPointAddress, std::unique_ptr<SendEndpointZeroCopy>> m_send_endpoints_zero_copy;
std::list<PendingActiveEndpoint> m_pending_active_endpoints;
};
} // namespace netio3::libfabric
#endif // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP