Program Listing for File BackendLibfabricConnectionless.cpp
↰ Return to documentation for file (BackendLibfabric/BackendLibfabricConnectionless.cpp)
#include "BackendLibfabricConnectionless.hpp"
#include <ers/ers.h>
#include "ConnectionlessEndpointManager.hpp"
#include "netio3-backend/Issues.hpp"
netio3::libfabric::BackendLibfabricConnectionless::BackendLibfabricConnectionless(const NetworkConfig& config,
std::shared_ptr<BaseEventLoop> evloop) :
NetworkBackend(config, evloop),
m_event_loop{evloop.get()}
{}
void netio3::libfabric::BackendLibfabricConnectionless::open_active_endpoint(
const EndPointAddress& address,
const ConnectionParameters& connection_params)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
if (m_endpoint_manager == nullptr) {
m_endpoint_manager = libfabric::ConnectionlessEndpointManager::create(m_config, address, m_event_loop, 0);
}
if (connection_params.recv_params.num_buf > 0) {
throw NotSupported("Receiving data through active endpoints is not supported in connectionless mode. Use open_listen_endpoint instead.");
}
m_endpoint_manager->open_send_endpoint(address, connection_params);
}
netio3::EndPointAddress netio3::libfabric::BackendLibfabricConnectionless::open_listen_endpoint(
const EndPointAddress& address,
const ConnectionParameters& connection_params)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
if (m_endpoint_manager == nullptr) {
m_endpoint_manager = libfabric::ConnectionlessEndpointManager::create(m_config, address, m_event_loop, FI_SOURCE);
}
if (connection_params.send_buffered_params.num_buf > 0 or
connection_params.send_zero_copy_params.mr_start != nullptr) {
throw NotSupported("Sending data through listen endpoints is not supported in connectionless mode. Use open_active_endpoint instead.");
}
m_endpoint_manager->open_receive_endpoint(address, connection_params);
return address;
}
void netio3::libfabric::BackendLibfabricConnectionless::close_active_endpoint(const EndPointAddress& address)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
m_endpoint_manager->close_send_endpoint(address);
}
void netio3::libfabric::BackendLibfabricConnectionless::close_listen_endpoint(const EndPointAddress& address)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
m_endpoint_manager->close_receive_endpoint(address);
}
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data(
const EndPointAddress& address,
const std::span<std::uint8_t> data,
const std::span<const std::uint8_t> header_data,
const std::uint64_t key)
{
return m_endpoint_manager->apply_to_send_endpoint_zero_copy(
address, [&data, &header_data, &key](libfabric::SendEndpointZeroCopy& endpoint, fi_addr_t addr) {
return endpoint.send_data(data, header_data, key, addr);
});
}
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data(
const EndPointAddress& address,
const std::span<const iovec> iov,
const std::span<const std::uint8_t> header_data,
const std::uint64_t key)
{
return m_endpoint_manager->apply_to_send_endpoint_zero_copy(
address, [&iov, &header_data, &key](libfabric::SendEndpointZeroCopy& endpoint, fi_addr_t addr) {
return endpoint.send_data(iov, header_data, key, addr);
});
}
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data_copy(
const EndPointAddress& /* address */,
const std::span<const std::uint8_t> /* data */,
const std::span<const std::uint8_t> /* header_data */,
const std::uint64_t /* key */)
{
throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
}
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data_copy(
const EndPointAddress& /* address */,
const std::span<const iovec> /* iov */,
const std::span<const std::uint8_t> /* header_data */,
const std::uint64_t /* key */)
{
throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
}
netio3::NetworkBuffer* netio3::libfabric::BackendLibfabricConnectionless::get_buffer(
const EndPointAddress& address)
{
return m_endpoint_manager->apply_to_send_endpoint_buffered(
address, [](libfabric::SendEndpointBuffered& endpoint, fi_addr_t /*addr*/) { return endpoint.get_buffer(); });
}
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_buffer(const EndPointAddress& address,
NetworkBuffer* buffer)
{
return m_endpoint_manager->apply_to_send_endpoint_buffered(
address, [buffer](libfabric::SendEndpointBuffered& endpoint, fi_addr_t addr) {
auto* actual_buffer = dynamic_cast<libfabric::Buffer*>(buffer);
if (actual_buffer == nullptr) {
ers::error(InvalidBuffer(ERS_HERE, "libfabric::Buffer"));
return NetioStatus::FAILED;
}
return endpoint.send_buffer(actual_buffer, buffer->pos(), addr);
});
}
std::size_t netio3::libfabric::BackendLibfabricConnectionless::get_num_available_buffers(
const EndPointAddress& address)
{
return m_endpoint_manager->get_num_available_buffers(address);
}
bool netio3::libfabric::BackendLibfabricConnectionless::check_ip_address(const EndPointAddress& address)
{
if (address.address().empty()) {
return false;
}
sockaddr_in sa{};
return inet_pton(AF_INET, address.address().c_str(), &(sa.sin_addr)) != 0;
}