Program Listing for File ConnectionlessEndpointManager.hpp
↰ Return to documentation for file (BackendLibfabric/ConnectionlessEndpointManager.hpp)
#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP
#include <unordered_map>
#include <rdma/fabric.h>
#include "ActiveEndpoint.hpp"
#include "AddressVectorManager.hpp"
#include "CqReactor.hpp"
#include "DomainManager.hpp"
#include "ReceiveEndpoint.hpp"
#include "SendEndpointBuffered.hpp"
#include "SharedBufferManager.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/utility/ThreadSafeMap.hpp"
namespace netio3::libfabric {
class ConnectionlessEndpointManager :
public std::enable_shared_from_this<ConnectionlessEndpointManager>
{
public:
ConnectionlessEndpointManager(NetworkConfig config,
const EndPointAddress& address,
BaseEventLoop* evloop,
std::uint64_t flags);
static std::shared_ptr<ConnectionlessEndpointManager> create(const NetworkConfig& config,
EndPointAddress address,
BaseEventLoop* evloop,
std::uint64_t flags);
void open_send_endpoint(const EndPointAddress& address,
const ConnectionParameters& connection_params);
void open_receive_endpoint(const EndPointAddress& address,
const ConnectionParameters& connection_params);
void close_send_endpoint(const EndPointAddress& address);
void close_receive_endpoint(const EndPointAddress& address);
[[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);
[[nodiscard]] auto apply_to_send_endpoint_buffered(
const EndPointAddress& address,
const std::invocable<SendEndpointBuffered&, fi_addr_t> auto& func)
-> decltype(func(std::declval<SendEndpointBuffered&>(), std::declval<fi_addr_t>()))
{
try {
return m_send_endpoint_buffered_addresses.apply(address, [this, &func](fi_addr_t addr) {
return func(*m_send_endpoint_buffered, addr);
});
} catch (const std::out_of_range&) {
throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
}
}
[[nodiscard]] auto apply_to_send_endpoint_zero_copy(
const EndPointAddress& address,
const std::invocable<SendEndpointZeroCopy&, fi_addr_t> auto& func)
-> decltype(func(std::declval<SendEndpointZeroCopy&>(), std::declval<fi_addr_t>()))
{
try {
return m_send_endpoint_zero_copy_addresses.apply(address, [this, &func](fi_addr_t addr) {
return func(*m_send_endpoint_zero_copy, addr);
});
} catch (const std::out_of_range&) {
throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
}
}
private:
void init();
void open_send_endpoint_buffered(const EndPointAddress& address);
void open_send_endpoint_zero_copy(const EndPointAddress& address);
void init_buffered_send_endpoint(const EndPointAddress& address);
void init_zero_copy_send_endpoint(const EndPointAddress& address);
void init_receive_endpoint(const EndPointAddress& address,
const ConnectionParametersRecv& connection_params);
void handle_close_requests();
void do_close_endpoint(const EndPointAddress& address);
void handle_open_requests();
void validate_capabilities(const ConnectionParameters& connection_params) const;
struct OpenQueueItem {
EndPointAddress address;
EndpointCapabilities capabilities;
};
NetworkConfig m_config;
BaseEventLoop* m_event_loop;
DomainManager m_domain_manager;
CqReactor m_cq_reactor;
SharedBufferManager m_shared_buffer_manager;
std::optional<EventSignalHandle> m_close_signal;
tbb::concurrent_queue<EndPointAddress> m_close_queue;
std::optional<EventSignalHandle> m_open_signal;
tbb::concurrent_queue<OpenQueueItem> m_open_queue;
libfabric::AddressVectorManager m_address_vector_manager;
std::unordered_map<EndPointAddress, ActiveEndpoint, EndPointAddress::Hash>
m_active_endpoint_receive;
std::unique_ptr<ActiveEndpoint> m_active_endpoint_send_buffered;
std::unique_ptr<ActiveEndpoint> m_active_endpoint_send_zero_copy;
std::unordered_map<EndPointAddress, ReceiveEndpoint, EndPointAddress::Hash>
m_receive_endpoints;
std::unique_ptr<SendEndpointBuffered> m_send_endpoint_buffered;
std::unique_ptr<SendEndpointZeroCopy> m_send_endpoint_zero_copy;
ThreadSafeMap<EndPointAddress, fi_addr_t> m_send_endpoint_buffered_addresses;
ThreadSafeMap<EndPointAddress, fi_addr_t> m_send_endpoint_zero_copy_addresses;
std::mutex m_mutex;
};
} // namespace netio3::libfabric
#endif // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP