Program Listing for File ReceiveEndpoint.cpp

Return to documentation for file (BackendLibfabric/ReceiveEndpoint.cpp)

#include "ReceiveEndpoint.hpp"

#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>

#include <tracy/Tracy.hpp>

#include "Helpers.hpp"
#include "Issues.hpp"
#include "netio3-backend/Issues.hpp"

// Builds the receive side of an active endpoint.
//
// A dedicated ReceiveBufferManager is created via create_buffer_manager()
// unless the connection parameters request shared receive buffers. The raw
// pointer m_buffer_manager_ptr then refers either to that dedicated manager
// or to the one handed out by the shared receive context manager.
//
// Throws std::logic_error if neither a dedicated manager was created nor a
// shared context manager is available.
//
// NOTE(review): the initializers below read m_endpoint, m_conn_params and
// m_buffer_manager while constructing later members, so they rely on the
// member declaration order in the header (not visible here) -- confirm.
netio3::libfabric::ReceiveEndpoint::ReceiveEndpoint(ActiveEndpoint& endpoint,
                                                    ConnectionParametersRecv conn_params,
                                                    std::optional<SharedReceiveContextManager>& shared_context_manager,
                                                    DomainManager& domain_manager,
                                                    BaseEventLoop* event_loop) :
  m_endpoint{endpoint},
  m_conn_params{std::move(conn_params)},
  m_buffer_manager{create_buffer_manager(domain_manager, event_loop)},
  // Immediately-invoked lambda: picks the buffer manager this endpoint uses.
  m_buffer_manager_ptr{[this, &shared_context_manager]() -> ReceiveBufferManager* {
    // Prefer the manager owned by this endpoint when one was created.
    if (m_buffer_manager) {
      return &*m_buffer_manager;
    }
    // No own manager and no shared one either: nothing to receive into.
    if (!shared_context_manager) {
      throw std::logic_error("Have no shared receive buffer manager, but also did not allocate one");
    }
    return shared_context_manager->get_buffer_manager();
  }()}
{
}

// Creates the receive buffer manager dedicated to this endpoint.
//
// Returns std::nullopt when the connection parameters ask for shared receive
// buffers; otherwise returns a ReceiveBufferManager constructed from the
// domain manager, the endpoint's `ep` and `fi` handles, the connection
// parameters and the event loop.
//
// A LibFabricBufferError raised during construction is translated into
// FailedOpenActiveEndpoint carrying the endpoint's address, port and the
// original error message.
std::optional<netio3::libfabric::ReceiveBufferManager>
netio3::libfabric::ReceiveEndpoint::create_buffer_manager(
  DomainManager& domain_manager,
  BaseEventLoop* event_loop) const
{
  ZoneScoped;  // Tracy profiling zone for this call
  if (!m_conn_params.use_shared_receive_buffers) {
    auto&& active_endpoint = m_endpoint.get();
    try {
      return ReceiveBufferManager{domain_manager,
                                  active_endpoint.get_endpoint().ep.get(),
                                  m_conn_params,
                                  active_endpoint.get_endpoint().fi.get(),
                                  event_loop};
    } catch (const LibFabricBufferError& buffer_error) {
      // Re-raise as an endpoint-open failure that identifies the endpoint.
      auto&& local_address = active_endpoint.get_address();
      throw FailedOpenActiveEndpoint(ERS_HERE,
                                     local_address.address(),
                                     local_address.port(),
                                     buffer_error.message());
    }
  }
  // Shared receive buffers requested: no dedicated manager is needed.
  return std::nullopt;
}