Program Listing for File BackendLibfabricConnectionless.cpp

Return to documentation for file (BackendLibfabric/BackendLibfabricConnectionless.cpp)

#include "BackendLibfabricConnectionless.hpp"

#include <ers/ers.h>

#include "ConnectionlessEndpointManager.hpp"
#include "netio3-backend/Issues.hpp"

// Construct the connectionless libfabric backend.
//
// `config` and `evloop` are forwarded to the NetworkBackend base class. In addition,
// m_event_loop is initialised from the raw pointer returned by evloop.get(); it is later
// handed to ConnectionlessEndpointManager::create when the first endpoint is opened.
//
// NOTE(review): m_event_loop does not take part in the shared_ptr's ownership. Presumably
// the base class keeps the shared_ptr alive for the lifetime of this object - confirm.
netio3::libfabric::BackendLibfabricConnectionless::BackendLibfabricConnectionless(const NetworkConfig& config,
                                                      std::shared_ptr<BaseEventLoop> evloop) :
  NetworkBackend(config, evloop),
  m_event_loop{evloop.get()}
{}

// Open a send endpoint towards `address`.
//
// The call is validated completely before any state is modified:
//   - throws InvalidEndpointAddress if `address` fails check_ip_address;
//   - throws NotSupported if `connection_params` requests receive buffers
//     (recv_params.num_buf > 0), because active endpoints can only send in connectionless mode.
//
// If no endpoint manager exists yet, one is created with a flags argument of 0
// (open_listen_endpoint passes FI_SOURCE instead). The send endpoint is then opened through it.
void netio3::libfabric::BackendLibfabricConnectionless::open_active_endpoint(
  const EndPointAddress& address,
  const ConnectionParameters& connection_params)
{
  if (not check_ip_address(address)) {
    throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
  }
  // Reject unsupported parameters before creating the endpoint manager, so that a failed call
  // does not leave a freshly created manager behind as a side effect.
  if (connection_params.recv_params.num_buf > 0) {
    throw NotSupported(ERS_HERE, "Receiving data through active endpoints is not supported in connectionless mode. Use open_listen_endpoint instead.");
  }
  if (m_endpoint_manager == nullptr) {
    m_endpoint_manager = libfabric::ConnectionlessEndpointManager::create(m_config, address, m_event_loop, 0);
  }
  m_endpoint_manager->open_send_endpoint(address, connection_params);
}

// Open a receive endpoint on `address` and return that same address.
//
// The call is validated completely before any state is modified:
//   - throws InvalidEndpointAddress if `address` fails check_ip_address;
//   - throws NotSupported if `connection_params` requests any send resources
//     (send_buffered_params.num_buf > 0 or a non-null send_zero_copy_params.mr_start),
//     because listen endpoints can only receive in connectionless mode.
//
// If no endpoint manager exists yet, one is created with FI_SOURCE as its flags argument
// (open_active_endpoint passes 0 instead). The receive endpoint is then opened through it.
netio3::EndPointAddress netio3::libfabric::BackendLibfabricConnectionless::open_listen_endpoint(
  const EndPointAddress& address,
  const ConnectionParameters& connection_params)
{
  if (not check_ip_address(address)) {
    throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
  }
  // Reject unsupported parameters before creating the endpoint manager, so that a failed call
  // does not leave a freshly created manager behind as a side effect.
  if (connection_params.send_buffered_params.num_buf > 0 or
      connection_params.send_zero_copy_params.mr_start != nullptr) {
    throw NotSupported(ERS_HERE, "Sending data through listen endpoints is not supported in connectionless mode. Use open_active_endpoint instead.");
  }
  if (m_endpoint_manager == nullptr) {
    m_endpoint_manager = libfabric::ConnectionlessEndpointManager::create(m_config, address, m_event_loop, FI_SOURCE);
  }
  m_endpoint_manager->open_receive_endpoint(address, connection_params);
  return address;
}

// Close the send endpoint associated with `address`.
//
// Throws InvalidEndpointAddress if `address` fails check_ip_address.
//
// NOTE(review): m_endpoint_manager is dereferenced without a null check. In the code visible
// here it is only assigned by the open_*_endpoint functions, so this presumably relies on an
// endpoint having been opened first - confirm.
void netio3::libfabric::BackendLibfabricConnectionless::close_active_endpoint(const EndPointAddress& address)
{
  if (check_ip_address(address)) {
    m_endpoint_manager->close_send_endpoint(address);
    return;
  }
  throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}

// Close the receive endpoint associated with `address`.
//
// Throws InvalidEndpointAddress if `address` fails check_ip_address.
//
// NOTE(review): m_endpoint_manager is dereferenced without a null check. In the code visible
// here it is only assigned by the open_*_endpoint functions, so this presumably relies on an
// endpoint having been opened first - confirm.
void netio3::libfabric::BackendLibfabricConnectionless::close_listen_endpoint(const EndPointAddress& address)
{
  if (check_ip_address(address)) {
    m_endpoint_manager->close_receive_endpoint(address);
    return;
  }
  throw InvalidEndpointAddress(ERS_HERE, address.address(), address.port());
}

// Zero-copy send of a single contiguous buffer to `address`.
//
// The endpoint manager selects the zero-copy send endpoint for `address` and invokes the
// callback with it and an fi_addr_t; the callback forwards `data`, `header_data` and `key` to
// that endpoint's send_data. The status produced by the manager call is returned.
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data(
  const EndPointAddress& address,
  const std::span<std::uint8_t> data,
  const std::span<const std::uint8_t> header_data,
  const std::uint64_t key)
{
  // The arguments are captured by reference, exactly as before; they outlive the call below.
  auto send_on_endpoint = [&](libfabric::SendEndpointZeroCopy& send_endpoint, fi_addr_t destination) {
    return send_endpoint.send_data(data, header_data, key, destination);
  };
  return m_endpoint_manager->apply_to_send_endpoint_zero_copy(address, send_on_endpoint);
}

// Zero-copy send of scattered data, described by an iovec list, to `address`.
//
// The endpoint manager selects the zero-copy send endpoint for `address` and invokes the
// callback with it and an fi_addr_t; the callback forwards `iov`, `header_data` and `key` to
// that endpoint's send_data. The status produced by the manager call is returned.
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data(
  const EndPointAddress& address,
  const std::span<const iovec> iov,
  const std::span<const std::uint8_t> header_data,
  const std::uint64_t key)
{
  // The arguments are captured by reference, exactly as before; they outlive the call below.
  auto send_on_endpoint = [&](libfabric::SendEndpointZeroCopy& send_endpoint, fi_addr_t destination) {
    return send_endpoint.send_data(iov, header_data, key, destination);
  };
  return m_endpoint_manager->apply_to_send_endpoint_zero_copy(address, send_on_endpoint);
}

// Copying send of a single contiguous buffer: not available in this backend.
//
// All arguments are ignored; the function always throws NotSupported.
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data_copy(
  [[maybe_unused]] const EndPointAddress& address,
  [[maybe_unused]] const std::span<const std::uint8_t> data,
  [[maybe_unused]] const std::span<const std::uint8_t> header_data,
  [[maybe_unused]] const std::uint64_t key)
{
  throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
}

// Copying send of scattered (iovec) data: not available in this backend.
//
// All arguments are ignored; the function always throws NotSupported.
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_data_copy(
  [[maybe_unused]] const EndPointAddress& address,
  [[maybe_unused]] const std::span<const iovec> iov,
  [[maybe_unused]] const std::span<const std::uint8_t> header_data,
  [[maybe_unused]] const std::uint64_t key)
{
  throw NotSupported(ERS_HERE, "send_data_copy is not supported in libfabric backend");
}

// Obtain a buffer from the buffered send endpoint associated with `address`.
//
// The endpoint manager selects the buffered send endpoint and invokes the callback, which
// returns that endpoint's get_buffer() result; the fi_addr_t passed to the callback is unused.
netio3::NetworkBuffer* netio3::libfabric::BackendLibfabricConnectionless::get_buffer(
  const EndPointAddress& address)
{
  auto fetch_buffer = [](libfabric::SendEndpointBuffered& send_endpoint, fi_addr_t /* unused */) {
    return send_endpoint.get_buffer();
  };
  return m_endpoint_manager->apply_to_send_endpoint_buffered(address, fetch_buffer);
}

// Send a previously obtained buffer through the buffered send endpoint for `address`.
//
// `buffer` must actually be a libfabric::Buffer. If the dynamic_cast fails, an InvalidBuffer
// issue is reported through ers::error and NetioStatus::FAILED is returned from the callback.
// Otherwise the buffer is sent with buffer->pos() as the size argument.
netio3::NetioStatus netio3::libfabric::BackendLibfabricConnectionless::send_buffer(const EndPointAddress& address,
                                                                     NetworkBuffer* buffer)
{
  auto send_on_endpoint = [buffer](libfabric::SendEndpointBuffered& send_endpoint, fi_addr_t destination) {
    if (auto* typed_buffer = dynamic_cast<libfabric::Buffer*>(buffer); typed_buffer != nullptr) {
      return send_endpoint.send_buffer(typed_buffer, buffer->pos(), destination);
    }
    // Not one of our buffers (or a null pointer): report it and fail this send.
    ers::error(InvalidBuffer(ERS_HERE, "libfabric::Buffer"));
    return NetioStatus::FAILED;
  };
  return m_endpoint_manager->apply_to_send_endpoint_buffered(address, send_on_endpoint);
}

// Report how many buffers are currently available for `address`.
//
// This is a straight delegation to the endpoint manager's get_num_available_buffers.
std::size_t netio3::libfabric::BackendLibfabricConnectionless::get_num_available_buffers(
  const EndPointAddress& address)
{
  const auto available = m_endpoint_manager->get_num_available_buffers(address);
  return available;
}

// Check whether the address string of `address` is a valid IPv4 address.
//
// Returns false for an empty string and for anything inet_pton cannot parse as an AF_INET
// address; since only AF_INET is tried, IPv6 addresses and host names are rejected.
// The port of `address` is not inspected.
bool netio3::libfabric::BackendLibfabricConnectionless::check_ip_address(const EndPointAddress& address)
{
  if (address.address().empty()) {
    return false;
  }
  sockaddr_in sa{};
  // inet_pton returns 1 on success, 0 when the string is not a valid address and -1 on error.
  // Only 1 means a valid address was parsed, so compare against it explicitly instead of
  // treating every non-zero result (including -1) as success.
  return inet_pton(AF_INET, address.address().c_str(), &(sa.sin_addr)) == 1;
}