.. _program_listing_file_BackendLibfabric_BackendLibfabric.cpp: Program Listing for File BackendLibfabric.cpp ============================================= |exhale_lsh| :ref:`Return to documentation for file ` (``BackendLibfabric/BackendLibfabric.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include "BackendLibfabric.hpp" #include #include "ConnectionManager.hpp" #include "netio3-backend/Issues.hpp" netio3::libfabric::BackendLibfabric::BackendLibfabric(const NetworkConfig& config, std::shared_ptr evloop) : NetworkBackend(config, evloop), m_connection_manager{libfabric::ConnectionManager::create(evloop.get(), m_config)} {} void netio3::libfabric::BackendLibfabric::open_send_endpoint( const EndPointAddress& address, const ConnectionParameters& connection_params) { if (not check_ip_address(address)) { throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port()); } if (connection_params.mr_start == nullptr) { m_connection_manager->open_send_endpoint_buffered(address, connection_params); ERS_DEBUG(1, std::format("Requested to open send endpoint for {}:{} with buffered socket", address.address(), address.port())); } else { m_connection_manager->open_send_endpoint_zero_copy(address, connection_params); ERS_DEBUG(1, std::format("Requested to open send endpoint for {}:{} with zero copy socket", address.address(), address.port())); } } netio3::EndPointAddress netio3::libfabric::BackendLibfabric::open_listen_endpoint( const EndPointAddress& address, const ConnectionParametersRecv& connection_params) { if (not check_ip_address(address)) { throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port()); } ERS_DEBUG( 1, std::format("Requested to open listen endpoint for {}:{}", address.address(), address.port())); return m_connection_manager->open_listen_endpoint(address, connection_params); } void netio3::libfabric::BackendLibfabric::close_send_endpoint(const EndPointAddress& address) { if (not check_ip_address(address)) { throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port()); } m_connection_manager->close_send_endpoint(address); ERS_DEBUG( 1, std::format("Requested to close send endpoint for {}:{}", address.address(), address.port())); } void netio3::libfabric::BackendLibfabric::close_listen_endpoint(const EndPointAddress& address) { if (not check_ip_address(address)) { throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port()); } m_connection_manager->close_listen_endpoint(address); ERS_DEBUG( 1, std::format("Requested to close listen endpoint for {}:{}", address.address(), address.port())); } netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data( const EndPointAddress& address, const std::span data, const std::span header_data, const std::uint64_t key) { return m_connection_manager->apply_to_send_socket_zero_copy( address, [&data, &header_data, &key](libfabric::SendSocketZeroCopy& socket) { return socket.send_data(data, header_data, key); }); } netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data( const EndPointAddress& address, const std::span iov, const std::span header_data, const std::uint64_t key) { return m_connection_manager->apply_to_send_socket_zero_copy( address, [&iov, &header_data, &key](libfabric::SendSocketZeroCopy& socket) { return socket.send_data(iov, header_data, key); }); } netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data_copy( const EndPointAddress& /* address */, const std::span /* data */, const std::span /* header_data */, const std::uint64_t /* key */) { throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend"); } netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data_copy( const EndPointAddress& /* address */, const std::span /* iov */, const std::span /* header_data */, const std::uint64_t /* key */) { throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend"); } netio3::NetworkBuffer* netio3::libfabric::BackendLibfabric::get_buffer( const EndPointAddress& address) { return m_connection_manager->apply_to_send_socket_buffered( address, [](libfabric::SendSocketBuffered& socket) { return socket.get_buffer(); }); } netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_buffer(const EndPointAddress& address, NetworkBuffer* buffer) { return m_connection_manager->apply_to_send_socket_buffered( address, [buffer](libfabric::SendSocketBuffered& endpoint) { auto* actual_buffer = dynamic_cast(buffer); if (actual_buffer == nullptr) { ers::error(InvalidBuffer(ERS_HERE, "libfabric::Buffer")); return NetioStatus::FAILED; } return endpoint.send_buffer(actual_buffer, buffer->pos()); }); } std::size_t netio3::libfabric::BackendLibfabric::get_num_available_buffers( const EndPointAddress& address) { return m_connection_manager->get_num_available_buffers(address); } bool netio3::libfabric::BackendLibfabric::check_ip_address(const EndPointAddress& address) { if (address.address().empty()) { return false; } sockaddr_in sa{}; return inet_pton(AF_INET, address.address().c_str(), &(sa.sin_addr)) != 0; }