// Program Listing for File SendEndpointZeroCopy.cpp
// ↰ Return to documentation for file (BackendLibfabric/SendEndpointZeroCopy.cpp)
#include "SendEndpointZeroCopy.hpp"
#include <algorithm>
#include <tracy/Tracy.hpp>
#include <rdma/fi_domain.h>
#include <utility>
#include "Issues.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/Netio3Backend.hpp"
/// Build a zero-copy send endpoint around an existing active endpoint.
///
/// The endpoint either owns a buffer manager (created through
/// create_buffer_manager()) or falls back to the shared one handed in by the
/// caller. m_buffer_manager_ptr always points at whichever of the two is used.
///
/// @param endpoint              Active endpoint stored in m_endpoint and used to set up m_sender.
/// @param connection_params     Stored in m_conn_parameters and forwarded to create_buffer_manager().
/// @param shared_buffer_manager Fallback buffer manager, used only if no own manager was created.
/// @param domain_manager        Forwarded to create_buffer_manager().
/// @throws std::logic_error if neither an own nor a shared buffer manager is available.
/// @throws FailedOpenActiveEndpoint propagated from create_buffer_manager().
netio3::libfabric::SendEndpointZeroCopy::SendEndpointZeroCopy(
  ActiveEndpoint& endpoint,
  const ConnectionParametersSendZeroCopy& connection_params,
  std::optional<ZeroCopyBufferManager>& shared_buffer_manager,
  DomainManager& domain_manager) :
  m_endpoint{endpoint},
  m_conn_parameters{connection_params},
  m_sender{m_endpoint.get()},
  m_buffer_manager{create_buffer_manager(connection_params, domain_manager)},
  m_buffer_manager_ptr{std::invoke([this, &shared_buffer_manager] {
    if (!m_buffer_manager) {
      // No private manager was allocated: the shared one is the only option left.
      if (!shared_buffer_manager.has_value()) {
        throw std::logic_error("Have no shared buffer manager, but also did not allocate one");
      }
      return &shared_buffer_manager.value();
    }
    // A private manager exists and takes precedence over the shared one.
    return &m_buffer_manager.value();
  })}
{}
netio3::NetioStatus netio3::libfabric::SendEndpointZeroCopy::send_data(
const std::span<const iovec> iov,
const std::span<const std::uint8_t> header_data,
const std::uint64_t key,
fi_addr_t addr)
{
ZoneScoped;
ERS_DEBUG(2, std::format("Send zero copy data with key {}", key));
const auto header = m_buffer_manager_ptr->get_header(header_data, key);
if (header.status != NetioStatus::OK) {
return header.status;
}
std::vector<iovec> data{};
data.reserve(iov.size() + 1);
data.push_back(header.data);
data.insert(data.end(), iov.begin(), iov.end());
std::vector<fid_mr*> mrs{};
mrs.reserve(data.size());
mrs.push_back(m_buffer_manager_ptr->get_header_mr());
std::ranges::generate_n(
std::back_inserter(mrs), iov.size(), [&] { return m_buffer_manager_ptr->get_mr(); });
auto status = m_sender.send_data(std::span{data}, mrs, header.bufnum, addr);
if (status != NetioStatus::OK) {
// Send operation failed, no reason to communicate the buffer number
std::ignore = m_buffer_manager_ptr->return_header(header.bufnum);
}
return status;
}
netio3::NetioStatus netio3::libfabric::SendEndpointZeroCopy::send_data(
const std::span<std::uint8_t> data,
const std::span<const std::uint8_t> header_data,
const std::uint64_t key,
fi_addr_t addr)
{
ZoneScoped;
return send_data(
std::vector{iovec{.iov_base = data.data(), .iov_len = data.size()}}, header_data, key, addr);
}
/// Hand a header buffer back to the buffer manager.
///
/// @param bufnum Number of the buffer to return.
/// @return The value reported by the buffer manager's return_header().
std::uint64_t netio3::libfabric::SendEndpointZeroCopy::release_buffer(const std::uint64_t bufnum)
{
  ERS_DEBUG(2, std::format("Releasing buffer {}", bufnum));
  const std::uint64_t released = m_buffer_manager_ptr->return_header(bufnum);
  return released;
}
/// Query the buffer manager in use for its number of available buffers.
///
/// @return The count reported by the buffer manager.
std::size_t netio3::libfabric::SendEndpointZeroCopy::get_num_available_buffers()
{
  ZoneScoped;
  const std::size_t available = m_buffer_manager_ptr->get_num_available_buffers();
  return available;
}
/// Query the buffer manager in use for its pending sends.
///
/// @return The list reported by the buffer manager's get_pending_sends().
std::vector<std::uint64_t> netio3::libfabric::SendEndpointZeroCopy::get_pending_sends()
{
  ZoneScoped;
  std::vector<std::uint64_t> pending = m_buffer_manager_ptr->get_pending_sends();
  return pending;
}
/// Create the endpoint's own buffer manager unless shared send buffers are requested.
///
/// @param connection_params Connection parameters; use_shared_send_buffers decides
///                          whether a manager is created, and the parameters are
///                          passed on to the manager's constructor.
/// @param domain_manager    Passed on to the manager's constructor.
/// @return An engaged optional holding the new manager, or std::nullopt when
///         shared send buffers are to be used.
/// @throws FailedOpenActiveEndpoint if constructing the manager raises
///         LibFabricBufferError; carries the endpoint address, port and the
///         original message.
std::optional<netio3::libfabric::ZeroCopyBufferManager>
netio3::libfabric::SendEndpointZeroCopy::create_buffer_manager(
  const ConnectionParametersSendZeroCopy& connection_params,
  DomainManager& domain_manager) const
{
  ZoneScoped;
  if (!connection_params.use_shared_send_buffers) {
    try {
      // Construct the manager in place inside the returned optional.
      return std::optional<ZeroCopyBufferManager>{std::in_place, connection_params, domain_manager};
    } catch (const LibFabricBufferError& e) {
      // Translate the buffer error into an endpoint-open failure for the caller.
      auto&& active_endpoint = m_endpoint.get();
      throw FailedOpenActiveEndpoint(
        active_endpoint.get_address().address(),
        active_endpoint.get_address().port(),
        e.message());
    }
  }
  // Shared send buffers requested: no private manager for this endpoint.
  return std::nullopt;
}