Program Listing for File ConnectionManager.hpp

Return to documentation for file (BackendLibfabric/ConnectionManager.hpp)

#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP

#include <map>
#include <memory>

#include <rdma/fi_eq.h>

#include "ActiveEndpoint.hpp"
#include "CqReactor.hpp"
#include "DomainManager.hpp"
#include "Helpers.hpp"
#include "ListenSocket.hpp"
#include "ReceiveEndpoint.hpp"
#include "SendEndpointBuffered.hpp"
#include "SendEndpointZeroCopy.hpp"
#include "SharedBufferManager.hpp"
#include "SyncSignalWrapper.hpp"
#include "netio3-backend/EventLoop/EventSignalHandle.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
#include "netio3-backend/utility/ThreadSafeMap.hpp"

namespace netio3::libfabric {
  /**
   * @brief Holds the listen, active, receive and send endpoints of the libfabric backend
   *        and exposes the operations to open, close and access them.
   *
   * The class is neither copyable nor movable (all four operations are deleted below).
   *
   * NOTE(review): this header uses std::mutex, std::list, std::invocable,
   * std::out_of_range and tbb::concurrent_queue without including <mutex>, <list>,
   * <concepts>, <stdexcept> or a TBB header directly; presumably they arrive through
   * one of the project headers included above -- confirm, or include them explicitly.
   */
  class ConnectionManager
  {
  public:
    /**
     * @brief Construct a manager from an event loop pointer and a network configuration.
     *
     * @param event_loop Event loop pointer; presumably stored in m_event_loop and
     *                   non-owning -- confirm against the implementation.
     * @param config     Network configuration, taken by value.
     */
    explicit ConnectionManager(BaseEventLoop* event_loop, NetworkConfig config);

    ~ConnectionManager();
    // Copying and moving are disabled.
    ConnectionManager(const ConnectionManager&) = delete;
    ConnectionManager(ConnectionManager&&) = delete;
    ConnectionManager& operator=(const ConnectionManager&) = delete;
    ConnectionManager& operator=(ConnectionManager&&) = delete;

    /**
     * @brief Open a listen endpoint for @p address.
     *
     * @param address     Address to listen on.
     * @param conn_params Connection parameters, taken by value.
     * @return An endpoint address; presumably the address actually bound (e.g. with the
     *         resolved port) -- confirm against the implementation. Must not be discarded.
     */
    [[nodiscard]] EndPointAddress open_listen_endpoint(const EndPointAddress& address,
                                                       ConnectionParameters conn_params);

    /**
     * @brief Open an active endpoint towards @p address.
     *
     * @param address           Address of the endpoint to open.
     * @param connection_params Connection parameters for the endpoint.
     */
    void open_active_endpoint(const EndPointAddress& address,
                              const ConnectionParameters& connection_params);

    /**
     * @brief Close the listen endpoint identified by @p address.
     *
     * NOTE(review): given m_close_queue / handle_close_requests() below, closing is
     * presumably deferred to the event loop rather than done inline -- confirm.
     */
    void close_listen_endpoint(const EndPointAddress& address);

    /**
     * @brief Close the active endpoint identified by @p address.
     *
     * NOTE(review): presumably deferred in the same way as close_listen_endpoint() -- confirm.
     */
    void close_active_endpoint(const EndPointAddress& address);

