Network communication
This chapter describes how to establish or close connections and how to send and receive data including the handling of errors. The interface is identical for both the asyncmsg and the libfabric backend. The next chapters will describe the differences in the implementation in detail.
Creating a backend instance
The backend should be constructed using the Netio3Backend::create
factory function. This function takes the backend type (asyncmsg or libfabric), the configuration, and an event loop as arguments.
The configuration contains the following parameters:
NetworkMode mode
: Should be NetworkMode::RDMA for libfabric and NetworkMode::TCP for asyncmsg.
ThreadSafetyModel thread_safety
: Either ThreadSafetyModel::SAFE or ThreadSafetyModel::UNSAFE. See the Thread safety section below for more information.
OnDataCb on_data_cb
: A callback that is executed when data is received.
OnConnectionEstablishedCb on_connection_established_cb
: A callback that is executed when a connection is established.
OnConnectionClosedKeysCb on_connection_closed_cb
: A callback that is executed when a connection is closed.
OnConnectionRefusedCb on_connection_refused_cb
: A callback that is executed when a connection is refused.
OnSendCompleted on_send_completed_cb
: A callback that is executed when a send operation is completed.
The callback signatures are explained in the relevant sections.
auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::ASYNCMSG,
  {
    .mode = NetworkMode::TCP,
    .thread_safety = ThreadSafetyModel::SAFE,
    // Callbacks
  },
  evloop
);
Note
It is possible to use NetworkMode::TCP
for the libfabric backend. This is only useful for testing the libfabric backend without RDMA capabilities.
It is not recommended to be used in production. Use the asyncmsg backend for TCP communication.
On the other hand, the asyncmsg backend will always use TCP, even if NetworkMode::RDMA
is passed.
Opening a connection
Opening a connection differs for sending and receiving data.
Receiver
To receive data, the backend must open a listen endpoint to listen for incoming connections. This is done by calling the open_listen_endpoint
function on the backend instance.
This function takes an EndPointAddress
containing IP address and port, and connection parameters as arguments. The connection parameters contain the following parameters:
std::size_t buf_size
: The size of the receive buffer.
std::size_t num_buf
: The number of receive buffers.
These parameters are passed to all receive endpoints that are created from this listen endpoint. They define the maximum size of messages that can be received and the maximum number of messages that can be received simultaneously (i.e. before being processed by the on_data_cb
callback).
Tip
These parameters are only used by the libfabric backend. The asyncmsg backend does not need to create dedicated buffers to receive data (see asyncmsg documentation).
Tip
If 0 is passed as the port number, the operating system assigns a free port. The function returns the address it is actually listening on, including the assigned port number.
If opening the listen endpoint fails, the function can throw one of the following exceptions:
InvalidEndpointAddress
: if the endpoint address is invalid.
ListenEndpointAlreadyExists
: if a listen endpoint was already requested on this address and port.
FailedOpenListenEndpoint
: if the listen endpoint could not be opened.
-
virtual EndPointAddress netio3::NetworkBackend::open_listen_endpoint(const EndPointAddress &address, const ConnectionParametersRecv &connection_params) = 0
Opens a listen endpoint for the specified address with the given connection parameters.
Should accept all incoming connections. The connection parameters are passed to the spawned receive sockets. If port 0 was passed in, it should return the port number that was used to open the listen endpoint.
- Parameters:
address – The address of the endpoint to open
connection_params – The connection parameters for the endpoint
- Throws:
InvalidEndpointAddress – if the endpoint address is invalid
ListenEndpointAlreadyExists – If already requested to listen on this address and port
FailedOpenListenEndpoint – if the listen endpoint could not be opened
- Returns:
The actual address the server is listening on
Sender
To open a send endpoint, the other side must have opened a listen endpoint. Then, the sender can connect to the listen endpoint by calling the open_send_endpoint
function.
This function takes an EndPointAddress
containing IP address and port of the listen endpoint, and connection parameters as arguments. The connection parameters contain the following parameters:
std::size_t buf_size
: The size of the send buffer.
std::size_t num_buf
: The number of send buffers.
std::uint8_t* mr_start
: The start of the memory region that should be registered for RDMA operations.
If mr_start is a nullptr (default), the send endpoint will allocate num_buf buffers of size buf_size for buffered sending.
If mr_start is not a nullptr, the send endpoint will register the memory region starting at mr_start and of size buf_size. The num_buf parameter is ignored in this case.
The send methods will be described below.
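The two configurations can be sketched as follows. This is only an illustration: SendParams is a hypothetical stand-in that mirrors the three documented fields, and buffered_params and zero_copy_params are helper names invented here; they are not part of the library.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the send-side connection parameters.
// Only the three fields documented above are mirrored.
struct SendParams {
  std::size_t buf_size{0};
  std::size_t num_buf{0};
  std::uint8_t* mr_start{nullptr};
};

// Buffered sending: mr_start stays nullptr, so the endpoint would
// allocate num_buf buffers of buf_size bytes each.
inline SendParams buffered_params(std::size_t buf_size, std::size_t num_buf) {
  SendParams params;
  params.buf_size = buf_size;
  params.num_buf = num_buf;
  return params;
}

// Zero-copy sending: an existing region is registered. buf_size is the
// size of the whole region; num_buf is left at 0 because it is ignored.
inline SendParams zero_copy_params(std::vector<std::uint8_t>& region) {
  assert(!region.empty());  // registering an empty region makes no sense
  SendParams params;
  params.buf_size = region.size();
  params.mr_start = region.data();
  return params;
}
```

The region passed to zero_copy_params has to outlive the send endpoint, since only its address is stored.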
If the request is invalid, so that no connection attempt is even made, the function can throw one of three exceptions:
InvalidEndpointAddress
: if the endpoint address is invalid.
SendEndpointAlreadyExists
: if the send endpoint already exists.
FailedOpenSendEndpoint
: if opening the endpoint failed for other reasons.
Note
If the connection was successfully requested to open but refused by the other side, a callback is executed instead of throwing an exception. This is required since the connection is established asynchronously.
-
virtual void netio3::NetworkBackend::open_send_endpoint(const EndPointAddress &address, const ConnectionParameters &connection_params) = 0
Opens a send endpoint for the specified address with the given connection parameters.
- Parameters:
address – The address of the endpoint to open
connection_params – The connection parameters for the endpoint
- Throws:
InvalidEndpointAddress – if the endpoint address is invalid
FailedOpenSendEndpoint – if the send endpoint could not be opened
SendEndpointAlreadyExists – if the send endpoint already exists
Callbacks
Once a connection has been established (for both sender and receiver), the on_connection_established_cb
callback is executed. This callback is passed the EndPointAddress
of the remote endpoint.
If opening a connection fails, the on_connection_refused_cb
callback is executed. This callback is passed the EndPointAddress
of the remote endpoint. It is only invoked on the sender side.
Example
auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::LIBFABRIC,
  {
    .mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE,
    .on_connection_established_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_refused_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
    },
  },
  evloop
);
// Run the event loop on its own thread so that connection events are processed
auto thread = std::jthread([evloop]() { evloop->run(); });
const auto listen_endpoint = backend->open_listen_endpoint({ip, 0}, {.buf_size = 4096, .num_buf = 16});
backend->open_send_endpoint(listen_endpoint, {.buf_size = 4096, .num_buf = 16, .mr_start = nullptr});
// Will invoke on_connection_established_cb
const auto wrong_listen_port = 1234;
backend->open_send_endpoint({ip, wrong_listen_port}, {.buf_size = 4096, .num_buf = 16, .mr_start = nullptr});
// Will invoke on_connection_refused_cb because no listen endpoint is open on port 1234 (unless we got really really lucky)
Note
The event loop must be running for the connection to be established. Otherwise the events will not be processed.
Sending data
There are three different modes of sending:
Buffered sending
Zero-copy sending
Immediate sending copying the data
Caution
You can only use buffered sending for send endpoints that were created with a mr_start of nullptr.
For libfabric, you can only use zero-copy sending if the send endpoint was created with a mr_start that is not a nullptr.
Immediate sending is only available for asyncmsg.
Send operations are always asynchronous. The on_send_completed_cb
callback is executed when the send operation is completed.
To match the send operation with the data that was sent, a key
is passed to the callback in addition to the remote endpoint address.
For buffered sending, this key corresponds to the index of the buffer that was sent. For the other send methods, the key is passed as an argument to the send function.
Success or failure is communicated using status codes. There are three possible status codes:
- NetioStatus::OK
: The operation was successful.
- NetioStatus::NO_RESOURCES
: The request cannot be processed at the moment, for example because no buffers are available. Retrying after resources have become available is likely to succeed.
- NetioStatus::FAILED
: Sending failed and retrying is unlikely to succeed.
All send functions report their completion using the on_send_completed_cb
callback. The callback is passed the remote endpoint address and the key that was passed to the send function or, for buffered sending, the buffer ID.
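A minimal sketch of how the three status codes might be handled is shown below. It assumes nothing about the library beyond the enumerator names given above: NetioStatus is redeclared locally as a stand-in, and send_with_retry and try_send are hypothetical names introduced for illustration.

```cpp
#include <cassert>
#include <functional>

// Local stand-in for the library's status type, using the three documented values.
enum class NetioStatus { OK, NO_RESOURCES, FAILED };

// Calls try_send until it returns something other than NO_RESOURCES or
// until max_attempts calls have been made. try_send is a hypothetical
// callable wrapping one of the send functions.
inline NetioStatus send_with_retry(const std::function<NetioStatus()>& try_send,
                                   int max_attempts) {
  assert(max_attempts > 0);
  NetioStatus status = NetioStatus::NO_RESOURCES;
  for (int attempt = 0; attempt < max_attempts; ++attempt) {
    status = try_send();
    if (status != NetioStatus::NO_RESOURCES) {
      break;  // OK: done. FAILED: retrying is unlikely to succeed.
    }
    // A real caller would wait here until a send completion has freed resources.
  }
  return status;
}
```

Retrying in a tight loop is shown only for brevity. In practice, the retry would more likely be triggered from on_send_completed_cb, which signals that resources have been released.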
Buffered sending
Important
You can only use buffered sending for send endpoints that were created with a mr_start
of nullptr
.
The buffered sending API consists of two functions: get_buffer and send_buffer. get_buffer returns a pointer to a buffer if one is available.
If no buffer is available, the function returns a nullptr. The buffer can be filled with data and then sent using the send_buffer function.
Since buffers are allocated per send endpoint, the get_buffer function takes the endpoint as an argument.
The send_buffer function takes the endpoint and the buffer pointer as arguments and returns a status code.
Both functions can throw an UnknownSendEndpoint exception if the send endpoint does not exist (no connection is open).
get_buffer can also throw a NoBuffersAllocated exception if no buffers were allocated because the endpoint was configured for zero-copy sending.
auto* buffer = backend->get_buffer(send_endpoint);
if (buffer != nullptr) { // Always check if a buffer was returned
  buffer->write(data);
  const auto status = backend->send_buffer(send_endpoint, buffer);
  switch (status) {
    // Handle status
  }
}
Tip
The buffer supports multiple write
functions taking a span of bytes, an iovec
instance or any integral type.
-
virtual NetworkBuffer *netio3::NetworkBackend::get_buffer(const EndPointAddress &address) = 0
Retrieves a network buffer for the specified endpoint address.
Only works for endpoints that are opened for buffered sending. If no buffer is available returns a nullptr.
Important: The user needs to check that the returned buffer is not null.
- Parameters:
address – The address of the endpoint to retrieve the buffer for
- Throws:
UnknownSendEndpoint – if the send endpoint does not exist
NoBuffersAllocated – if no buffers were allocated
- Returns:
A pointer to the network buffer
-
virtual NetioStatus netio3::NetworkBackend::send_buffer(const EndPointAddress &address, NetworkBuffer *buffer) = 0
Sends a network buffer to the specified address.
This function sends the data buffer to the specified endpoint. A connection must be registered for this endpoint before sending data. If the buffer was not handed out through get_buffer, FAILED is returned. The buffer will be returned to the connection after the send operation is completed.
- Parameters:
address – The address to send the buffer to
buffer – The network buffer to send
- Throws:
UnknownSendEndpoint – if the send endpoint does not exist
- Returns:
The status of the send operation
Zero-copy sending
Zero-copy sending avoids copying the data into a buffer. Not copying the data can be more performant but comes with the restriction that the data has to remain valid until the send operation is completed. Note that the library cannot guarantee this. It is the responsibility of the user to ensure that the data is not modified (changed, deleted, moved or touched in any way) until the send operation is completed. Failure to do so leads to undefined behavior and likely segmentation faults.
Important
For libfabric, you can only use zero-copy sending if the send endpoint was created with a mr_start
that is not a nullptr
.
The data can only be sent from inside that provided memory region.
Note
TCP will internally still copy the data into kernel space. Only for RDMA this is a true zero-copy operation. However, the additional copy into the buffer is still avoided.
In addition to the data (that has to be inside the registered memory region for RDMA), a small amount of additional data can be passed to the send function.
This data (up to 16 bytes) will be prepended to the payload and allows passing extra information that does not have to be located in the memory region. It also can be discarded after calling send_data
.
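One way to build such a header is sketched below. The 16-byte limit comes from the text above; everything else is an assumption made for illustration: MessageHeader is a hypothetical application-defined struct, and pack_header and unpack_header are helper names invented here. The bytes are copied in host byte order, so both sides would have to agree on layout and endianness.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <type_traits>
#include <vector>

constexpr std::size_t kMaxHeaderSize = 16;  // limit stated in the text above

// Hypothetical application-level header.
struct MessageHeader {
  std::uint32_t stream_id;
  std::uint32_t sequence;
};

// Copies a trivially copyable header into a byte vector that could be
// passed as header_data. Oversized headers are rejected at compile time.
template <typename T>
std::vector<std::uint8_t> pack_header(const T& header) {
  static_assert(std::is_trivially_copyable<T>::value, "header must be trivially copyable");
  static_assert(sizeof(T) <= kMaxHeaderSize, "header exceeds the 16-byte limit");
  std::vector<std::uint8_t> bytes(sizeof(T));
  std::memcpy(bytes.data(), &header, sizeof(T));
  return bytes;
}

// Receiver-side counterpart: restores the header from exactly sizeof(T) bytes.
template <typename T>
T unpack_header(const std::vector<std::uint8_t>& bytes) {
  static_assert(std::is_trivially_copyable<T>::value, "header must be trivially copyable");
  assert(bytes.size() == sizeof(T));
  T header;
  std::memcpy(&header, bytes.data(), sizeof(T));
  return header;
}
```

Because the header bytes may be discarded right after the send call, returning a temporary vector from pack_header is sufficient.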
Data can be passed in two different ways: As a span of bytes or a vector of iovec
instances.
The key provided to send_data
will not be transmitted over the network. It is only used to match the send operation with the completion callback.
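Since the key is chosen by the caller, it can double as a handle for the memory that must stay untouched. The sketch below is one possible bookkeeping scheme, not something the library provides: SlotPool is a hypothetical class that splits a region into fixed-size slots and uses the slot index as the key, so a slot is not reused before its completion has been reported.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: owns a contiguous region (which could be passed as
// mr_start / buf_size) and tracks which slots have a send in flight.
class SlotPool {
public:
  SlotPool(std::size_t num_slots, std::size_t slot_size)
      : m_slot_size(slot_size),
        m_region(num_slots * slot_size),
        m_in_flight(num_slots, false) {
    assert(num_slots > 0 && slot_size > 0);
  }

  std::uint8_t* region_start() { return m_region.data(); }
  std::size_t region_size() const { return m_region.size(); }

  // Finds a free slot, marks it as in flight and stores its index in key.
  // Returns false if every slot is still waiting for its completion.
  bool acquire(std::uint64_t& key) {
    for (std::size_t i = 0; i < m_in_flight.size(); ++i) {
      if (!m_in_flight[i]) {
        m_in_flight[i] = true;
        key = i;
        return true;
      }
    }
    return false;
  }

  // Start of the slot that belongs to key; fill it before sending.
  std::uint8_t* slot(std::uint64_t key) {
    assert(key < m_in_flight.size());
    return m_region.data() + key * m_slot_size;
  }

  // To be called once the completion for key has been reported.
  void release(std::uint64_t key) {
    assert(key < m_in_flight.size() && m_in_flight[key]);
    m_in_flight[key] = false;
  }

private:
  std::size_t m_slot_size;
  std::vector<std::uint8_t> m_region;
  std::vector<bool> m_in_flight;
};
```

The class is not synchronized. If release is called from the completion callback on the event loop thread while acquire runs on another thread, a mutex or similar protection would be needed.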
The function can throw the following exceptions:
InvalidEndpointAddress
: if the address is invalid.
UnknownSendEndpoint
: if no connection was opened on this endpoint.
std::vector<std::uint8_t> data = {0x1, 0x2};
const std::vector<std::uint8_t> header_data = {0x0};
const auto key = std::uint64_t{0};
const auto send_endpoint = EndPointAddress{ip, listen_port};
// Register data as the memory region used for zero-copy sending
backend->open_send_endpoint(send_endpoint, {.buf_size = data.size(), .mr_start = data.data()});
// Wait for on_connection_established_cb before sending
const auto status = backend->send_data(send_endpoint, data, header_data, key);
// handle status
// header_data can be discarded here
// data must not be modified until on_send_completed_cb with matching key is called
-
virtual NetioStatus netio3::NetworkBackend::send_data(const EndPointAddress &address, std::span<const iovec> iov, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0
-
virtual NetioStatus netio3::NetworkBackend::send_data(const EndPointAddress &address, std::span<std::uint8_t> data, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0
Immediate sending
Important
Immediate sending is only available for asyncmsg.
Immediate sending is the most convenient way to send data over TCP. It requires no buffer allocation and places no memory-management constraints on the user. It comes at the cost of an additional copy of the data into the library’s memory space and is not supported for RDMA. The signature is identical to that of the zero-copy sending function.
Note
The separation between data and header data is not necessary for immediate sending but kept for consistency with the zero-copy send functions.
The function can throw the following exceptions:
InvalidEndpointAddress
: if the address is invalid.
UnknownSendEndpoint
: if no connection was opened on this endpoint.
NotSupported
: if the backend does not support immediate sending.
const std::vector<std::uint8_t> data = {0x1, 0x2};
const std::vector<std::uint8_t> header_data = {0x0};
const auto key = std::uint64_t{0};
const auto send_endpoint = EndPointAddress{ip, listen_port};
backend->open_send_endpoint(send_endpoint, {});
// Wait for on_connection_established_cb before sending
const auto status = backend->send_data_copy(send_endpoint, data, header_data, key);
// handle status
// header_data can be discarded here
// data can be modified or deleted immediately
-
virtual NetioStatus netio3::NetworkBackend::send_data_copy(const EndPointAddress &address, std::span<const iovec> iov, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0
-
virtual NetioStatus netio3::NetworkBackend::send_data_copy(const EndPointAddress &address, std::span<const std::uint8_t> data, std::span<const std::uint8_t> header_data, std::uint64_t key) = 0
Data reception
Data is received via the on_data_cb
callback. Errors receiving data are not reported.
Important
The data provided in the callback is only valid within the callback. If the data needs to be handled after the callback has returned, it has to be copied.
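A minimal sketch of such a copy is given below. ReceivedMessages is a hypothetical helper, not part of the library: the callback would hand it the pointer and size of the received bytes, and a consumer thread would later pop the owned copies. A mutex is included on the assumption that the callback runs on the event loop thread while the consumer runs elsewhere.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical queue that takes ownership of received bytes by copying them.
class ReceivedMessages {
public:
  // Intended to be called from inside the data callback: the bytes are
  // copied immediately, so the original memory may be reused afterwards.
  void push(const std::uint8_t* data, std::size_t size) {
    assert(data != nullptr || size == 0);
    std::vector<std::uint8_t> copy(data, data + size);
    std::lock_guard<std::mutex> lock(m_mutex);
    m_messages.push_back(std::move(copy));
  }

  // Moves the oldest message into out. Returns false if the queue is empty.
  bool pop(std::vector<std::uint8_t>& out) {
    std::lock_guard<std::mutex> lock(m_mutex);
    if (m_messages.empty()) {
      return false;
    }
    out = std::move(m_messages.front());
    m_messages.pop_front();
    return true;
  }

private:
  std::mutex m_mutex;
  std::deque<std::vector<std::uint8_t>> m_messages;
};
```

With a span-based callback argument named data, the call inside the callback would be queue.push(data.data(), data.size()).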
Closing a connection
A connection can be closed using the close_send_endpoint
or close_listen_endpoint
functions. Both functions take the endpoint as an argument.
Like all other operations, closing a connection is asynchronous. The on_connection_closed_cb
callback is executed when the connection is closed.
The callback receives the remote endpoint address and the keys of all outstanding (and therefore cancelled) send operations as arguments.
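The cancelled keys let the application decide what to do with sends that never completed. The sketch below shows one possible bookkeeping approach for a single remote endpoint. PendingSends and its method names are hypothetical and would be wired into the send path and the two callbacks by the application.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Hypothetical tracker for the keys of sends that have not completed yet.
class PendingSends {
public:
  // Call after a send function has accepted a request with this key.
  void on_submitted(std::uint64_t key) {
    const bool inserted = m_pending.insert(key).second;
    assert(inserted);  // keys of outstanding sends are expected to be unique
    (void)inserted;
  }

  // Call from the send-completed callback.
  void on_completed(std::uint64_t key) { m_pending.erase(key); }

  // Call from the connection-closed callback with the reported keys.
  // Returns the keys that were still pending and may need to be resent.
  std::vector<std::uint64_t> on_closed(const std::vector<std::uint64_t>& cancelled_keys) {
    std::vector<std::uint64_t> to_retry;
    for (const std::uint64_t key : cancelled_keys) {
      if (m_pending.erase(key) > 0) {
        to_retry.push_back(key);
      }
    }
    return to_retry;
  }

  std::size_t pending() const { return m_pending.size(); }

private:
  std::unordered_set<std::uint64_t> m_pending;
};
```

An application talking to several remotes would keep one such tracker per remote address, since the callback reports the keys per connection.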
Closing a listen endpoint will also close all receive endpoints that were created from this listen endpoint. The on_connection_closed_cb
callback will in fact only be called for receive endpoints and not listen endpoints.
Closing a listen endpoint that never spawned a receive endpoint will not trigger the callback.
The close_send_endpoint
function can throw the following exceptions:
InvalidEndpointAddress
: if the endpoint address is invalid.
UnknownSendEndpoint
: if the send endpoint does not exist.
FailedCloseSendEndpoint
: if closing the send socket failed.
The close_listen_endpoint
function can throw the following exceptions:
InvalidEndpointAddress
: if the endpoint address is invalid.
UnknownListenEndpoint
: if the listen endpoint does not exist.
FailedCloseListenEndpoint
: if closing the listen socket failed.
-
virtual void netio3::NetworkBackend::close_send_endpoint(const EndPointAddress &address) = 0
Closes the send endpoint for the specified address.
May enqueue the endpoint to be closed by the event loop for thread synchronization purposes.
- Parameters:
address – The address of the endpoint to close
- Throws:
InvalidEndpointAddress – if the endpoint address is invalid
UnknownSendEndpoint – if the send endpoint does not exist
FailedCloseSendEndpoint – if closing the send socket failed
-
virtual void netio3::NetworkBackend::close_listen_endpoint(const EndPointAddress &address) = 0
Closes the listen endpoint for the specified address.
Shall also close all receive sockets that were spawned by this listen socket.
May enqueue the endpoint to be closed by the event loop for thread synchronization purposes.
- Parameters:
address – The address of the endpoint to close
- Throws:
InvalidEndpointAddress – if the endpoint address is invalid
UnknownListenEndpoint – if the listen endpoint does not exist
FailedCloseListenEndpoint – if closing the listen socket failed
Thread safety
The backend can be configured to be thread-safe or not. The thread safety model is passed to the backend factory function. If the backend is thread-safe, all functions can be called from any thread. If the backend is not thread-safe, all functions must be called from the same thread that runs the event loop. Note that in thread-unsafe mode even sending data has to be executed on the event loop thread, which requires the use of either signals or timers.
Note
The asyncmsg backend is always thread-safe.
Example
Receiver:
auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::LIBFABRIC,
  {
    .mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE,
    .on_data_cb = [](std::span<const std::uint8_t> data) {
      std::cout << std::format("Received data of size {}\n", std::size(data));
    },
    .on_connection_established_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_closed_cb = [](const EndPointAddress& remote, const std::vector<std::uint64_t>& keys) {
      std::cout << std::format("Connection closed with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_refused_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
    },
  },
  evloop
);
// Run the event loop on its own thread
auto thread = std::jthread([evloop]() { evloop->run(); });
const auto listen_endpoint = backend->open_listen_endpoint({ip, port}, {.buf_size = 4096, .num_buf = 16});
// Wait for connection
// Receive data
backend->close_listen_endpoint(listen_endpoint);
evloop->stop();
Sender:
auto evloop = std::make_shared<EpollEventLoop>();
auto backend = Netio3Backend::create(
  BackendType::LIBFABRIC,
  {
    .mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE,
    .on_connection_established_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_closed_cb = [](const EndPointAddress& remote, const std::vector<std::uint64_t>& keys) {
      std::cout << std::format("Connection closed with {}:{}\n", remote.address, remote.port);
    },
    .on_connection_refused_cb = [](const EndPointAddress& remote) {
      std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
    },
    .on_send_completed_cb = [](const EndPointAddress& remote, std::uint64_t key) {
      std::cout << std::format("Send operation to {}:{} with key {} completed\n", remote.address, remote.port, key);
    },
  },
  evloop
);
// Run the event loop on its own thread
auto thread = std::jthread([evloop]() { evloop->run(); });
const auto endpoint = EndPointAddress{ip, port};
backend->open_send_endpoint(endpoint, {.buf_size = 4096, .num_buf = 16, .mr_start = nullptr});
// Wait for connection
auto* buf = backend->get_buffer(endpoint);
if (buf != nullptr) {
  buf->write(data);
  const auto status = backend->send_buffer(endpoint, buf);
  switch (status) {
    // Handle status
    // Try again if NO_RESOURCES was returned or throw exception if FAILED was returned (for example)
  }
}
backend->close_send_endpoint(endpoint);
evloop->stop();