Network communication

This chapter describes how to establish or close connections and how to send and receive data including the handling of errors. The interface is identical for both the asyncmsg and the libfabric backend. The next chapters will describe the differences in the implementation in detail.

Creating a backend instance

The backend should be constructed using the Netio3Backend::create factory function. This function takes the backend type (asyncmsg or libfabric), the configuration, and an event loop as arguments. The configuration contains the following parameters:

  • NetworkMode mode: Should be NetworkMode::RDMA for libfabric and NetworkMode::TCP for asyncmsg.

  • ThreadSafetyModel thread_safety: Either ThreadSafetyModel::SAFE or ThreadSafetyModel::UNSAFE. See the Thread safety section below for more information.

  • Configuration for shared receive buffers:

      - bool use_shared_receive_buffers: If set to true, the backend will use shared receive buffers. If false, the following parameters are ignored.

      - std::size_t buf_size: The size of the receive buffer.

      - std::size_t num_buf: The number of receive buffers.

  • Configuration for shared send buffers:

      - bool use_shared_send_buffers: If set to true, the backend will use shared send buffers. If false, the following parameters are ignored.

      - std::size_t buf_size: The size of the send buffer.

      - std::size_t num_buf: The number of send buffers.

  • Callbacks to be invoked on certain events:

      - OnDataCb on_data_cb: A callback that is executed when data is received.

      - OnConnectionEstablishedCb on_connection_established_cb: A callback that is executed when a connection is established.

      - OnConnectionClosedKeysCb on_connection_closed_cb: A callback that is executed when a connection is closed.

      - OnConnectionRefusedCb on_connection_refused_cb: A callback that is executed when a connection is refused.

      - OnSendCompleted on_send_completed_cb: A callback that is executed when a send operation is completed.

Shared receive buffers and shared send buffers let all connections draw from the same set of buffers. This reduces memory consumption for applications that open many connections, because the memory footprint is constant and does not grow with the number of connections. It might incur a small performance penalty if only a few connections are opened. In addition, because no buffer is dedicated to a single connection, one heavily used connection can degrade the performance of the others.
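The memory argument can be made concrete with a little arithmetic. The sketch below is not part of the library: dedicated_footprint and shared_footprint are hypothetical helpers, and it assumes that with dedicated buffers every connection allocates its own num_buf buffers of buf_size bytes.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not a library function): bytes used when every
// connection owns a dedicated set of buffers.
constexpr std::size_t dedicated_footprint(std::size_t num_connections,
                                          std::size_t num_buf,
                                          std::size_t buf_size) {
  return num_connections * num_buf * buf_size;
}

// Hypothetical helper: bytes used when one set of buffers is shared by all
// connections. The result does not depend on the number of connections.
constexpr std::size_t shared_footprint(std::size_t num_buf,
                                       std::size_t buf_size) {
  return num_buf * buf_size;
}

// 100 connections, 16 buffers of 4096 bytes each.
static_assert(dedicated_footprint(100, 16, 4096) == 6'553'600);
static_assert(shared_footprint(16, 4096) == 65'536);
```

With a single connection both variants use the same amount of memory, so under this assumption sharing only pays off once several connections are open.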

Tip

For shared receive buffers, the buffers can be shared either between all connections or between all connections that are created from a listen endpoint. For the latter, the use_shared_receive_buffers parameter must be set to true when opening the listen endpoint.

The callback signatures are explained in the relevant sections.

auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::ASYNCMSG,
  {
    .mode = NetworkMode::TCP,
    .thread_safety = ThreadSafetyModel::SAFE,
    // Callbacks
  },
  evloop
);

Note

It is possible to use NetworkMode::TCP for the libfabric backend. This is only useful for testing the libfabric backend without RDMA capabilities and is not recommended for production. Use the asyncmsg backend for TCP communication. Conversely, the asyncmsg backend always uses TCP, even if NetworkMode::RDMA is passed.

Opening a connection

Opening a connection requires two sides: A listener and an active endpoint connecting to the listener. Established connections are bidirectional, meaning that both sides can send and receive data.

Listen endpoint (accepting)

To accept connections, the backend must open a listen endpoint to listen for incoming connections. This is done by calling the open_listen_endpoint function on the backend instance. This function takes an EndPointAddress containing IP address and port, and connection parameters as arguments. The connection parameters determine the capabilities of all active endpoints spawned by the listen endpoint and contain the following parameters:

  • recv_params for receiving data with:

      - std::size_t buf_size: The size of the receive buffer.

      - std::size_t num_buf: The number of receive buffers.

      - bool use_shared_receive_buffers: If set to true, all active endpoints spawned by this listen endpoint will share receive buffers.

  • send_buffered_params for buffered sending with:

      - std::size_t buf_size: The size of the send buffer.

      - std::size_t num_buf: The number of send buffers.

  • send_zero_copy_params for zero-copy sending with:

      - std::size_t buf_size: The size of the send buffer.

      - mr_start: The start of the buffer/memory region to send from.

These parameters are passed to all active endpoints that are created from this listen endpoint. They define if the spawned active endpoints can receive data, send data using buffered sending or zero-copy sending.

For receiving data, the parameters define the size and number of buffers that are used to receive data. They determine the maximum size of a message that can be received and the maximum number of messages that can be received simultaneously (i.e. before being processed by the on_data callback). For shared receive buffers, see the explanation above.

The send methods will be described below.

If num_buf for either receiving or buffered sending is set to 0, the capability is not requested. If mr_start is set to nullptr, zero-copy sending is not requested. A capability not being requested does not mean that it is not supported. Providers like asyncmsg that do not require allocating buffers for data reception or registering memory regions for zero-copy sending can still support these capabilities.
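The rules for requesting capabilities can be written down as three small predicates. The structs below are stand-ins that only mirror the fields named in this section; the real definitions live in the library headers and may differ. The function names are hypothetical as well.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Stand-in types (assumption): only the documented fields are reproduced.
struct RecvParams {
  std::size_t buf_size{0};
  std::size_t num_buf{0};
  bool use_shared_receive_buffers{false};
};
struct SendBufferedParams {
  std::size_t buf_size{0};
  std::size_t num_buf{0};
};
struct SendZeroCopyParams {
  std::size_t buf_size{0};
  std::uint8_t* mr_start{nullptr};
};
struct ConnectionParams {
  RecvParams recv_params;
  SendBufferedParams send_buffered_params;
  SendZeroCopyParams send_zero_copy_params;
};

// Receiving is requested if at least one receive buffer is configured.
bool requests_receive(const ConnectionParams& p) {
  return p.recv_params.num_buf > 0;
}

// Buffered sending is requested if at least one send buffer is configured.
bool requests_buffered_send(const ConnectionParams& p) {
  return p.send_buffered_params.num_buf > 0;
}

// Zero-copy sending is requested if a memory region is provided.
bool requests_zero_copy_send(const ConnectionParams& p) {
  return p.send_zero_copy_params.mr_start != nullptr;
}

// Combination that some providers (e.g. libfabric) cannot serve.
bool requests_both_send_modes(const ConnectionParams& p) {
  return requests_buffered_send(p) && requests_zero_copy_send(p);
}
```

A check like requests_both_send_modes could be run before opening an endpoint to catch a conflicting configuration early. Whether and how the library itself rejects such a configuration is not covered by this sketch.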

Caution

Some providers (e.g. libfabric) cannot support buffered and zero-copy sending at the same time.

Tip

Zero-copy and receive parameters are only used by the libfabric backend. The asyncmsg backend does not need to create dedicated buffers to receive data (see asyncmsg documentation).

Tip

If port 0 is passed as the port number, the operating system will assign a random free port. The function returns the address it is actually listening on, including the assigned port number.

If opening the listen endpoint fails, the function can throw an exception:

  • InvalidEndpointAddress if the endpoint address is invalid.

  • ListenEndpointAlreadyExists if listening on this address and port was already requested.

  • FailedOpenListenEndpoint if the listen endpoint could not be opened.

virtual EndPointAddress netio3::NetworkBackend::open_listen_endpoint(const EndPointAddress &address, const ConnectionParameters &connection_params) = 0

Opens a listen endpoint for the specified address with the given connection parameters.

Should accept all incoming connections. The connection parameters are passed to the spawned active endpoint. If port 0 was passed in, it should return the port number that was used to open the listen endpoint.

Parameters:
  • address – The address of the endpoint to open (local address)

  • connection_params – The connection parameters for spawned endpoints

Throws:
  • InvalidEndpointAddress – if the endpoint address is invalid

  • ListenEndpointAlreadyExists – if listening on this address and port was already requested

  • FailedOpenListenEndpoint – if the listen endpoint could not be opened

  • InvalidConnectionParameters – if the connection parameters conflict with global settings (shared buffers)

Returns:

The actual address the server is listening on

Active endpoint (connecting)

To connect to a listener, the other side must have opened a listen endpoint. Then, the initiator can connect to the listen endpoint by calling the open_active_endpoint function. This function takes an EndPointAddress containing IP address and port of the listen endpoint, and connection parameters as arguments. The connection parameters contain the same parameters as for the listen endpoint. They define the capabilities of the active endpoint that is created.

Caution

Some providers (e.g. libfabric) cannot support buffered and zero-copy sending at the same time.

If the request is invalid, so that no connection attempt is made at all, the function can throw one of three exceptions:

  • InvalidEndpointAddress if the endpoint address is invalid.

  • ActiveEndpointAlreadyExists if the active endpoint already exists.

  • FailedOpenActiveEndpoint if opening the endpoint failed for other reasons.

Note

If the connection was successfully requested to open but refused by the other side, a callback is executed instead of throwing an exception. This is required since the connection is established asynchronously.

Callbacks

Once a connection has been established, the on_connection_established_cb callback is executed on both sides (sender and receiver). This callback is passed the EndPointAddress of the remote endpoint and its capabilities (buffered sending, sending by copy, zero-copy sending, and receiving). If opening a connection fails, the on_connection_refused_cb callback is executed instead. It is passed the EndPointAddress of the remote endpoint and is only called on the side that tried to open the connection (the sender).

Example

auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::LIBFABRIC,
  {
    .mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE,
    .on_connection_established_cb = [](const EndPointAddress& remote, EndpointCapabilities capabilities) {
      std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_refused_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
    },
  },
  evloop
);

auto thread = std::jthread([evloop]() { evloop->run(); });

const auto listen_endpoint = backend->open_listen_endpoint({ip, 0}, {.recv_params{.buf_size = 4096, .num_buf = 16}});
backend->open_active_endpoint(listen_endpoint, {.send_buffered_params{.buf_size = 4096, .num_buf = 16}});
// Will invoke on_connection_established_cb

const auto wrong_listen_port = 1234;
backend->open_active_endpoint({ip, wrong_listen_port}, {.send_buffered_params{.buf_size = 4096, .num_buf = 16}});
// Will invoke on_connection_refused_cb because no listen endpoint is open on port 1234 (unless we got really really lucky)

Note

The event loop must be running for the connection to be established. Otherwise the events will not be processed.

Sending data

There are three different modes of sending:

  • Buffered sending

  • Zero-copy sending

  • Immediate sending copying the data

Caution

You can only use buffered sending for active endpoints that were created with an mr_start of nullptr.

For libfabric, you can only use zero-copy sending if the active endpoint was created with an mr_start that is not nullptr.

Immediate sending is only available for asyncmsg.

Send operations are always asynchronous. The on_send_completed_cb callback is executed when the send operation is completed. To match the send operation with the data that was sent, a key is passed to the callback in addition to the remote endpoint address. For buffered sending, this key corresponds to the index of the buffer that was sent. For the other send methods, the key is passed as an argument to the send function.

Success or failure is communicated using status codes. There are three possible status codes:

  • NetioStatus::OK: The operation was successful.

  • NetioStatus::NO_RESOURCES: The request cannot currently be processed. For example, this can happen if no buffers are available. Retrying after resources have become available is likely to succeed.

  • NetioStatus::FAILED: Sending failed and retrying is unlikely to succeed.
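One way to act on these status codes is a bounded retry that repeats a send only while it reports NO_RESOURCES. The sketch below is an illustration under assumptions: Status is a stand-in for NetioStatus, send_with_retry is a hypothetical helper, and the send itself is passed in as a callable so that the sketch does not depend on the library.

```cpp
#include <cassert>
#include <functional>

// Stand-in for the library's status enum, reduced to the documented codes.
enum class Status { OK, NO_RESOURCES, FAILED };

// Hypothetical helper: calls `attempt` until it stops reporting NO_RESOURCES
// or `max_attempts` calls have been made, and returns the last status seen.
// With max_attempts <= 0 no attempt is made and NO_RESOURCES is returned.
Status send_with_retry(const std::function<Status()>& attempt,
                       int max_attempts) {
  Status status = Status::NO_RESOURCES;
  for (int i = 0; i < max_attempts && status == Status::NO_RESOURCES; ++i) {
    status = attempt();
  }
  return status;
}
```

The helper retries immediately, which keeps the example short. Since resources typically become available when an earlier send completes, a real application would more likely wait for on_send_completed_cb before trying again instead of spinning.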

All send functions report their completion using the on_send_completed_cb callback. The callback is passed the remote endpoint address and the key that was passed to the send function or, for buffered sending, the buffer ID.

Buffered sending

Important

You can only use buffered sending for active endpoints that were created with a num_buf greater than 0.

The buffered sending API consists of two functions: get_buffer and send_buffer. get_buffer returns a pointer to a buffer if one is available. If no buffer is available the function returns a nullptr. The buffer can be filled with data and then sent using the send_buffer function. Since buffers are allocated per active endpoint, the get_buffer function takes the endpoint as an argument. The send_buffer function takes the buffer pointer and the endpoint as arguments and returns a status code.

Both functions can throw an UnknownActiveEndpoint exception if the endpoint is invalid (no connection is open). get_buffer can also throw a NoBuffersAllocated exception if no buffers were allocated because the endpoint was configured for zero-copy sending.

auto* buffer = backend->get_buffer(send_endpoint);
if (buffer != nullptr) {  // Always check if a buffer was returned
  buffer->write(data);
  const auto status = backend->send_buffer(send_endpoint, buffer);
  switch (status) {
    // Handle status
  }
}

Tip

The buffer supports multiple write functions taking a span of bytes, an iovec instance or any integral type.
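The life cycle of a send buffer (handed out by get_buffer, unavailable while in flight, available again after completion) can be illustrated with a toy model. ToyBuffer and ToyBufferPool are hypothetical and say nothing about how the backends manage their buffers internally. The model only reproduces the behaviour described above: nullptr when no buffer is free, and the buffer index acting as the completion key.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Toy stand-in for a network buffer (assumption, not the library type).
struct ToyBuffer {
  std::uint64_t index{0};           // doubles as the completion key
  std::vector<std::uint8_t> bytes;  // payload storage
  bool in_use{false};
};

// Toy model of the send buffers of one active endpoint.
class ToyBufferPool {
public:
  ToyBufferPool(std::size_t num_buf, std::size_t buf_size)
      : m_buffers(num_buf) {
    for (std::size_t i = 0; i < num_buf; ++i) {
      m_buffers[i].index = i;
      m_buffers[i].bytes.reserve(buf_size);
    }
  }

  // Returns a free buffer, or nullptr if all buffers are in flight.
  ToyBuffer* get_buffer() {
    for (auto& buffer : m_buffers) {
      if (!buffer.in_use) {
        buffer.in_use = true;
        buffer.bytes.clear();
        return &buffer;
      }
    }
    return nullptr;
  }

  // Makes the buffer identified by `key` available again.
  void on_send_completed(std::uint64_t key) {
    assert(key < m_buffers.size());
    m_buffers[key].in_use = false;
  }

private:
  std::vector<ToyBuffer> m_buffers;
};
```

The model makes it visible why the nullptr check matters: with num_buf buffers, at most num_buf sends can be in flight, and the next get_buffer call fails until one of them has completed.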

virtual NetworkBuffer *netio3::NetworkBackend::get_buffer(const EndPointAddress &address) = 0

Retrieves a network buffer for the specified endpoint address.

Only works for endpoints that are opened for buffered sending. If no buffer is available returns a nullptr.

Important: The user needs to check that the returned buffer is not null.

Parameters:

address – The address of the endpoint to retrieve the buffer for

Throws:
  • UnknownActiveEndpoint – if the endpoint does not exist or has no send capabilities

  • NoBuffersAllocated – if no buffers were allocated

Returns:

A pointer to the network buffer

virtual NetioStatus netio3::NetworkBackend::send_buffer(const EndPointAddress &address, NetworkBuffer *buffer) = 0

Sends a network buffer to the specified address.

This function sends the data buffer to the specified endpoint. A connection must be registered for this endpoint before sending data. If the buffer was not handed out through get_buffer, FAILED is returned. The buffer will be returned to the connection after the send operation is completed.

Parameters:
  • address – The address to send the buffer to

  • buffer – The network buffer to send

Throws:

UnknownActiveEndpoint – if the endpoint does not exist or has no send capabilities

Returns:

The status of the send operation

Zero-copy sending

Zero-copy sending avoids copying the data into a buffer. Not copying the data can be more performant but comes with the restriction that the data has to remain valid until the send operation is completed. Note that the library cannot guarantee this. It is the responsibility of the user to ensure that the data is not modified (changed, deleted, moved or touched in any way) until the send operation is completed. Failure to do so leads to undefined behavior and likely segmentation faults.

Important

For libfabric, you can only use zero-copy sending if the active endpoint was created with a mr_start that is not a nullptr. The data can only be sent from inside that provided memory region.

Note

TCP will internally still copy the data into kernel space. This is a true zero-copy operation only for RDMA. However, the additional copy into a send buffer is still avoided.

In addition to the data (that has to be inside the registered memory region for RDMA), a small amount of additional data can be passed to the send function. This header data (up to 16 bytes) will be prepended to the payload and allows passing extra information that does not have to be located in the memory region. It can be discarded as soon as send_data has returned. The payload can be passed in two different ways: as a span of bytes or as a span of iovec instances. The key provided to send_data will not be transmitted over the network. It is only used to match the send operation with the completion callback.

The function can throw the following exceptions:

  • InvalidEndpointAddress if the address is invalid.

  • UnknownActiveEndpoint if no connection was opened on this endpoint.

std::vector<std::uint8_t> data = {0x1, 0x2};
const std::vector<std::uint8_t> header_data = {0x0};
const auto key = std::uint64_t{0};
const auto send_endpoint = EndPointAddress{ip, listen_port};
backend->open_active_endpoint(send_endpoint, {.send_zero_copy_params{.buf_size = data.size(), .mr_start = data.data()}});
// Wait for on_connection_established_cb before sending
const auto status = backend->send_data(send_endpoint, data, header_data, key);
// handle status
// header_data can be discarded here
// data must not be modified until on_send_completed_cb with matching key is called
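Keeping the payload alive until completion is easier if ownership is tied to the key. The sketch below shows one possible bookkeeping scheme. InFlightPayloads is a hypothetical helper, not part of the library. It is not synchronized, so it assumes that all calls come from one thread or are protected externally. For RDMA the payload must additionally lie inside the registered memory region, which this sketch does not model.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical helper: owns payloads until their send operation has finished.
class InFlightPayloads {
public:
  // Takes ownership of `payload` under `key`. Returns a pointer to the stored
  // payload, which stays valid until the key is released or cancelled, or
  // nullptr if a send with the same key is still in flight.
  const std::vector<std::uint8_t>* hold(std::uint64_t key,
                                        std::vector<std::uint8_t> payload) {
    const auto [it, inserted] = m_payloads.try_emplace(key, std::move(payload));
    return inserted ? &it->second : nullptr;
  }

  // Intended to be called from on_send_completed_cb.
  void release(std::uint64_t key) { m_payloads.erase(key); }

  // Intended to be called from on_connection_closed_cb with the keys of the
  // cancelled send operations.
  void cancel(const std::vector<std::uint64_t>& keys) {
    for (const auto key : keys) {
      m_payloads.erase(key);
    }
  }

  std::size_t size() const { return m_payloads.size(); }

private:
  std::unordered_map<std::uint64_t, std::vector<std::uint8_t>> m_payloads;
};
```

The span passed to send_data would then be built from the vector returned by hold, so that the bytes stay untouched until release or cancel is called for that key.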

send_data is available in two overloads:

virtual NetioStatus netio3::NetworkBackend::send_data(const EndPointAddress &address, std::span<const iovec> iov, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0

virtual NetioStatus netio3::NetworkBackend::send_data(const EndPointAddress &address, std::span<std::uint8_t> data, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0

Immediate sending

Important

Immediate sending is only available for asyncmsg.

Immediate sending is the most convenient way to send data over TCP. It neither requires buffer allocation nor places any memory-management constraints on the user. It comes at the cost of an additional copy of the data into the library’s memory space and is not supported for RDMA. The signature is identical to that of the zero-copy sending function.

Note

The separation between data and header data is not necessary for immediate sending but kept for consistency with the zero-copy send functions.

The function can throw the following exceptions:

  • InvalidEndpointAddress if the address is invalid.

  • UnknownActiveEndpoint if no connection was opened on this endpoint.

  • NotSupported if the backend does not support immediate sending.

const std::vector<std::uint8_t> data = {0x1, 0x2};
const std::vector<std::uint8_t> header_data = {0x0};
const auto key = std::uint64_t{0};
const auto send_endpoint = EndPointAddress{ip, listen_port};
backend->open_active_endpoint(send_endpoint, {});
// Wait for on_connection_established_cb before sending
const auto status = backend->send_data_copy(send_endpoint, data, header_data, key);
// handle status
// header_data can be discarded here
// data can be modified or deleted immediately

send_data_copy is available in two overloads:

virtual NetioStatus netio3::NetworkBackend::send_data_copy(const EndPointAddress &address, std::span<const iovec> iov, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0

virtual NetioStatus netio3::NetworkBackend::send_data_copy(const EndPointAddress &address, std::span<const std::uint8_t> data, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0

Data reception

Data is received via the on_data_cb callback. Errors receiving data are not reported.

Important

The data provided in the callback is only valid within the callback. If the data is needed after the callback has returned, it has to be copied.
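A common way to satisfy this rule is to copy every message into a queue inside the callback and process it later. The sketch below assumes nothing about the library beyond the fact that the callback receives a range of bytes. ReceivedMessages is a hypothetical class, and its on_data member is a template so that it accepts a std::span<const std::uint8_t> as well as any other byte range with begin() and end(). It is not synchronized, so a consumer on another thread would need a lock around it.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical receiver-side queue that copies each message out of the
// callback so that it can be processed after the callback has returned.
class ReceivedMessages {
public:
  // Copies the bytes; `data` may be invalidated as soon as this returns.
  template <typename Bytes>
  void on_data(const Bytes& data) {
    m_messages.emplace_back(data.begin(), data.end());
  }

  bool empty() const { return m_messages.empty(); }

  // Removes and returns the oldest message. Must not be called when empty.
  std::vector<std::uint8_t> pop() {
    assert(!m_messages.empty());
    auto message = std::move(m_messages.front());
    m_messages.pop_front();
    return message;
  }

private:
  std::deque<std::vector<std::uint8_t>> m_messages;
};
```

The on_data_cb passed to the backend would then simply forward its argument to on_data. The copy costs time and memory per message, so handling small messages directly inside the callback can be the cheaper option.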

Closing a connection

A connection can be closed using the close_active_endpoint or close_listen_endpoint functions. Both functions take the endpoint as an argument. Like all other operations, closing a connection is asynchronous. The on_connection_closed_cb callback is executed when the connection is closed. The callback receives the remote endpoint address and the keys of all outstanding (and therefore cancelled) send operations as arguments. Closing a listen endpoint also closes all active endpoints that were created from it. The on_connection_closed_cb callback is only called for active endpoints, not for listen endpoints, so closing a listen endpoint that never spawned an active endpoint will not trigger the callback.

close_active_endpoint can be used to close active endpoints that were opened either by calling open_active_endpoint or by accepting connections on a listen endpoint.

The close_active_endpoint function can throw the following exceptions:

  • InvalidEndpointAddress if the endpoint address is invalid.

  • UnknownActiveEndpoint if the active endpoint does not exist.

  • FailedCloseActiveEndpoint if closing the send socket failed.

The close_listen_endpoint function can throw the following exceptions:

  • InvalidEndpointAddress if the endpoint address is invalid.

  • UnknownListenEndpoint if the listen endpoint does not exist.

  • FailedCloseListenEndpoint if closing the listen socket failed.

virtual void netio3::NetworkBackend::close_listen_endpoint(const EndPointAddress &address) = 0

Closes the listen endpoint for the specified address.

Shall also close all active endpoints that were spawned by this listen endpoint.

May enqueue the endpoint to be closed by the event loop for thread synchronization purposes.

Parameters:

address – The address of the endpoint to close

Throws:
  • InvalidEndpointAddress – if the endpoint address is invalid

  • UnknownListenEndpoint – if the listen endpoint does not exist

  • FailedCloseListenEndpoint – if closing the listen endpoint failed

Thread safety

The backend can be configured to be thread-safe or not. The thread safety model is passed to the backend factory function. If the backend is thread-safe, all functions can be called from any thread. If the backend is not thread-safe, all functions must be called from the thread that runs the event loop. Note that in thread-unsafe mode even sending data has to be executed on the event loop thread, which requires the use of either signals or timers.
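For the thread-unsafe mode, one possible pattern is to queue the desired calls and let the event loop thread execute them. The sketch below shows only the queue. LoopTaskQueue is a hypothetical helper, and how drain gets triggered on the event loop thread (for example from a signal or timer handler) depends on the event loop API and is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical helper: collects calls from arbitrary threads so that the
// event loop thread can execute them.
class LoopTaskQueue {
public:
  // May be called from any thread.
  void post(std::function<void()> task) {
    const std::lock_guard<std::mutex> lock(m_mutex);
    m_tasks.push_back(std::move(task));
  }

  // Must be called on the event loop thread. Runs all queued tasks in the
  // order they were posted and returns how many were executed.
  std::size_t drain() {
    std::vector<std::function<void()>> tasks;
    {
      // Swap under the lock, run outside of it, so that tasks may post again.
      const std::lock_guard<std::mutex> lock(m_mutex);
      tasks.swap(m_tasks);
    }
    for (auto& task : tasks) {
      task();
    }
    return tasks.size();
  }

private:
  std::mutex m_mutex;
  std::vector<std::function<void()>> m_tasks;
};
```

A sender thread would post a lambda that calls get_buffer and send_buffer and then wake up the event loop. Tasks posted while drain is running are picked up by the next drain call.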

Note

The asyncmsg backend is always thread-safe.

Example

Receiver:

auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::LIBFABRIC,
  {
    .mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE,
    .on_data_cb = [](std::span<const std::uint8_t> data) {
      std::cout << std::format("Received data of size {}\n", std::size(data));
    },
    .on_connection_established_cb = [](const EndPointAddress& remote, EndpointCapabilities capabilities) {
      std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
      // capabilities will have true for receive, false for the rest
    },
    .on_connection_closed_cb = [](const EndPointAddress& remote, const std::vector<std::uint64_t>& keys) {
      std::cout << std::format("Connection closed with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_refused_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
    },
  },
  evloop
);
auto thread = std::jthread([evloop]() { evloop->run(); });
// Open a listen endpoint to accept connections and receive data on those
const auto listen_endpoint = backend->open_listen_endpoint({ip, port}, {.recv_params{.buf_size = 4096, .num_buf = 16}});
// Wait for connection
// Receive data
backend->close_listen_endpoint(listen_endpoint);
evloop->stop();

Sender:

auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::LIBFABRIC,
  {
    .mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE,
    .on_connection_established_cb = [](const EndPointAddress& remote, EndpointCapabilities capabilities) {
      std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
      // capabilities will have true for buffered sending, false for the rest
    },
    .on_connection_closed_cb = [](const EndPointAddress& remote, const std::vector<std::uint64_t>& keys) {
      std::cout << std::format("Connection closed with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_refused_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
    },
    .on_send_completed_cb = [](const EndPointAddress& remote, std::uint64_t key) {
      std::cout << std::format("Send operation to {}:{} with key {} completed\n", remote.address, remote.port, key);
    },
  },
  evloop
);
auto thread = std::jthread([evloop]() { evloop->run(); });
const auto endpoint = EndPointAddress{ip, port};
// Open a send endpoint to send data to the receiver (buffered sending)
backend->open_active_endpoint(endpoint, {.send_buffered_params{.buf_size = 4096, .num_buf = 16}});
// Wait for connection
auto* buf = backend->get_buffer(endpoint);
if (buf != nullptr) {
  buf->write(data);
  const auto status = backend->send_buffer(endpoint, buf);
  switch (status) {
    // Handle status
    // Try again if NO_RESOURCES was returned or throw exception if FAILED was returned (for example)
  }
}

backend->close_active_endpoint(endpoint);
evloop->stop();