Program Listing for File SendSocketBuffered.cpp

Return to documentation for file (BackendLibfabric/SendSocketBuffered.cpp)

#include "SendSocketBuffered.hpp"

#include <utility>

#include <tracy/Tracy.hpp>

#include "Issues.hpp"

netio3::libfabric::SendSocketBuffered::SendSocketBuffered(
  EndPointAddress address,
  const ConnectionParameters& connection_params,
  NetworkMode mode,
  fid_fabric* fabric,
  DomainContext& domain) :
  SendSocket{std::move(address), mode, fabric, domain.get_domain()},
  m_conn_parameters{prepare_connection_parameters(connection_params)}
{
  init_buffers(domain);
  init();
}

netio3::libfabric::SendSocketBuffered::~SendSocketBuffered()
{
  ERS_DEBUG(2, "Entered");
  for (const auto& buffer : m_buffers) {
    close_buffer(buffer);
  }
  ERS_DEBUG(2, "Finished");
}

std::size_t netio3::libfabric::SendSocketBuffered::get_num_available_buffers()
{
  ZoneScoped;
  return m_min_num_available_buffers.exchange(
    m_num_available_buffers.load(std::memory_order_relaxed), std::memory_order_relaxed);
}

void netio3::libfabric::SendSocketBuffered::init_buffers(DomainContext& domain)
{
  ZoneScoped;
  ERS_DEBUG(2,
            std::format("Registering  {} buffers of size {}",
                        m_conn_parameters.num_buf,
                        m_conn_parameters.buf_size));
  for (std::uint64_t key = 0; key < m_conn_parameters.num_buf; ++key) {
    m_buffers.emplace_back(domain, m_conn_parameters.buf_size, key);
    try {
      register_buffer(m_buffers.back(), domain, FI_SEND);
    } catch (const LibFabricBufferError& e) {
      throw FailedOpenSendEndpoint(
        ERS_HERE, get_address().address(), get_address().port(), e.message());
    }
    m_available_buffers.push(key);
    ++m_num_available_buffers;
  }
  m_min_num_available_buffers = m_num_available_buffers.load();
}

netio3::ConnectionParameters netio3::libfabric::SendSocketBuffered::prepare_connection_parameters(
  const ConnectionParameters& requested) const
{
  auto params = requested;
  const auto& info = get_endpoint().fi;
  if (params.num_buf > info->tx_attr->size) {
    ers::warning(TooManyBuffersRequested(params.num_buf, info->tx_attr->size));
    params.num_buf = info->tx_attr->size;
  }
  return params;
}