Program Listing for File SendEndpointBuffered.cpp
↰ Return to documentation for file (BackendLibfabric/SendEndpointBuffered.cpp)
#include "SendEndpointBuffered.hpp"
#include <utility>
#include <tracy/Tracy.hpp>
#include "Issues.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/Netio3Backend.hpp"
netio3::libfabric::SendEndpointBuffered::SendEndpointBuffered(
ActiveEndpoint& endpoint,
const ConnectionParametersSendBuffered& connection_params,
std::optional<SendBufferManager>& shared_buffer_manager,
DomainManager& domain_manager) :
m_endpoint{endpoint},
m_sender{m_endpoint.get()},
m_buffer_manager{
create_buffer_manager(prepare_connection_parameters(connection_params), domain_manager)},
m_buffer_manager_ptr{std::invoke([this, &shared_buffer_manager]() -> SendBufferManager* {
if (m_buffer_manager.has_value()) {
return &m_buffer_manager.value();
}
if (shared_buffer_manager.has_value()) {
return &shared_buffer_manager.value();
}
throw std::logic_error("Have no shared send buffer manager, but also did not allocate one");
})}
{}
std::size_t netio3::libfabric::SendEndpointBuffered::get_num_available_buffers()
{
ZoneScoped;
return m_buffer_manager_ptr->get_num_available_buffers();
}
netio3::ConnectionParametersSendBuffered netio3::libfabric::SendEndpointBuffered::prepare_connection_parameters(
const ConnectionParametersSendBuffered& requested) const
{
auto params = requested;
const auto& info = m_endpoint.get().get_endpoint().fi;
if (params.num_buf > info->tx_attr->size) {
ers::warning(TooManyBuffersRequested(params.num_buf, info->tx_attr->size));
params.num_buf = info->tx_attr->size;
}
return params;
}
std::optional<netio3::libfabric::SendBufferManager>
netio3::libfabric::SendEndpointBuffered::create_buffer_manager(
const ConnectionParametersSendBuffered& connection_params,
DomainManager& domain_manager) const
{
ZoneScoped;
if (connection_params.use_shared_send_buffers) {
return std::nullopt;
}
try {
return std::optional<SendBufferManager>{
std::in_place, connection_params, domain_manager};
} catch (const LibFabricBufferError& e) {
throw FailedOpenActiveEndpoint(ERS_HERE,
m_endpoint.get().get_address().address(),
m_endpoint.get().get_address().port(),
e.message());
}
}