Program Listing for File SendEndpointBuffered.cpp

Return to documentation for file (BackendLibfabric/SendEndpointBuffered.cpp)

#include "SendEndpointBuffered.hpp"

#include <utility>

#include <tracy/Tracy.hpp>

#include "Issues.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/Netio3Backend.hpp"

/// @brief Construct a buffered send endpoint on top of an existing active endpoint.
///
/// The requested connection parameters are first passed through
/// prepare_connection_parameters() and then handed to create_buffer_manager().
/// If that yields a buffer manager, this endpoint uses it; otherwise it falls
/// back to @p shared_buffer_manager.
///
/// @param endpoint              Active endpoint this object sends through.
/// @param connection_params     Requested buffered-send parameters.
/// @param shared_buffer_manager Buffer manager used when no endpoint-local one is created.
/// @param domain_manager        Forwarded to create_buffer_manager().
/// @throws std::logic_error if neither an endpoint-local nor a shared buffer
///         manager is available.
netio3::libfabric::SendEndpointBuffered::SendEndpointBuffered(
  ActiveEndpoint& endpoint,
  const ConnectionParametersSendBuffered& connection_params,
  std::optional<SendBufferManager>& shared_buffer_manager,
  DomainManager& domain_manager) :
  m_endpoint{endpoint},
  m_sender{m_endpoint.get()},
  // NOTE(review): both helper calls below read m_endpoint, so they depend on
  // m_endpoint being declared before m_buffer_manager in the header - confirm.
  m_buffer_manager{
    create_buffer_manager(prepare_connection_parameters(connection_params), domain_manager)},
  // Immediately-invoked lambda: select the manager to use, own one first.
  m_buffer_manager_ptr{[this, &shared_buffer_manager]() -> SendBufferManager* {
    if (m_buffer_manager) {
      return &*m_buffer_manager;
    }
    if (shared_buffer_manager) {
      return &*shared_buffer_manager;
    }
    throw std::logic_error("Have no shared send buffer manager, but also did not allocate one");
  }()}
{}

/// @brief Report how many send buffers are currently available.
///
/// Forwards the query to whichever buffer manager was selected at construction.
///
/// @return The value reported by the buffer manager's get_num_available_buffers().
std::size_t netio3::libfabric::SendEndpointBuffered::get_num_available_buffers()
{
  ZoneScoped;  // Tracy profiling zone covering this call
  const std::size_t available = m_buffer_manager_ptr->get_num_available_buffers();
  return available;
}

/// @brief Produce a copy of the requested parameters with the buffer count clamped.
///
/// If the requested number of buffers exceeds the endpoint's `tx_attr->size`,
/// a TooManyBuffersRequested warning is issued through ers and the count is
/// reduced to that size. All other fields are copied unchanged.
///
/// @param requested Parameters as requested by the caller.
/// @return The (possibly adjusted) parameters.
netio3::ConnectionParametersSendBuffered netio3::libfabric::SendEndpointBuffered::prepare_connection_parameters(
  const ConnectionParametersSendBuffered& requested) const
{
  auto adjusted = requested;
  const auto& fabric_info = m_endpoint.get().get_endpoint().fi;
  const auto tx_limit = fabric_info->tx_attr->size;

  // Nothing to do when the request fits within the transmit attribute size.
  if (!(adjusted.num_buf > tx_limit)) {
    return adjusted;
  }

  ers::warning(TooManyBuffersRequested(adjusted.num_buf, tx_limit));
  adjusted.num_buf = tx_limit;
  return adjusted;
}

std::optional<netio3::libfabric::SendBufferManager>
netio3::libfabric::SendEndpointBuffered::create_buffer_manager(
  const ConnectionParametersSendBuffered& connection_params,
  DomainManager& domain_manager) const
{
  ZoneScoped;
  if (connection_params.use_shared_send_buffers) {
    return std::nullopt;
  }
  try {
    return std::optional<SendBufferManager>{
      std::in_place, connection_params, domain_manager};
  } catch (const LibFabricBufferError& e) {
    throw FailedOpenActiveEndpoint(ERS_HERE,
                                   m_endpoint.get().get_address().address(),
                                   m_endpoint.get().get_address().port(),
                                   e.message());
  }
}