.. _program_listing_file_BackendLibfabric_ReceiveBufferManager.cpp:

Program Listing for File ReceiveBufferManager.cpp
=================================================

|exhale_lsh| :ref:`Return to documentation for file <file_BackendLibfabric_ReceiveBufferManager.cpp>` (``BackendLibfabric/ReceiveBufferManager.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "ReceiveBufferManager.hpp"

   // NOTE(review): the names of these three headers were lost in this listing.
   // They are restored from the standard-library facilities used below
   // (std::uint64_t, std::format, std::vector) - confirm against the real source.
   #include <cstdint>
   #include <format>
   #include <vector>

   #include "Issues.hpp"

   /// Set up the receive side of a connection.
   ///
   /// Clamps the requested parameters, creates the signal that drives
   /// retry_post_buffers(), then allocates, registers and posts every buffer.
   ///
   /// @param domain_manager Forwarded to register_buffer() for each buffer.
   /// @param ep             Endpoint the receive buffers are posted to.
   /// @param conn_params    Requested buffer count and size; the count may be
   ///                       reduced by prepare_connection_parameters().
   /// @param info           Provider info; its rx_attr->size caps the buffer count.
   /// @param event_loop     Creates the signal whose callback retries failed posts.
   netio3::libfabric::ReceiveBufferManager::ReceiveBufferManager(
     DomainManager& domain_manager,
     fid_ep* ep,
     const ConnectionParametersRecv& conn_params,
     const fi_info* info,
     BaseEventLoop* event_loop) :
     m_ep{ep},
     m_conn_params{prepare_connection_parameters(conn_params, info)},
     m_retry_post_signal{
       event_loop->create_signal([this](int) { retry_post_buffers(); }, false)}
   {
     init_buffers(domain_manager);
     ERS_DEBUG(2, std::format("Allocating memory regions number of buffers: {}", m_buffers.size()));
     // At most every buffer can be waiting for a retry at the same time.
     m_retry_post_buffers.reserve(m_conn_params.num_buf);
     post_buffers();
   }

   /// Close every buffer owned by this manager.
   netio3::libfabric::ReceiveBufferManager::~ReceiveBufferManager()
   {
     ERS_DEBUG(2, "Entered");
     for (const auto& buf : m_buffers) {
       close_buffer(buf);
     }
   }

   /// Create m_conn_params.num_buf buffers of m_conn_params.buf_size and register
   /// each of them for receiving (FI_RECV).
   ///
   /// @param domain_manager Forwarded to register_buffer().
   void netio3::libfabric::ReceiveBufferManager::init_buffers(DomainManager& domain_manager)
   {
     // Reserving up front means emplace_back() never reallocates in this loop.
     m_buffers.reserve(m_conn_params.num_buf);
     for (uint64_t i = 0; i < m_conn_params.num_buf; i++) {
       // Key does not matter for receive buffers
       m_buffers.emplace_back(m_conn_params.buf_size, 0);
       register_buffer(m_buffers.back(), domain_manager, FI_RECV);
     }
   }

   /// Post every buffer in m_buffers to the endpoint.
   void netio3::libfabric::ReceiveBufferManager::post_buffers()
   {
     ZoneScoped;
     for (auto& buf : m_buffers) {
       post_buffer(&buf);
     }
   }

   /// Post a single buffer, scheduling a retry if the provider is busy.
   ///
   /// On -FI_EAGAIN the buffer is queued in m_retry_post_buffers and the retry
   /// signal is fired. Any other non-zero result is reported through ers::error
   /// and the buffer is not queued again.
   ///
   /// @param buf Buffer to post.
   void netio3::libfabric::ReceiveBufferManager::post_buffer(Buffer* buf)
   {
     ZoneScoped;
     const auto ret = do_post_buffer(buf);
     if (ret == -FI_EAGAIN) {
       m_retry_post_buffers.push_back(buf);
       m_retry_post_signal.fire();
       return;
     }
     if (ret != 0) {
       ers::error(LibFabricBufferError(
         ERS_HERE,
         std::format("Failed to post a buffer to receive inbound messages, error {} - {}",
                     ret,
                     fi_strerror(-ret))));
     }
   }

   /// Try again to post every buffer queued in m_retry_post_buffers.
   ///
   /// Buffers that still return -FI_EAGAIN are kept for the next round and the
   /// signal is fired again; buffers that fail with another error are reported and
   /// dropped from the queue; successfully posted buffers leave the queue.
   void netio3::libfabric::ReceiveBufferManager::retry_post_buffers()
   {
     ZoneScoped;
     // The element type must be spelled out: empty braces give class template
     // argument deduction nothing to deduce from.
     auto still_needs_retry = std::vector<Buffer*>{};
     still_needs_retry.reserve(m_retry_post_buffers.size());
     for (auto* buf : m_retry_post_buffers) {
       const auto ret = do_post_buffer(buf);
       if (ret == -FI_EAGAIN) {
         still_needs_retry.push_back(buf);
       } else if (ret != 0) [[unlikely]] {
         ers::error(LibFabricBufferError(
           ERS_HERE,
           std::format("Failed to post a buffer to receive inbound messages, error {} - {}",
                       ret,
                       fi_strerror(-ret))));
       }
     }
     if (not still_needs_retry.empty()) {
       m_retry_post_buffers.swap(still_needs_retry);
       m_retry_post_signal.fire();
     } else {
       m_retry_post_buffers.clear();
     }
   }

   /// Hand one buffer to the endpoint with fi_recvmsg().
   ///
   /// The message uses a single iovec covering the buffer's data, and the buffer
   /// pointer itself is stored as the operation context.
   ///
   /// @param buf Buffer to post.
   /// @return The value returned by fi_recvmsg(); callers treat 0 as success and
   ///         -FI_EAGAIN as "retry later".
   int netio3::libfabric::ReceiveBufferManager::do_post_buffer(Buffer* buf) const
   {
     ZoneScoped;
     ERS_DEBUG(2, "Entered");
     iovec iov{buf->data().data(), buf->size()};
     void* desc = fi_mr_desc(buf->mr);

     fi_msg msg{};
     msg.msg_iov = &iov; /* scatter-gather array */
     msg.desc = &desc;
     msg.iov_count = 1;
     msg.addr = 0;
     msg.context = buf;
     msg.data = 0;

     std::uint64_t flags = FI_REMOTE_CQ_DATA;  // FI_MULTI_RECV;
     return fi_recvmsg(m_ep, &msg, flags);
   }

   /// Copy the requested parameters, limiting num_buf to info->rx_attr->size.
   ///
   /// A TooManyBuffersRequested warning is issued when the count is reduced.
   ///
   /// @param requested Parameters asked for by the caller.
   /// @param info      Provider info; rx_attr must be non-null (it is dereferenced
   ///                  without a check).
   /// @return The parameters that will actually be used.
   netio3::ConnectionParametersRecv
   netio3::libfabric::ReceiveBufferManager::prepare_connection_parameters(
     const ConnectionParametersRecv& requested,
     const fi_info* info) const
   {
     auto params = requested;
     if (params.num_buf > info->rx_attr->size) {
       ers::warning(TooManyBuffersRequested(params.num_buf, info->rx_attr->size));
       params.num_buf = info->rx_attr->size;
     }
     return params;
   }