Program Listing for File ConnectionManager.hpp

Return to documentation for file (BackendLibfabric/ConnectionManager.hpp)

#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP

#include <concepts>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <optional>
#include <stdexcept>
#include <utility>

#include <rdma/fi_eq.h>

#include "CqReactor.hpp"
#include "DomainManager.hpp"
#include "Helpers.hpp"
#include "ListenSocket.hpp"
#include "ReceiveSocket.hpp"
#include "SendSocketBuffered.hpp"
#include "SendSocketZeroCopy.hpp"
#include "netio3-backend/EventLoop/EventSignalHandle.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
#include "netio3-backend/utility/ThreadSafeMap.hpp"

namespace netio3::libfabric {
  class ConnectionManager : public std::enable_shared_from_this<ConnectionManager>
  {
  public:
    explicit ConnectionManager(BaseEventLoop* event_loop, NetworkConfig config);

    ~ConnectionManager();
    ConnectionManager(const ConnectionManager&) = delete;
    ConnectionManager(ConnectionManager&&) = delete;
    ConnectionManager& operator=(const ConnectionManager&) = delete;
    ConnectionManager& operator=(ConnectionManager&&) = delete;

    static std::shared_ptr<ConnectionManager> create(BaseEventLoop* event_loop,
                                                     NetworkConfig config);

    void init();

    [[nodiscard]] EndPointAddress open_listen_endpoint(const EndPointAddress& address,
                                                       ConnectionParametersRecv conn_params);

    void open_send_endpoint_buffered(const EndPointAddress& address,
                                     ConnectionParameters conn_params);

    void open_send_endpoint_zero_copy(const EndPointAddress& address,
                                      ConnectionParameters conn_params);

    void close_listen_endpoint(const EndPointAddress& address);

    void close_send_endpoint(const EndPointAddress& address);

    [[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address);

    [[nodiscard]] auto apply_to_send_socket_buffered(
      const EndPointAddress& address,
      const std::invocable<SendSocketBuffered&> auto& func)
      -> decltype(func(std::declval<SendSocketBuffered&>()))
    {
      try {
        return m_ssockets_buffered.apply(address, func);
      } catch (const std::out_of_range&) {
        throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port());
      }
    }

    [[nodiscard]] auto apply_to_send_socket_zero_copy(
      const EndPointAddress& address,
      const std::invocable<SendSocketZeroCopy&> auto& func)
      -> decltype(func(std::declval<SendSocketZeroCopy&>()))
    {
      try {
        return m_ssockets_zero_copy.apply(address, func);
      } catch (const std::out_of_range&) {
        throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port());
      }
    }

  private:
    struct CmEvent {
      long event;
      fi_eq_err_entry err_entry;
      FiInfoUniquePtr info;
    };

    void check_and_init(const EndPointAddress& address, std::uint64_t flags);

    [[nodiscard]] static CmEvent read_cm_event(fid_eq* eq);

    void on_listen_socket_cm_event(ListenSocket& lsocket);

    void on_recv_socket_cm_event(ReceiveSocket& rsocket);

    template<SendSocketConcept SocketType>
    void on_send_socket_cm_event(const EndPointAddress& address);

    void handle_connection_request(ListenSocket& lsocket, FiInfoUniquePtr&& info);

    void unregister_endpoint(const CqCmFds& fds);

    void handle_close_requests();

    void do_close_listen_endpoint(const EndPointAddress& address);

    void do_close_send_endpoint(const EndPointAddress& address);

    void unregister_all();

    struct CloseQueueItem {
      EndPointAddress address;
      enum class Type {
        listen,
        send,
      } type{};  // cppcheck-suppress internalAstError
    };

    NetworkConfig m_config;
    BaseEventLoop* m_event_loop;
    mutable std::mutex m_rsocket_mutex;
    std::optional<EventSignalHandle> m_close_signal;
    tbb::concurrent_queue<CloseQueueItem> m_close_queue;
    std::unique_ptr<DomainManager> m_domain_manager{nullptr};
    std::unique_ptr<CqReactor> m_cq_reactor{nullptr};
    std::map<EndPointAddress, ListenSocket> m_lsockets;
    std::multimap<EndPointAddress, ReceiveSocket> m_rsockets;
    ThreadSafeMap<EndPointAddress, SendSocketBuffered> m_ssockets_buffered;
    ThreadSafeMap<EndPointAddress, SendSocketZeroCopy> m_ssockets_zero_copy;
  };
}  // namespace netio3::libfabric

#endif  // NETIO3BACKEND_BACKENDLIBFABRIC_CONNECTIONMANAGER_HPP