.. _program_listing_file_BackendLibfabric_BackendLibfabricConnectionless.cpp:

Program Listing for File BackendLibfabricConnectionless.cpp
===========================================================

|exhale_lsh| :ref:`Return to documentation for file <file_BackendLibfabric_BackendLibfabricConnectionless.cpp>` (``BackendLibfabric/BackendLibfabricConnectionless.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "BackendLibfabricConnectionless.hpp"

   // NOTE(review): this listing appears to have lost every angle-bracket span during
   // extraction: the header name of the bare `#include` below and the template arguments
   // of `std::shared_ptr`, `std::span` and `dynamic_cast`. They are left exactly as found;
   // restore them from the original source file.
   #include

   #include "ConnectionlessEndpointManager.hpp"
   #include "netio3-backend/Issues.hpp"

   /// Construct the backend: forwards config and event loop to NetworkBackend and stores
   /// the raw pointer obtained from evloop.get(), which is later handed to the endpoint
   /// manager when it is created.
   netio3::libfabric::BackendLibfabricConnectionless::BackendLibfabricConnectionless(
     const NetworkConfig& config,
     std::shared_ptr evloop) :
     NetworkBackend(config, evloop), m_event_loop{evloop.get()}
   {}

   /// Open a send endpoint towards `address`.
   ///
   /// @throws InvalidEndpointAddress if check_ip_address() rejects the address.
   /// @throws NotSupported if receive buffers are requested (recv_params.num_buf > 0).
   void netio3::libfabric::BackendLibfabricConnectionless::open_active_endpoint(
     const EndPointAddress& address,
     const ConnectionParameters& connection_params)
   {
     if (not check_ip_address(address)) {
       throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
     }
     // Validate the request before creating the endpoint manager, so a rejected call
     // does not leave behind a manager bound to this address and role.
     if (connection_params.recv_params.num_buf > 0) {
       throw NotSupported(
         ERS_HERE,
         "Receiving data through active endpoints is not supported in connectionless mode. "
         "Use open_listen_endpoint instead.");
     }
     // The manager is created lazily by whichever open_* call comes first.
     if (m_endpoint_manager == nullptr) {
       m_endpoint_manager = libfabric::ConnectionlessEndpointManager::create(
         m_config, address, m_event_loop, EndpointRole::Active);
     }
     m_endpoint_manager->open_send_endpoint(address, connection_params);
   }

   /// Open a receive endpoint on `address` and return that same address.
   ///
   /// @throws InvalidEndpointAddress if check_ip_address() rejects the address.
   /// @throws FailedOpenListenEndpoint if m_config.local_ip is set and differs from the
   ///         requested address.
   /// @throws NotSupported if send buffers or a zero-copy memory region are requested.
   netio3::EndPointAddress netio3::libfabric::BackendLibfabricConnectionless::open_listen_endpoint(
     const EndPointAddress& address,
     const ConnectionParameters& connection_params)
   {
     if (not check_ip_address(address)) {
       throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
     }
     if (m_config.local_ip.has_value() && address.address() != *m_config.local_ip) {
       throw FailedOpenListenEndpoint(
         ERS_HERE,
         address.address(),
         address.port(),
         std::format("listen address {} does not match configured local_ip {}",
                     address.address(),
                     *m_config.local_ip));
     }
     // Validate the request before creating the endpoint manager, so a rejected call
     // does not leave behind a manager bound to this address and role.
     if (connection_params.send_buffered_params.num_buf > 0 or
         connection_params.send_zero_copy_params.mr_start != nullptr) {
       throw NotSupported(
         ERS_HERE,
         "Sending data through listen endpoints is not supported in connectionless mode. "
         "Use open_active_endpoint instead.");
     }
     // The manager is created lazily by whichever open_* call comes first.
     if (m_endpoint_manager == nullptr) {
       m_endpoint_manager = libfabric::ConnectionlessEndpointManager::create(
         m_config, address, m_event_loop, EndpointRole::Passive);
     }
     m_endpoint_manager->open_receive_endpoint(address, connection_params);
     return address;
   }

   /// Close the send endpoint for `address`.
   ///
   /// @throws InvalidEndpointAddress if check_ip_address() rejects the address.
   // NOTE(review): m_endpoint_manager is dereferenced without a null check here and in the
   // functions below; it is only assigned in the open_* functions -- confirm callers
   // always open an endpoint first.
   void netio3::libfabric::BackendLibfabricConnectionless::close_active_endpoint(
     const EndPointAddress& address)
   {
     if (not check_ip_address(address)) {
       throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
     }
     m_endpoint_manager->close_send_endpoint(address);
   }

   /// Close the receive endpoint for `address`.
   ///
   /// @throws InvalidEndpointAddress if check_ip_address() rejects the address.
   void netio3::libfabric::BackendLibfabricConnectionless::close_listen_endpoint(
     const EndPointAddress& address)
   {
     if (not check_ip_address(address)) {
       throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
     }
     m_endpoint_manager->close_receive_endpoint(address);
   }

   /// Send a single data span through the zero-copy send endpoint for `address`.
   ///
   /// @return the status returned by SendEndpointZeroCopy::send_data via the manager.
   netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data(
     const EndPointAddress& address,
     const std::span data,
     const std::span header_data,
     const std::uint64_t key)
   {
     // By-reference captures: the lambda is passed directly into the manager call and
     // refers only to this function's parameters.
     return m_endpoint_manager->apply_to_send_endpoint_zero_copy(
       address,
       [&data, &header_data, &key](libfabric::SendEndpointZeroCopy& endpoint, fi_addr_t addr) {
         return endpoint.send_data(data, header_data, key, addr);
       });
   }

   /// Send an iov span through the zero-copy send endpoint for `address`.
   ///
   /// @return the status returned by SendEndpointZeroCopy::send_data via the manager.
   netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data(
     const EndPointAddress& address,
     const std::span iov,
     const std::span header_data,
     const std::uint64_t key)
   {
     return m_endpoint_manager->apply_to_send_endpoint_zero_copy(
       address,
       [&iov, &header_data, &key](libfabric::SendEndpointZeroCopy& endpoint, fi_addr_t addr) {
         return endpoint.send_data(iov, header_data, key, addr);
       });
   }

   /// Copying send of a data span: always throws.
   ///
   /// @throws NotSupported unconditionally.
   netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data_copy(
     const EndPointAddress& /* address */,
     const std::span /* data */,
     const std::span /* header_data */,
     const std::uint64_t /* key */)
   {
     throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
   }

   /// Copying send of an iov span: always throws.
   ///
   /// @throws NotSupported unconditionally.
   netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data_copy(
     const EndPointAddress& /* address */,
     const std::span /* iov */,
     const std::span /* header_data */,
     const std::uint64_t /* key */)
   {
     throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
   }

   /// Fetch a buffer from the buffered send endpoint for `address`.
   ///
   /// @return whatever SendEndpointBuffered::get_buffer() returns via the manager.
   netio3::NetworkBuffer* netio3::libfabric::BackendLibfabricConnectionless::get_buffer(
     const EndPointAddress& address)
   {
     return m_endpoint_manager->apply_to_send_endpoint_buffered(
       address, [](libfabric::SendEndpointBuffered& endpoint, fi_addr_t /*addr*/) {
         return endpoint.get_buffer();
       });
   }

   /// Send a previously obtained buffer through the buffered send endpoint for `address`.
   ///
   /// @return NetioStatus::FAILED (after reporting InvalidBuffer through ers::error) when
   ///         the dynamic_cast of `buffer` yields nullptr; otherwise the status returned by
   ///         SendEndpointBuffered::send_buffer.
   netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_buffer(
     const EndPointAddress& address,
     NetworkBuffer* buffer)
   {
     return m_endpoint_manager->apply_to_send_endpoint_buffered(
       address, [buffer](libfabric::SendEndpointBuffered& endpoint, fi_addr_t addr) {
         // A null result covers both a null `buffer` and a buffer of the wrong dynamic
         // type, so buffer->pos() below is only reached for a non-null buffer.
         auto* actual_buffer = dynamic_cast(buffer);
         if (actual_buffer == nullptr) {
           ers::error(InvalidBuffer(ERS_HERE, "libfabric::Buffer"));
           return NetioStatus::FAILED;
         }
         return endpoint.send_buffer(actual_buffer, buffer->pos(), addr);
       });
   }

   /// Forward to the endpoint manager's get_num_available_buffers() for `address`.
   std::size_t netio3::libfabric::BackendLibfabricConnectionless::get_num_available_buffers(
     const EndPointAddress& address)
   {
     return m_endpoint_manager->get_num_available_buffers(address);
   }

   /// Check that the address string is a non-empty, parsable IPv4 address.
   ///
   /// @return true only if inet_pton(AF_INET, ...) reports a successful conversion.
   bool netio3::libfabric::BackendLibfabricConnectionless::check_ip_address(
     const EndPointAddress& address)
   {
     if (address.address().empty()) {
       return false;
     }
     sockaddr_in sa{};
     // inet_pton returns 1 on success, 0 for an unparsable string and -1 on error;
     // only 1 means the address is valid.
     return inet_pton(AF_INET, address.address().c_str(), &(sa.sin_addr)) == 1;
   }