Program Listing for File ReceiveBufferManager.cpp

Return to documentation for file (BackendLibfabric/ReceiveBufferManager.cpp)

#include "ReceiveBufferManager.hpp"

#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>

#include <tracy/Tracy.hpp>

#include "Issues.hpp"

netio3::libfabric::ReceiveBufferManager::ReceiveBufferManager(
  DomainManager& domain_manager,
  fid_ep* ep,
  const ConnectionParametersRecv& conn_params,
  const fi_info* info,
  BaseEventLoop* event_loop) :
  m_ep{ep},
  m_conn_params{prepare_connection_parameters(conn_params, info)},
  m_retry_post_signal{event_loop->create_signal([this](int) { retry_post_buffers(); }, false)}
{
  // Size the retry queue up front (one slot per buffer) so that queuing
  // buffers in post_buffer() does not need to grow it.
  m_retry_post_buffers.reserve(m_conn_params.num_buf);

  // Create and register the receive buffers (count already clamped by
  // prepare_connection_parameters), then post all of them to the endpoint.
  init_buffers(domain_manager);
  ERS_DEBUG(2, std::format("Allocating memory regions number of buffers: {}", m_buffers.size()));
  post_buffers();
}

netio3::libfabric::ReceiveBufferManager::~ReceiveBufferManager()
{
  ERS_DEBUG(2, "Entered");
  // Hand every buffer created in init_buffers() to close_buffer(), in creation order.
  // NOTE(review): presumably this releases the registered memory region — confirm in close_buffer().
  for (auto it = m_buffers.cbegin(); it != m_buffers.cend(); ++it) {
    close_buffer(*it);
  }
}

void netio3::libfabric::ReceiveBufferManager::init_buffers(DomainManager& domain_manager)
{
  // Reserve the full count first: raw pointers to these elements are later
  // passed to libfabric as operation contexts, so the storage must not move.
  m_buffers.reserve(m_conn_params.num_buf);
  for (uint64_t created = 0; created < m_conn_params.num_buf; ++created) {
    // The key argument is irrelevant for receive buffers, so 0 is passed.
    auto& fresh = m_buffers.emplace_back(m_conn_params.buf_size, 0);
    register_buffer(fresh, domain_manager, FI_RECV);
  }
}

void netio3::libfabric::ReceiveBufferManager::post_buffers()
{
  ZoneScoped;
  // Offer every owned buffer to the endpoint; post_buffer() queues any the
  // endpoint cannot accept yet for a later retry.
  for (auto it = m_buffers.begin(); it != m_buffers.end(); ++it) {
    post_buffer(&*it);
  }
}

void netio3::libfabric::ReceiveBufferManager::post_buffer(Buffer* buf)
{
  ZoneScoped;
  const auto status = do_post_buffer(buf);
  if (status == 0) {
    return;
  }

  if (status != -FI_EAGAIN) {
    // Hard failure: report it; the buffer is not queued for retry.
    ers::error(LibFabricBufferError(
      ERS_HERE,
      std::format("Failed to post a buffer to receive inbound messages, error {} - {}",
                  status,
                  fi_strerror(-status))));
    return;
  }

  // Transient -FI_EAGAIN: queue the buffer and fire the signal, whose handler
  // (installed in the constructor) runs retry_post_buffers().
  m_retry_post_buffers.push_back(buf);
  m_retry_post_signal.fire();
}

/**
 * Re-attempt posting every buffer queued by post_buffer() after -FI_EAGAIN.
 *
 * Buffers that still return -FI_EAGAIN stay queued, in their original order,
 * and the retry signal is fired again. Buffers that fail with any other error
 * are reported and dropped from the queue.
 *
 * The queue is compacted in place rather than rebuilt into a temporary vector.
 * This avoids a heap allocation on every retry and keeps the capacity reserved
 * in the constructor, so later post_buffer() calls do not reallocate.
 */
void netio3::libfabric::ReceiveBufferManager::retry_post_buffers()
{
  ZoneScoped;
  // `keep` marks the end of the still-pending prefix. It never overtakes the
  // range-for cursor, so overwriting already-visited slots is safe.
  auto keep = m_retry_post_buffers.begin();
  for (auto* buf : m_retry_post_buffers) {
    const auto ret = do_post_buffer(buf);
    if (ret == -FI_EAGAIN) {
      *keep = buf;
      ++keep;
    } else if (ret != 0) [[unlikely]] {
      ers::error(LibFabricBufferError(
        ERS_HERE,
        std::format("Failed to post a buffer to receive inbound messages, error {} - {}",
                    ret,
                    fi_strerror(-ret))));
    }
  }
  // Drop the posted / failed tail; shrinking a vector never reallocates.
  m_retry_post_buffers.erase(keep, m_retry_post_buffers.end());

  if (not m_retry_post_buffers.empty()) {
    m_retry_post_signal.fire();
  }
}

int netio3::libfabric::ReceiveBufferManager::do_post_buffer(Buffer* buf) const
{
  ZoneScoped;
  ERS_DEBUG(2, "Entered");

  // One-entry scatter-gather list covering the whole buffer, plus the
  // descriptor of the buffer's registered memory region.
  iovec segment{buf->data().data(), buf->size()};
  void* mr_descriptor = fi_mr_desc(buf->mr);

  fi_msg message{};
  message.msg_iov = &segment;
  message.desc = &mr_descriptor;
  message.iov_count = 1;
  message.addr = 0;
  message.data = 0;
  // The Buffer itself is the operation context; presumably the completion
  // handler uses it to identify which buffer was filled — confirm at the caller.
  message.context = buf;

  constexpr std::uint64_t recv_flags = FI_REMOTE_CQ_DATA;  // FI_MULTI_RECV is not enabled
  return fi_recvmsg(m_ep, &message, recv_flags);
}

netio3::ConnectionParametersRecv
netio3::libfabric::ReceiveBufferManager::prepare_connection_parameters(
  const ConnectionParametersRecv& requested,
  const fi_info* info) const
{
  // Returns `requested` with num_buf clamped to the provider's receive-context
  // size (info->rx_attr->size); a warning is emitted when clamping occurs.
  auto params = requested;
  if (params.num_buf <= info->rx_attr->size) {
    return params;
  }

  ers::warning(TooManyBuffersRequested(params.num_buf, info->rx_attr->size));
  params.num_buf = info->rx_attr->size;
  return params;
}