.. _program_listing_file_BackendLibfabric_SendEndpointZeroCopy.cpp:

Program Listing for File SendEndpointZeroCopy.cpp
=================================================

|exhale_lsh| :ref:`Return to documentation for file ` (``BackendLibfabric/SendEndpointZeroCopy.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "SendEndpointZeroCopy.hpp"

   // NOTE(review): the four bare #include lines below, and every std::span /
   // std::optional / std::vector without template arguments further down, look
   // like angle-bracketed text that was lost when this listing was extracted.
   // Confirm against the real SendEndpointZeroCopy.cpp before relying on them.
   #include
   #include
   #include
   #include

   #include "Issues.hpp"
   #include "netio3-backend/Issues.hpp"
   #include "netio3-backend/Netio3Backend.hpp"

   // Construct a zero-copy send endpoint on top of an existing active endpoint.
   //
   // endpoint               Active endpoint; stored in m_endpoint and used to
   //                        construct m_sender.
   // connection_params      Connection parameters; stored and forwarded to
   //                        create_buffer_manager().
   // shared_buffer_manager  Optional buffer manager owned by the caller; used
   //                        only when this endpoint does not create its own.
   // domain_manager         Forwarded to create_buffer_manager().
   //
   // Throws std::logic_error when neither an own nor a shared buffer manager is
   // available. create_buffer_manager() may throw FailedOpenActiveEndpoint.
   //
   // NOTE(review): create_buffer_manager() reads m_endpoint, and the lambda
   // below reads m_buffer_manager. Both rely on the members being declared in
   // the class in the same order as this initializer list; the declaration is
   // not visible here, so confirm it in the header.
   netio3::libfabric::SendEndpointZeroCopy::SendEndpointZeroCopy(
     ActiveEndpoint& endpoint,
     const ConnectionParametersSendZeroCopy& connection_params,
     std::optional& shared_buffer_manager,
     DomainManager& domain_manager) :
     m_endpoint{endpoint},
     m_conn_parameters{connection_params},
     m_sender{m_endpoint.get()},
     m_buffer_manager{
       create_buffer_manager(connection_params, domain_manager)},
     // Pick the buffer manager all later calls go through: the endpoint's own
     // one if it was created, otherwise the caller-provided shared one.
     m_buffer_manager_ptr{std::invoke([this, &shared_buffer_manager] {
       if (m_buffer_manager) {
         return &m_buffer_manager.value();
       }
       if (shared_buffer_manager.has_value()) {
         return &shared_buffer_manager.value();
       }
       // Neither source is populated: the shared-buffer setting and the
       // arguments passed in disagree.
       throw std::logic_error("Have no shared buffer manager, but also did not allocate one");
     })}
   {}

   // Send a scatter/gather list of payload buffers preceded by a header buffer.
   //
   // iov          Payload entries; appended after the header entry.
   // header_data  Passed to the buffer manager's get_header() together with key.
   // key          Passed to get_header() and included in the debug log line.
   // addr         Destination address handed to m_sender.send_data().
   //
   // Returns the status from get_header() if that is not OK (nothing is sent),
   // otherwise the status returned by m_sender.send_data().
   netio3::NetioStatus netio3::libfabric::SendEndpointZeroCopy::send_data(
     const std::span iov,
     const std::span header_data,
     const std::uint64_t key,
     fi_addr_t addr)
   {
     ZoneScoped;  // NOTE(review): presumably a profiler zone macro; defined outside this file
     ERS_DEBUG(2, std::format("Send zero copy data with key {}", key));
     const auto header = m_buffer_manager_ptr->get_header(header_data, key);
     if (header.status != NetioStatus::OK) {
       return header.status;
     }
     // Assemble the outgoing list: header entry first, then every caller entry.
     std::vector data{};
     data.reserve(iov.size() + 1);
     data.push_back(header.data);
     data.insert(data.end(), iov.begin(), iov.end());
     // Build a parallel list with the same length as data: get_header_mr() for
     // the header, then one get_mr() result per payload entry.
     std::vector mrs{};
     mrs.reserve(data.size());
     mrs.push_back(m_buffer_manager_ptr->get_header_mr());
     std::ranges::generate_n(
       std::back_inserter(mrs), iov.size(), [&] { return m_buffer_manager_ptr->get_mr(); });
     auto status = m_sender.send_data(std::span{data}, mrs, header.bufnum, addr);
     if (status != NetioStatus::OK) {
       // Send operation failed, no reason to communicate the buffer number
       std::ignore = m_buffer_manager_ptr->return_header(header.bufnum);
     }
     return status;
   }

   // Convenience overload for a single contiguous payload buffer.
   //
   // Wraps data in a one-element vector of iovec (base pointer and length taken
   // from data) and forwards to the scatter/gather overload of send_data().
   // Returns whatever that overload returns.
   netio3::NetioStatus netio3::libfabric::SendEndpointZeroCopy::send_data(
     const std::span data,
     const std::span header_data,
     const std::uint64_t key,
     fi_addr_t addr)
   {
     ZoneScoped;
     return send_data(
       std::vector{iovec{.iov_base = data.data(), .iov_len = data.size()}}, header_data, key, addr);
   }

   // Hand header buffer bufnum back to the buffer manager.
   //
   // Returns the value produced by the buffer manager's return_header().
   // NOTE(review): the meaning of that value is defined by the buffer manager,
   // which is not visible in this file.
   std::uint64_t netio3::libfabric::SendEndpointZeroCopy::release_buffer(const std::uint64_t bufnum)
   {
     ERS_DEBUG(2, std::format("Releasing buffer {}", bufnum));
     return m_buffer_manager_ptr->return_header(bufnum);
   }

   // Forward to the buffer manager's get_num_available_buffers().
   std::size_t netio3::libfabric::SendEndpointZeroCopy::get_num_available_buffers()
   {
     ZoneScoped;
     return m_buffer_manager_ptr->get_num_available_buffers();
   }

   // Forward to the buffer manager's get_pending_sends().
   std::vector netio3::libfabric::SendEndpointZeroCopy::get_pending_sends()
   {
     ZoneScoped;
     return m_buffer_manager_ptr->get_pending_sends();
   }

   // Create this endpoint's own buffer manager unless shared buffers are requested.
   //
   // Returns std::nullopt when connection_params.use_shared_send_buffers is set;
   // otherwise an optional constructed in place from connection_params and
   // domain_manager.
   //
   // Throws FailedOpenActiveEndpoint, carrying the endpoint's address, port and
   // the original message, when in-place construction throws
   // LibFabricBufferError.
   std::optional
   netio3::libfabric::SendEndpointZeroCopy::create_buffer_manager(
     const ConnectionParametersSendZeroCopy& connection_params,
     DomainManager& domain_manager) const
   {
     ZoneScoped;
     if (connection_params.use_shared_send_buffers) {
       return std::nullopt;
     }
     try {
       return std::optional{std::in_place, connection_params, domain_manager};
     } catch (const LibFabricBufferError& e) {
       // Re-raise as an endpoint-open failure that identifies the endpoint.
       throw FailedOpenActiveEndpoint(
         m_endpoint.get().get_address().address(), m_endpoint.get().get_address().port(), e.message());
     }
   }