Program Listing for File ConnectionManager.hpp
↰ Return to documentation for file (BackendLibfabric/ConnectionManager.hpp)
#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
#include <concepts>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

#include <rdma/fi_eq.h>

#include "CqReactor.hpp"
#include "DomainManager.hpp"
#include "Helpers.hpp"
#include "ListenSocket.hpp"
#include "ReceiveSocket.hpp"
#include "SendSocketBuffered.hpp"
#include "SendSocketZeroCopy.hpp"
#include "netio3-backend/EventLoop/EventSignalHandle.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
#include "netio3-backend/utility/ThreadSafeMap.hpp"
namespace netio3::libfabric {
class ConnectionManager : public std::enable_shared_from_this<ConnectionManager>
{
public:
explicit ConnectionManager(BaseEventLoop* event_loop, NetworkConfig config);
~ConnectionManager();
ConnectionManager(const ConnectionManager&) = delete;
ConnectionManager(ConnectionManager&&) = delete;
ConnectionManager& operator=(const ConnectionManager&) = delete;
ConnectionManager& operator=(ConnectionManager&&) = delete;
static std::shared_ptr<ConnectionManager> create(BaseEventLoop* event_loop,
NetworkConfig config);
void init();
[[nodiscard]] EndPointAddress open_listen_endpoint(const EndPointAddress& address,
ConnectionParametersRecv conn_params);
void open_send_endpoint_buffered(const EndPointAddress& address,
ConnectionParameters conn_params);
void open_send_endpoint_zero_copy(const EndPointAddress& address,
ConnectionParameters conn_params);
void close_listen_endpoint(const EndPointAddress& address);
void close_send_endpoint(const EndPointAddress& address);
[[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);
[[nodiscard]] auto apply_to_send_socket_buffered(
const EndPointAddress& address,
const std::invocable<SendSocketBuffered&> auto& func)
-> decltype(func(std::declval<SendSocketBuffered&>()))
{
try {
return m_ssockets_buffered.apply(address, func);
} catch (const std::out_of_range&) {
throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port());
}
}
[[nodiscard]] auto apply_to_send_socket_zero_copy(
const EndPointAddress& address,
const std::invocable<SendSocketZeroCopy&> auto& func)
-> decltype(func(std::declval<SendSocketZeroCopy&>()))
{
try {
return m_ssockets_zero_copy.apply(address, func);
} catch (const std::out_of_range&) {
throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port());
}
}
private:
struct CmEvent {
long event;
fi_eq_err_entry err_entry;
FiInfoUniquePtr info;
};
void check_and_init(const EndPointAddress& address, std::uint64_t flags);
[[nodiscard]] static CmEvent read_cm_event(fid_eq* eq);
void on_listen_socket_cm_event(ListenSocket& lsocket);
void on_recv_socket_cm_event(ReceiveSocket& rsocket);
template<SendSocketConcept SocketType>
void on_send_socket_cm_event(const EndPointAddress& address);
void handle_connection_request(ListenSocket& lsocket, FiInfoUniquePtr&& info);
void unregister_endpoint(const CqCmFds& fds);
void handle_close_requests();
void do_close_listen_endpoint(const EndPointAddress& address);
void do_close_send_endpoint(const EndPointAddress& address);
void unregister_all();
struct CloseQueueItem {
EndPointAddress address;
enum class Type {
listen,
send,
} type{}; // cppcheck-suppress internalAstError
};
NetworkConfig m_config;
BaseEventLoop* m_event_loop;
mutable std::mutex m_rsocket_mutex;
std::optional<EventSignalHandle> m_close_signal;
tbb::concurrent_queue<CloseQueueItem> m_close_queue;
std::unique_ptr<DomainManager> m_domain_manager{nullptr};
std::unique_ptr<CqReactor> m_cq_reactor{nullptr};
std::map<EndPointAddress, ListenSocket> m_lsockets;
std::multimap<EndPointAddress, ReceiveSocket> m_rsockets;
ThreadSafeMap<EndPointAddress, SendSocketBuffered> m_ssockets_buffered;
ThreadSafeMap<EndPointAddress, SendSocketZeroCopy> m_ssockets_zero_copy;
};
} // namespace netio3::libfabric
#endif // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP