Program Listing for File BackendLibfabric.cpp
↰ Return to documentation for file (BackendLibfabric/BackendLibfabric.cpp)
#include "BackendLibfabric.hpp"
#include <ers/ers.h>
#include "ConnectionManager.hpp"
#include "netio3-backend/Issues.hpp"
netio3::libfabric::BackendLibfabric::BackendLibfabric(const NetworkConfig& config,
std::shared_ptr<BaseEventLoop> evloop) :
NetworkBackend(config, evloop),
m_connection_manager{evloop.get(), m_config}
{}
void netio3::libfabric::BackendLibfabric::open_active_endpoint(
const EndPointAddress& address,
const ConnectionParameters& connection_params)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
m_connection_manager.open_active_endpoint(address, connection_params);
}
netio3::EndPointAddress netio3::libfabric::BackendLibfabric::open_listen_endpoint(
const EndPointAddress& address,
const ConnectionParameters& connection_params)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
ERS_DEBUG(
1,
std::format("Requested to open listen endpoint for {}:{}", address.address(), address.port()));
return m_connection_manager.open_listen_endpoint(address, connection_params);
}
void netio3::libfabric::BackendLibfabric::close_active_endpoint(const EndPointAddress& address)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
m_connection_manager.close_active_endpoint(address);
ERS_DEBUG(
1,
std::format("Requested to close active endpoint for {}:{}", address.address(), address.port()));
}
void netio3::libfabric::BackendLibfabric::close_listen_endpoint(const EndPointAddress& address)
{
if (not check_ip_address(address)) {
throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}
m_connection_manager.close_listen_endpoint(address);
ERS_DEBUG(
1,
std::format("Requested to close listen endpoint for {}:{}", address.address(), address.port()));
}
netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data(
const EndPointAddress& address,
const std::span<std::uint8_t> data,
const std::span<const std::uint8_t> header_data,
const std::uint64_t key)
{
return m_connection_manager.apply_to_send_endpoint_zero_copy(
address, [&data, &header_data, &key](std::unique_ptr<libfabric::SendEndpointZeroCopy>& endpoint) {
return endpoint->send_data(data, header_data, key);
});
}
netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data(
const EndPointAddress& address,
const std::span<const iovec> iov,
const std::span<const std::uint8_t> header_data,
const std::uint64_t key)
{
return m_connection_manager.apply_to_send_endpoint_zero_copy(
address, [&iov, &header_data, &key](std::unique_ptr<libfabric::SendEndpointZeroCopy>& endpoint) {
return endpoint->send_data(iov, header_data, key);
});
}
netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data_copy(
const EndPointAddress& /* address */,
const std::span<const std::uint8_t> /* data */,
const std::span<const std::uint8_t> /* header_data */,
const std::uint64_t /* key */)
{
throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
}
netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_data_copy(
const EndPointAddress& /* address */,
const std::span<const iovec> /* iov */,
const std::span<const std::uint8_t> /* header_data */,
const std::uint64_t /* key */)
{
throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
}
netio3::NetworkBuffer* netio3::libfabric::BackendLibfabric::get_buffer(
const EndPointAddress& address)
{
return m_connection_manager.apply_to_send_endpoint_buffered(
address, [](std::unique_ptr<libfabric::SendEndpointBuffered>& endpoint) { return endpoint->get_buffer(); });
}
netio3::NetioStatus netio3::libfabric::BackendLibfabric::send_buffer(const EndPointAddress& address,
NetworkBuffer* buffer)
{
return m_connection_manager.apply_to_send_endpoint_buffered(
address, [buffer](std::unique_ptr<libfabric::SendEndpointBuffered>& endpoint) {
auto* actual_buffer = dynamic_cast<libfabric::Buffer*>(buffer);
if (actual_buffer == nullptr) {
ers::error(InvalidBuffer(ERS_HERE, "libfabric::Buffer"));
return NetioStatus::FAILED;
}
return endpoint->send_buffer(actual_buffer, buffer->pos());
});
}
std::size_t netio3::libfabric::BackendLibfabric::get_num_available_buffers(
const EndPointAddress& address)
{
return m_connection_manager.get_num_available_buffers(address);
}
bool netio3::libfabric::BackendLibfabric::check_ip_address(const EndPointAddress& address)
{
if (address.address().empty()) {
return false;
}
sockaddr_in sa{};
return inet_pton(AF_INET, address.address().c_str(), &(sa.sin_addr)) != 0;
}