    /**
     * @brief Query the number of available buffers for the endpoint at @p address.
     *
     * @return Buffer count; which buffer pool is meant is not visible in this header.
     */
    [[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);

    /**
     * @brief Invoke @p func on the buffered send endpoint registered for @p address.
     *
     * Forwards to m_send_sendpoints_buffered.apply(address, func) and returns whatever
     * that call returns.
     *
     * @param address Key of the send endpoint.
     * @param func    Callable invocable with std::unique_ptr<SendEndpointBuffered>&.
     * @return The result of applying @p func.
     * @throws UnknownActiveEndpoint if a std::out_of_range escapes the apply() call.
     */
    [[nodiscard]] auto apply_to_send_endpoint_buffered(
      const EndPointAddress& address,
      const std::invocable<std::unique_ptr<SendEndpointBuffered>&> auto& func)
      -> decltype(func(std::declval<std::unique_ptr<SendEndpointBuffered>&>()))
    {
      // NOTE(review): any std::out_of_range leaving apply() is translated, so one thrown
      // by func itself (if apply() lets it propagate) would also be reported as an
      // unknown endpoint -- confirm this is intended.
      try {
        return m_send_sendpoints_buffered.apply(address, func);
      } catch (const std::out_of_range&) {
        throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
      }
    }

    /**
     * @brief Invoke @p func on the zero-copy send endpoint registered for @p address.
     *
     * Forwards to m_send_endpoints_zero_copy.apply(address, func) and returns whatever
     * that call returns.
     *
     * @param address Key of the send endpoint.
     * @param func    Callable invocable with std::unique_ptr<SendEndpointZeroCopy>&.
     * @return The result of applying @p func.
     * @throws UnknownActiveEndpoint if a std::out_of_range escapes the apply() call.
     */
    [[nodiscard]] auto apply_to_send_endpoint_zero_copy(
      const EndPointAddress& address,
      const std::invocable<std::unique_ptr<SendEndpointZeroCopy>&> auto& func)
      -> decltype(func(std::declval<std::unique_ptr<SendEndpointZeroCopy>&>()))
    {
      // Same translation as in apply_to_send_endpoint_buffered(): std::out_of_range
      // from apply() becomes UnknownActiveEndpoint carrying the address and port.
      try {
        return m_send_endpoints_zero_copy.apply(address, func);
      } catch (const std::out_of_range&) {
        throw UnknownActiveEndpoint(ERS_HERE, address.address(), address.port());
      }
    }

  private:
    /**
     * @brief Bundle returned by read_cm_event().
     *
     * NOTE(review): presumably one entry read from a libfabric event queue -- the event
     * code, the error entry (meaningful on failure) and the fi_info attached to the
     * event; confirm against read_cm_event()'s implementation.
     */
    struct CmEvent {
      long event;
      fi_eq_err_entry err_entry;
      FiInfoUniquePtr info;
    };

    // NOTE(review): the helpers below are only declared here; the one-line summaries are
    // derived from their names and signatures and should be confirmed in the .cpp file.

    // Presumably verifies state and lazily initialises shared resources for `address`
    // using the given libfabric `flags`.
    void check_and_init(const EndPointAddress& address, std::uint64_t flags);

    // Const check of the requested connection parameters; how failure is reported is not
    // visible here (the function returns void).
    void validate_capabilities(const ConnectionParameters& connection_params) const;

    // Presumably dispatches to the enable_* functions below according to the parameters.
    void enable_capabilities(const EndPointAddress& address,
                             const ConnectionParameters& connection_params);

    // Presumably creates the ActiveEndpoint behind open_active_endpoint().
    void create_active_endpoint(const EndPointAddress& address,
                                const ConnectionParameters& connection_params);

    // Presumably populates m_send_sendpoints_buffered for `address`.
    void enable_buffered_sending(const EndPointAddress& address, ConnectionParametersSendBuffered conn_params);

    // Presumably populates m_send_endpoints_zero_copy for `address`.
    void enable_zero_copy_sending(const EndPointAddress& address, ConnectionParametersSendZeroCopy conn_params);

    // Presumably populates m_receive_endpoints for `address`.
    void enable_receiving(const EndPointAddress& address, ConnectionParametersRecv conn_params);

    // Static: needs no instance state. Takes a libfabric event queue handle and returns
    // a CmEvent; the result must not be discarded.
    [[nodiscard]] static CmEvent read_cm_event(fid_eq* eq);

    // Handler for connection-management events on a listen socket.
    void on_listen_endpoint_cm_event(ListenSocket& lendpoint);

    // Handler for connection-management events on an active endpoint, identified by the
    // file descriptor `eqfd` (presumably the event queue's fd).
    void on_active_endpoint_cm_event(int eqfd);

    // Takes ownership of `info` (rvalue reference to a unique pointer).
    void handle_connection_request(ListenSocket& lendpoint, FiInfoUniquePtr&& info);

    // Returns a reference to an ActiveEndpoint; presumably moves the matching entry out
    // of m_pending_active_endpoints into m_active_endpoints -- confirm.
    [[nodiscard]] ActiveEndpoint& handle_pending_active_endpoint(int eqfd);

    void unregister_endpoint(const CqCmFds& fds);

    // Presumably drains m_close_queue and calls the do_close_* functions below.
    void handle_close_requests();

    void do_close_listen_endpoint(const EndPointAddress& address);

    void do_close_active_endpoint(const EndPointAddress& address);

    void unregister_all();

    /**
     * @brief Element type of m_close_queue: an endpoint address plus whether it refers
     *        to a listen or an active endpoint.
     *
     * `type` is value-initialised, i.e. it defaults to Type::listen (the first enumerator).
     */
    struct CloseQueueItem {
      EndPointAddress address;
      enum class Type {
        listen,
        active,
      } type{};  // cppcheck-suppress internalAstError
    };

    /**
     * @brief Element type of m_pending_active_endpoints: an endpoint stored together
     *        with the connection parameters it was requested with.
     */
    struct PendingActiveEndpoint {
      ActiveEndpoint endpoint;
      ConnectionParameters connection_params;
    };

    // Data members are constructed in declaration order and destroyed in reverse order,
    // so reordering the declarations below changes set-up and teardown order.
    NetworkConfig m_config;
    // Raw pointer; presumably non-owning -- confirm who owns the event loop.
    BaseEventLoop* m_event_loop;
    // NOTE(review): judging by their names, each mutex guards the container of the
    // matching name further down; the locking itself is not visible in this header.
    mutable std::mutex m_receive_endpoint_mutex;
    mutable std::mutex m_active_endpoint_by_passive_mutex;
    mutable std::mutex m_listen_endpoint_mutex;
    mutable std::mutex m_active_endpoint_mutex;
    // Queue of pending close requests (see CloseQueueItem).
    tbb::concurrent_queue<CloseQueueItem> m_close_queue;
    SyncSignalWrapper m_close_signal;
    // Start out null; presumably created on first use by check_and_init() -- confirm.
    std::unique_ptr<DomainManager> m_domain_manager{nullptr};
    std::unique_ptr<CqReactor> m_cq_reactor{nullptr};
    std::unique_ptr<SharedBufferManager> m_shared_buffer_manager{nullptr};
    std::map<EndPointAddress, ActiveEndpoint> m_active_endpoints;
    std::map<EndPointAddress, ListenSocket> m_listen_endpoints;
    // Stores references, so this map does not own the endpoints; the referenced objects
    // presumably live in m_active_endpoints and must outlive their entries here -- confirm.
    std::multimap<EndPointAddress, ActiveEndpoint&> m_active_endpoints_by_passive_address;
    std::map<EndPointAddress, ReceiveEndpoint> m_receive_endpoints;
    // NOTE(review): "sendpoints" looks like a typo for "endpoints"; the name is left as is
    // because code outside this header presumably refers to it.
    ThreadSafeMap<EndPointAddress, std::unique_ptr<SendEndpointBuffered>> m_send_sendpoints_buffered;
    ThreadSafeMap<EndPointAddress, std::unique_ptr<SendEndpointZeroCopy>> m_send_endpoints_zero_copy;
    std::list<PendingActiveEndpoint> m_pending_active_endpoints;
  };
}  // namespace netio3::libfabric

#endif  // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP