Program Listing for File ReceiveBufferManager.cpp
↰ Return to documentation for file (BackendLibfabric/ReceiveBufferManager.cpp)
#include "ReceiveBufferManager.hpp"
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <tracy/Tracy.hpp>
#include "Issues.hpp"
/**
 * Create the receive-side buffer manager for one endpoint.
 *
 * The requested connection parameters are first clamped against the provider's
 * receive-context size (prepare_connection_parameters). A signal is created on
 * the event loop whose callback calls retry_post_buffers(), i.e. re-attempts
 * posting buffers that the provider rejected with -FI_EAGAIN. Finally all
 * buffers are allocated, registered and posted to the endpoint.
 *
 * @param domain_manager  Used to register the receive buffers.
 * @param ep              Endpoint the buffers are posted to.
 * @param conn_params     Requested number and size of receive buffers.
 * @param info            Provider info; info->rx_attr->size caps the buffer count.
 * @param event_loop      Loop on which the retry signal is created.
 */
netio3::libfabric::ReceiveBufferManager::ReceiveBufferManager(
  DomainManager& domain_manager,
  fid_ep* ep,
  const ConnectionParametersRecv& conn_params,
  const fi_info* info,
  BaseEventLoop* event_loop) :
  m_ep{ep},
  m_conn_params{prepare_connection_parameters(conn_params, info)},
  // NOTE(review): the meaning of the trailing `false` is defined by create_signal — confirm.
  m_retry_post_signal{event_loop->create_signal([this](int /*unused*/) { retry_post_buffers(); }, false)}
{
  init_buffers(domain_manager);
  // Pre-size the retry queue so queueing up to num_buf buffers never reallocates.
  m_retry_post_buffers.reserve(m_conn_params.num_buf);
  ERS_DEBUG(2, std::format("Allocating memory regions number of buffers: {}", m_buffers.size()));
  post_buffers();
}
/**
 * Release every receive buffer owned by this manager through close_buffer().
 *
 * NOTE(review): m_retry_post_signal is not touched here; whether a retry signal
 * that is still pending can invoke its callback after destruction depends on the
 * signal type's own destructor — confirm.
 */
netio3::libfabric::ReceiveBufferManager::~ReceiveBufferManager()
{
  ERS_DEBUG(2, "Entered");
  for (const auto& owned_buffer : m_buffers) {
    close_buffer(owned_buffer);
  }
}
/**
 * Allocate m_conn_params.num_buf receive buffers of m_conn_params.buf_size bytes
 * each, and register every one with the domain for FI_RECV access.
 *
 * Storage is reserved up-front for exactly num_buf elements. Assuming m_buffers
 * is a std::vector, element addresses therefore stay stable. That matters because
 * those addresses are later handed to libfabric as contexts in do_post_buffer().
 *
 * @param domain_manager  Domain used to register each buffer.
 */
void netio3::libfabric::ReceiveBufferManager::init_buffers(DomainManager& domain_manager)
{
  const std::uint64_t buffer_count = m_conn_params.num_buf;
  m_buffers.reserve(m_conn_params.num_buf);
  for (std::uint64_t remaining = buffer_count; remaining != 0; --remaining) {
    // The key is irrelevant for receive buffers, hence 0.
    m_buffers.emplace_back(m_conn_params.buf_size, 0);
    auto& fresh_buffer = m_buffers.back();
    register_buffer(fresh_buffer, domain_manager, FI_RECV);
  }
}
void netio3::libfabric::ReceiveBufferManager::post_buffers()
{
ZoneScoped;
for (auto& buf : m_buffers) {
post_buffer(&buf);
}
}
/**
 * Post a single receive buffer to the endpoint.
 *
 * - On success (0), nothing else happens.
 * - On -FI_EAGAIN, the buffer is appended to m_retry_post_buffers and the retry
 *   signal is fired, so that retry_post_buffers() attempts it again.
 * - On any other error, the failure is reported through ers::error and the
 *   buffer is not re-queued.
 *
 * @param buf  Buffer to post.
 */
void netio3::libfabric::ReceiveBufferManager::post_buffer(Buffer* buf)
{
  ZoneScoped;
  const auto status = do_post_buffer(buf);
  if (status == 0) {
    return;
  }
  if (status == -FI_EAGAIN) {
    // The provider could not accept the buffer right now; retry from the event loop.
    m_retry_post_buffers.push_back(buf);
    m_retry_post_signal.fire();
    return;
  }
  ers::error(LibFabricBufferError(
    ERS_HERE,
    std::format("Failed to post a buffer to receive inbound messages, error {} - {}",
                status,
                fi_strerror(-status))));
}
/**
 * Callback of m_retry_post_signal: try once more to post every buffer queued in
 * m_retry_post_buffers.
 *
 * - Buffers that again get -FI_EAGAIN stay queued, in their original order, and
 *   the signal is fired again.
 * - Buffers that fail with any other error are reported via ers::error and
 *   dropped from the queue.
 * - If nothing is left to retry, the queue is simply cleared.
 */
void netio3::libfabric::ReceiveBufferManager::retry_post_buffers()
{
  ZoneScoped;
  std::vector<Buffer*> deferred;
  deferred.reserve(m_retry_post_buffers.size());

  for (Buffer* const candidate : m_retry_post_buffers) {
    const auto status = do_post_buffer(candidate);
    if (status == -FI_EAGAIN) {
      deferred.push_back(candidate);
      continue;
    }
    if (status != 0) [[unlikely]] {
      ers::error(LibFabricBufferError(
        ERS_HERE,
        std::format("Failed to post a buffer to receive inbound messages, error {} - {}",
                    status,
                    fi_strerror(-status))));
    }
  }

  if (deferred.empty()) {
    m_retry_post_buffers.clear();
    return;
  }
  m_retry_post_buffers.swap(deferred);
  m_retry_post_signal.fire();
}
/**
 * Issue one fi_recvmsg() on m_ep covering the whole of @p buf.
 *
 * The message describes the buffer as a single iovec together with the
 * descriptor of its memory region (buf->mr). It carries the Buffer pointer as the
 * operation context, presumably so the completion handler can recover the
 * buffer — confirm against the CQ handling code.
 *
 * @param buf  Registered buffer to post.
 * @return     Whatever fi_recvmsg() returns: 0 on success, otherwise a negative
 *             libfabric error code (callers check for -FI_EAGAIN).
 */
int netio3::libfabric::ReceiveBufferManager::do_post_buffer(Buffer* buf) const
{
  ZoneScoped;
  ERS_DEBUG(2, "Entered");
  void* mr_descriptor = fi_mr_desc(buf->mr);
  iovec segment{buf->data().data(), buf->size()};

  fi_msg message{};
  message.msg_iov = &segment;     // scatter-gather list with a single entry
  message.iov_count = 1;
  message.desc = &mr_descriptor;  // one descriptor per iovec entry
  message.context = buf;
  message.addr = 0;
  message.data = 0;

  // FI_MULTI_RECV is not requested here.
  constexpr std::uint64_t recv_flags = FI_REMOTE_CQ_DATA;
  return fi_recvmsg(m_ep, &message, recv_flags);
}
/**
 * Clamp the requested receive parameters to what the provider supports.
 *
 * If more buffers are requested than info->rx_attr->size, a
 * TooManyBuffersRequested warning is issued and num_buf is lowered to that size.
 * All other fields are returned unchanged.
 *
 * @param requested  Parameters asked for by the caller.
 * @param info       Provider info; info and info->rx_attr are dereferenced, so
 *                   both must be non-null.
 * @return           A copy of @p requested with num_buf possibly reduced.
 */
netio3::ConnectionParametersRecv
netio3::libfabric::ReceiveBufferManager::prepare_connection_parameters(
  const ConnectionParametersRecv& requested,
  const fi_info* info) const
{
  const auto rx_capacity = info->rx_attr->size;
  auto adjusted = requested;
  if (adjusted.num_buf <= rx_capacity) {
    return adjusted;
  }
  ers::warning(TooManyBuffersRequested(adjusted.num_buf, rx_capacity));
  adjusted.num_buf = rx_capacity;
  return adjusted;
}