Program Listing for File SendEndpointZeroCopy.cpp

Return to documentation for file (BackendLibfabric/SendEndpointZeroCopy.cpp)

#include "SendEndpointZeroCopy.hpp"

#include <algorithm>

#include <tracy/Tracy.hpp>

#include <rdma/fi_domain.h>
#include <utility>

#include "Issues.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/Netio3Backend.hpp"

netio3::libfabric::SendEndpointZeroCopy::SendEndpointZeroCopy(
  ActiveEndpoint& endpoint,
  const ConnectionParametersSendZeroCopy& connection_params,
  std::optional<ZeroCopyBufferManager>& shared_buffer_manager,
  DomainManager& domain_manager) :
  m_endpoint{endpoint},
  m_conn_parameters{connection_params},
  m_sender{m_endpoint.get()},
  m_buffer_manager{
    create_buffer_manager(connection_params, domain_manager)},
  m_buffer_manager_ptr{std::invoke([this, &shared_buffer_manager] {
    if (m_buffer_manager) {
      return &m_buffer_manager.value();
    }
    if (shared_buffer_manager.has_value()) {
      return &shared_buffer_manager.value();
    }
    throw std::logic_error("Have no shared buffer manager, but also did not allocate one");
  })}
{}

netio3::NetioStatus netio3::libfabric::SendEndpointZeroCopy::send_data(
  const std::span<const iovec> iov,
  const std::span<const std::uint8_t> header_data,
  const std::uint64_t key,
  fi_addr_t addr)
{
  ZoneScoped;
  ERS_DEBUG(2, std::format("Send zero copy data with key {}", key));
  const auto header = m_buffer_manager_ptr->get_header(header_data, key);
  if (header.status != NetioStatus::OK) {
    return header.status;
  }
  std::vector<iovec> data{};
  data.reserve(iov.size() + 1);
  data.push_back(header.data);
  data.insert(data.end(), iov.begin(), iov.end());
  std::vector<fid_mr*> mrs{};
  mrs.reserve(data.size());
  mrs.push_back(m_buffer_manager_ptr->get_header_mr());
  std::ranges::generate_n(
    std::back_inserter(mrs), iov.size(), [&] { return m_buffer_manager_ptr->get_mr(); });
  auto status = m_sender.send_data(std::span{data}, mrs, header.bufnum, addr);
  if (status != NetioStatus::OK) {
    // Send operation failed, no reason to communicate the buffer number
    std::ignore = m_buffer_manager_ptr->return_header(header.bufnum);
  }
  return status;
}

netio3::NetioStatus netio3::libfabric::SendEndpointZeroCopy::send_data(
  const std::span<std::uint8_t> data,
  const std::span<const std::uint8_t> header_data,
  const std::uint64_t key,
  fi_addr_t addr)
{
  ZoneScoped;
  return send_data(
    std::vector{iovec{.iov_base = data.data(), .iov_len = data.size()}}, header_data, key, addr);
}

std::uint64_t netio3::libfabric::SendEndpointZeroCopy::release_buffer(const std::uint64_t bufnum)
{
  ERS_DEBUG(2, std::format("Releasing buffer {}", bufnum));
  return m_buffer_manager_ptr->return_header(bufnum);
}

std::size_t netio3::libfabric::SendEndpointZeroCopy::get_num_available_buffers()
{
  ZoneScoped;
  return m_buffer_manager_ptr->get_num_available_buffers();
}

std::vector<std::uint64_t> netio3::libfabric::SendEndpointZeroCopy::get_pending_sends()
{
  ZoneScoped;
  return m_buffer_manager_ptr->get_pending_sends();
}

std::optional<netio3::libfabric::ZeroCopyBufferManager>
netio3::libfabric::SendEndpointZeroCopy::create_buffer_manager(
  const ConnectionParametersSendZeroCopy& connection_params,
  DomainManager& domain_manager) const
{
  ZoneScoped;
  if (connection_params.use_shared_send_buffers) {
    return std::nullopt;
  }
  try {
    return std::optional<ZeroCopyBufferManager>{std::in_place, connection_params, domain_manager};
  } catch (const LibFabricBufferError& e) {
    throw FailedOpenActiveEndpoint(
      m_endpoint.get().get_address().address(), m_endpoint.get().get_address().port(), e.message());
  }
}