Program Listing for File ConnectionlessEndpointManager.hpp

Return to documentation for file (BackendLibfabric/ConnectionlessEndpointManager.hpp)

#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP

#include <unordered_map>

#include <rdma/fabric.h>

#include "ActiveEndpoint.hpp"
#include "AddressVectorManager.hpp"
#include "CqReactor.hpp"
#include "DomainManager.hpp"
#include "ReceiveEndpoint.hpp"
#include "SendEndpointBuffered.hpp"
#include "SharedBufferManager.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/utility/ThreadSafeMap.hpp"

namespace netio3::libfabric {
  class ConnectionlessEndpointManager :
    public std::enable_shared_from_this<ConnectionlessEndpointManager>
  {
  public:
    ConnectionlessEndpointManager(NetworkConfig config,
                                  const EndPointAddress& address,
                                  BaseEventLoop* evloop,
                                  std::uint64_t flags);
    static std::shared_ptr<ConnectionlessEndpointManager> create(const NetworkConfig& config,
                                                                 EndPointAddress address,
                                                                 BaseEventLoop* evloop,
                                                                 std::uint64_t flags);

    void open_send_endpoint(const EndPointAddress& address,
                            const ConnectionParameters& connection_params);

    void open_receive_endpoint(const EndPointAddress& address,
                               const ConnectionParameters& connection_params);

    void close_send_endpoint(const EndPointAddress& address);

    void close_receive_endpoint(const EndPointAddress& address);

    [[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);

    [[nodiscard]] auto apply_to_send_endpoint_buffered(
      const EndPointAddress& address,
      const std::invocable<SendEndpointBuffered&, fi_addr_t> auto& func)
      -> decltype(func(std::declval<SendEndpointBuffered&>(), std::declval<fi_addr_t>()))
    {
      try {
        return m_send_endpoint_buffered_addresses.apply(address, [this, &func](fi_addr_t addr) {
          return func(*m_send_endpoint_buffered, addr);
        });
      } catch (const std::out_of_range&) {
        throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
      }
    }

    [[nodiscard]] auto apply_to_send_endpoint_zero_copy(
      const EndPointAddress& address,
      const std::invocable<SendEndpointZeroCopy&, fi_addr_t> auto& func)
      -> decltype(func(std::declval<SendEndpointZeroCopy&>(), std::declval<fi_addr_t>()))
    {
      try {
        return m_send_endpoint_zero_copy_addresses.apply(address, [this, &func](fi_addr_t addr) {
          return func(*m_send_endpoint_zero_copy, addr);
        });
      } catch (const std::out_of_range&) {
        throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
      }
    }

  private:
    void init();

    void open_send_endpoint_buffered(const EndPointAddress& address);

    void open_send_endpoint_zero_copy(const EndPointAddress& address);

    void init_buffered_send_endpoint(const EndPointAddress& address);

    void init_zero_copy_send_endpoint(const EndPointAddress& address);

    void init_receive_endpoint(const EndPointAddress& address,
                               const ConnectionParametersRecv& connection_params);

    void handle_close_requests();

    void do_close_endpoint(const EndPointAddress& address);

    void handle_open_requests();

    void validate_capabilities(const ConnectionParameters& connection_params) const;

    struct OpenQueueItem {
      EndPointAddress address;
      EndpointCapabilities capabilities;
    };

    NetworkConfig m_config;
    BaseEventLoop* m_event_loop;
    DomainManager m_domain_manager;
    CqReactor m_cq_reactor;
    SharedBufferManager m_shared_buffer_manager;
    std::optional<EventSignalHandle> m_close_signal;
    tbb::concurrent_queue<EndPointAddress> m_close_queue;
    std::optional<EventSignalHandle> m_open_signal;
    tbb::concurrent_queue<OpenQueueItem> m_open_queue;
    libfabric::AddressVectorManager m_address_vector_manager;
    std::unordered_map<EndPointAddress, ActiveEndpoint, EndPointAddress::Hash>
      m_active_endpoint_receive;
    std::unique_ptr<ActiveEndpoint> m_active_endpoint_send_buffered;
    std::unique_ptr<ActiveEndpoint> m_active_endpoint_send_zero_copy;
    std::unordered_map<EndPointAddress, ReceiveEndpoint, EndPointAddress::Hash>
      m_receive_endpoints;
    std::unique_ptr<SendEndpointBuffered> m_send_endpoint_buffered;
    std::unique_ptr<SendEndpointZeroCopy> m_send_endpoint_zero_copy;
    ThreadSafeMap<EndPointAddress, fi_addr_t> m_send_endpoint_buffered_addresses;
    ThreadSafeMap<EndPointAddress, fi_addr_t> m_send_endpoint_zero_copy_addresses;
    std::mutex m_mutex;
  };
}  // namespace netio3::libfabric

#endif  // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONLESSENDPOINTMANAGER_HPP