Network communication
=====================

This chapter describes how to establish and close connections and how to send and receive data, including error handling. The interface is identical for the asyncmsg and the libfabric backends. The next chapters describe the differences in the implementation in detail.

Creating a backend instance
---------------------------

The backend should be constructed using the ``Netio3Backend::create`` factory function. This function takes the backend type (asyncmsg or libfabric), the configuration, and an event loop as arguments. The configuration contains the following parameters:

- ``NetworkMode mode``: Should be ``NetworkMode::RDMA`` for libfabric and ``NetworkMode::TCP`` for asyncmsg.
- ``ThreadSafetyModel thread_safety``: Either ``ThreadSafetyModel::SAFE`` or ``ThreadSafetyModel::UNSAFE``. See :ref:`Thread safety <thread_safety>` below for more information.
- ``OnDataCb on_data_cb``: A callback that is executed when data is received.
- ``OnConnectionEstablishedCb on_connection_established_cb``: A callback that is executed when a connection is established.
- ``OnConnectionClosedKeysCb on_connection_closed_cb``: A callback that is executed when a connection is closed.
- ``OnConnectionRefusedCb on_connection_refused_cb``: A callback that is executed when a connection is refused.
- ``OnSendCompleted on_send_completed_cb``: A callback that is executed when a send operation is completed.

The callback signatures are explained in the relevant sections.

.. code-block:: cpp

   auto evloop = std::make_shared();
   auto backend = Netio3Backend::create(
     BackendType::ASYNCMSG,
     {
       .mode = NetworkMode::TCP,
       .thread_safety = ThreadSafetyModel::SAFE,
       // Callbacks
     },
     evloop
   );

.. note::
   It is possible to use ``NetworkMode::TCP`` for the libfabric backend. This is only useful for testing the libfabric backend without RDMA capabilities and is not recommended in production; use the asyncmsg backend for TCP communication. Conversely, the asyncmsg backend always uses TCP, even if ``NetworkMode::RDMA`` is passed.

Opening a connection
--------------------

Opening a connection differs for sending and receiving data.

Receiver
~~~~~~~~

To receive data, the backend must open a listen endpoint that accepts incoming connections. This is done by calling the ``open_listen_endpoint`` function on the backend instance. This function takes an ``EndPointAddress`` containing the IP address and port, and connection parameters as arguments. The connection parameters contain the following parameters:

- ``std::size_t buf_size``: The size of the receive buffer.
- ``std::size_t num_buf``: The number of receive buffers.

These parameters are passed to all receive endpoints that are created from this listen endpoint. They define the maximum size of messages that can be received and the maximum number of messages that can be received simultaneously (i.e. before being processed by the ``on_data`` callback).

.. tip::
   These parameters are only used by the libfabric backend. The asyncmsg backend does not need to create dedicated buffers to receive data (see :ref:`asyncmsg documentation `).

.. tip::
   If port 0 is passed as the port number, the operating system will assign a random port number. The function will return the assigned port number.

If opening the listen endpoint fails, the function can throw one of the following exceptions:

- ``InvalidEndpointAddress`` if the endpoint address is invalid.
- ``ListenEndpointAlreadyExists`` if a listen endpoint was already requested for this address and port.
- ``FailedOpenListenEndpoint`` if the listen endpoint could not be opened.

.. doxygenfunction:: netio3::NetworkBackend::open_listen_endpoint
   :no-link:

Sender
~~~~~~

To open a send endpoint, the other side must have opened a listen endpoint. Then, the sender can connect to the listen endpoint by calling the ``open_send_endpoint`` function.
This function takes an ``EndPointAddress`` containing the IP address and port of the listen endpoint, and connection parameters as arguments. The connection parameters contain the following parameters:

- ``std::size_t buf_size``: The size of the send buffer.
- ``std::size_t num_buf``: The number of send buffers.
- ``std::uint8_t* mr_start``: The start of the memory region that should be registered for RDMA operations.

If ``mr_start`` is a ``nullptr`` (default), the send endpoint will allocate ``num_buf`` buffers of size ``buf_size`` for buffered sending. If ``mr_start`` is not a ``nullptr``, the send endpoint will register the memory region starting at ``mr_start`` and of size ``buf_size``. The ``num_buf`` parameter is ignored in this case. The send methods are described :ref:`below <sending_data>`.

If the request is invalid, so that no connection attempt is even made, the function can throw one of three exceptions:

- ``InvalidEndpointAddress`` if the endpoint address is invalid.
- ``SendEndpointAlreadyExists`` if the send endpoint already exists.
- ``FailedOpenSendEndpoint`` if opening the endpoint failed for other reasons.

.. note::
   If the connection was successfully requested but refused by the other side, a callback is executed instead of throwing an exception. This is required since the connection is established asynchronously.

.. doxygenfunction:: netio3::NetworkBackend::open_send_endpoint
   :no-link:

Callbacks
~~~~~~~~~

Once a connection has been established (for both sender and receiver), the ``on_connection_established_cb`` callback is executed. This callback is passed the ``EndPointAddress`` of the remote endpoint.

If opening a connection failed, the ``on_connection_refused_cb`` callback is executed. This callback is passed the ``EndPointAddress`` of the remote endpoint. It is only called for the sender.

Example
~~~~~~~

.. code-block:: cpp

   auto evloop = std::make_shared();
   auto backend = Netio3Backend::create(
     BackendType::LIBFABRIC,
     {
       .mode = NetworkMode::RDMA,
       .thread_safety = ThreadSafetyModel::SAFE,
       .on_connection_established_cb = [](const EndPointAddress& remote) {
         std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
       },
       .on_connection_refused_cb = [](const EndPointAddress& remote) {
         std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
       },
     },
     evloop
   );
   // Run the event loop on its own thread; the lambda keeps a copy of the shared pointer
   auto thread = std::jthread([evloop]() { evloop->run(); });
   const auto listen_endpoint = backend->open_listen_endpoint({ip, 0}, {.buf_size = 4096, .num_buf = 16});
   // Will invoke on_connection_established_cb
   backend->open_send_endpoint(listen_endpoint, {.buf_size = 4096, .num_buf = 16, .mr_start = nullptr});
   const auto wrong_listen_port = 1234;
   // Will invoke on_connection_refused_cb because no listen endpoint is open on port 1234
   // (unless we got really really lucky)
   backend->open_send_endpoint({ip, wrong_listen_port}, {.buf_size = 4096, .num_buf = 16, .mr_start = nullptr});

.. note::
   The event loop must be running for the connection to be established. Otherwise the events will not be processed.

.. _sending_data:

Sending data
------------

There are three different modes of sending:

- Buffered sending
- Zero-copy sending
- Immediate sending, which copies the data

.. caution::
   You can only use buffered sending for send endpoints that were created with an ``mr_start`` of ``nullptr``. For libfabric, you can only use zero-copy sending if the send endpoint was created with an ``mr_start`` that is not a ``nullptr``. Immediate sending is only available for asyncmsg.

Send operations are always asynchronous. The ``on_send_completed_cb`` callback is executed when the send operation is completed. To match the send operation with the data that was sent, a ``key`` is passed to the callback in addition to the remote endpoint address.
For buffered sending, this key corresponds to the index of the buffer that was sent. For the other send methods, the key is passed as an argument to the send function.

Success or failure is communicated using status codes. There are three possible status codes:

- ``NetioStatus::OK``: The operation was successful.
- ``NetioStatus::NO_RESOURCES``: The request cannot currently be processed, for example because no buffers are available. Retrying after resources have become available is likely to succeed.
- ``NetioStatus::FAILED``: Sending failed and retrying is unlikely to succeed.

All send functions report their completion using the ``on_send_completed_cb`` callback. The callback is passed the remote endpoint address and the key that was passed to the send function or, for buffered sending, the buffer ID.

Buffered sending
~~~~~~~~~~~~~~~~

.. important::
   You can only use buffered sending for send endpoints that were created with an ``mr_start`` of ``nullptr``.

The buffered sending API consists of two functions: ``get_buffer`` and ``send_buffer``. ``get_buffer`` returns a pointer to a buffer if one is available. If no buffer is available, the function returns a ``nullptr``. The buffer can be filled with data and then sent using the ``send_buffer`` function. Since buffers are allocated per send endpoint, the ``get_buffer`` function takes the endpoint as an argument. The ``send_buffer`` function takes the buffer pointer and the endpoint as arguments and returns a status code.

Both functions can throw an ``InvalidSendEndpoint`` exception if the endpoint is invalid (no connection is open). ``get_buffer`` can also throw a ``NoBuffersAllocated`` exception if no buffers were allocated because the endpoint was configured for zero-copy sending.

.. code-block:: cpp

   auto* buffer = backend->get_buffer(send_endpoint);
   if (buffer != nullptr) { // Always check if a buffer was returned
     buffer->write(data);
     const auto status = backend->send_buffer(send_endpoint, buffer);
     switch (status) {
       // Handle status
     }
   }

.. tip::
   The buffer supports multiple ``write`` functions taking a span of bytes, an ``iovec`` instance or any integral type.

.. doxygenfunction:: netio3::NetworkBackend::get_buffer
   :no-link:

.. doxygenfunction:: netio3::NetworkBackend::send_buffer
   :no-link:

Zero-copy sending
~~~~~~~~~~~~~~~~~

Zero-copy sending avoids copying the data into a buffer. Not copying the data can be more performant but comes with the restriction that the data has to remain valid until the send operation is completed. Note that the library cannot guarantee this. It is the responsibility of the user to ensure that the data is not modified (changed, deleted, moved or touched in any way) until the send operation is completed. Failure to do so leads to undefined behavior and likely segmentation faults.

.. important::
   For libfabric, you can only use zero-copy sending if the send endpoint was created with an ``mr_start`` that is not a ``nullptr``. The data can only be sent from inside the provided memory region.

.. note::
   TCP will internally still copy the data into kernel space. Only for RDMA is this a true zero-copy operation. However, the additional copy into the buffer is still avoided.

In addition to the data (which has to be inside the registered memory region for RDMA), a small amount of additional data can be passed to the send function. This data (up to 16 bytes) is prepended to the payload and allows passing extra information that does not have to be located in the memory region. It can also be discarded after calling ``send_data``.

Data can be passed in two different ways: as a span of bytes or as a vector of ``iovec`` instances. The key provided to ``send_data`` is not transmitted over the network.
It is only used to match the send operation with the completion callback.

The function can throw the following exceptions:

- ``InvalidEndpointAddress`` if the address is invalid.
- ``UnknownSendEndpoint`` if no connection was opened on this endpoint.

.. code-block:: cpp

   std::vector<std::uint8_t> data = {0x1, 0x2};
   const std::vector<std::uint8_t> header_data = {0x0};
   const auto key = std::uint64_t{0};
   const auto send_endpoint = EndPointAddress{ip, listen_port};
   // Register the memory backing data for zero-copy sending
   backend->open_send_endpoint(send_endpoint, {.buf_size = data.size(), .mr_start = data.data()});
   const auto status = backend->send_data(send_endpoint, data, header_data, key);
   // handle status
   // header_data can be discarded here
   // data must not be modified until on_send_completed_cb with matching key is called

.. doxygenfunction:: netio3::NetworkBackend::send_data
   :no-link:

Immediate sending
~~~~~~~~~~~~~~~~~

.. important::
   Immediate sending is only available for asyncmsg.

Immediate sending is the most convenient way to send data over TCP. It neither requires buffer allocation nor puts any constraints on the user's memory management. It comes at the cost of an additional copy of the data into the library's memory space and is not supported for RDMA. The signature is identical to that of the zero-copy sending function.

.. note::
   The separation between data and header data is not necessary for immediate sending but is kept for consistency with the zero-copy send functions.

The function can throw the following exceptions:

- ``InvalidEndpointAddress`` if the address is invalid.
- ``UnknownSendEndpoint`` if no connection was opened on this endpoint.
- ``NotSupported`` if the backend does not support immediate sending.

.. code-block:: cpp

   const std::vector<std::uint8_t> data = {0x1, 0x2};
   const std::vector<std::uint8_t> header_data = {0x0};
   const auto key = std::uint64_t{0};
   const auto send_endpoint = EndPointAddress{ip, listen_port};
   backend->open_send_endpoint(send_endpoint, {});
   const auto status = backend->send_data_copy(send_endpoint, data, header_data, key);
   // handle status
   // header_data can be discarded here
   // data can be modified or deleted immediately

.. doxygenfunction:: netio3::NetworkBackend::send_data_copy
   :no-link:

Data reception
--------------

Data is received via the ``on_data_cb`` callback. Errors receiving data are not reported.

.. important::
   The data provided in the callback is only valid within the callback. If the data needs to be handled after the callback has returned, it has to be copied.

Closing a connection
--------------------

A connection can be closed using the ``close_send_endpoint`` or ``close_listen_endpoint`` functions. Both functions take the endpoint as an argument. Like all other operations, closing a connection is asynchronous. The ``on_connection_closed_cb`` callback is executed when the connection is closed. The callback receives the remote endpoint address and the keys of all outstanding (and therefore cancelled) send operations as arguments.

Closing a listen endpoint also closes all receive endpoints that were created from this listen endpoint. The ``on_connection_closed_cb`` callback is only called for receive endpoints, not for listen endpoints. Closing a listen endpoint that never spawned a receive endpoint will not trigger the callback.

The ``close_send_endpoint`` function can throw the following exceptions:

- ``InvalidEndpointAddress`` if the endpoint address is invalid.
- ``UnknownSendEndpoint`` if the send endpoint does not exist.
- ``FailedCloseSendEndpoint`` if closing the send socket failed.

The ``close_listen_endpoint`` function can throw the following exceptions:

- ``InvalidEndpointAddress`` if the endpoint address is invalid.
- ``UnknownListenEndpoint`` if the listen endpoint does not exist.
- ``FailedCloseListenEndpoint`` if closing the listen socket failed.

.. doxygenfunction:: netio3::NetworkBackend::close_send_endpoint
   :no-link:

.. doxygenfunction:: netio3::NetworkBackend::close_listen_endpoint
   :no-link:

.. _thread_safety:

Thread safety
-------------

The backend can be configured to be thread-safe or not.
The thread safety model is passed to the backend factory function. If the backend is thread-safe, all functions can be called from any thread. If the backend is not thread-safe, all functions must be called from the thread that runs the event loop. Note that in the thread-unsafe mode even sending data has to be executed on the event loop thread, which requires the use of either signals or timers.

.. note::
   The asyncmsg backend is always thread-safe.

Example
-------

Receiver:

.. code-block:: cpp

   auto evloop = std::make_shared();
   auto backend = Netio3Backend::create(
     BackendType::LIBFABRIC,
     {
       .mode = NetworkMode::RDMA,
       .thread_safety = ThreadSafetyModel::SAFE,
       .on_data_cb = [](std::span data) {
         std::cout << std::format("Received data of size {}\n", std::size(data));
       },
       .on_connection_established_cb = [](const EndPointAddress& remote) {
         std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
       },
       .on_connection_closed_cb = [](const EndPointAddress& remote, const std::vector<std::uint64_t>& keys) {
         std::cout << std::format("Connection closed with {}:{}\n", remote.address, remote.port);
       },
       .on_connection_refused_cb = [](const EndPointAddress& remote) {
         std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
       },
     },
     evloop
   );
   // Run the event loop on its own thread; the lambda keeps a copy of the shared pointer
   auto thread = std::jthread([evloop]() { evloop->run(); });
   const auto listen_endpoint = backend->open_listen_endpoint({ip, port}, {.buf_size = 4096, .num_buf = 16});
   // Wait for connection
   // Receive data
   backend->close_listen_endpoint(listen_endpoint);
   evloop->stop();

Sender:

.. code-block:: cpp

   auto evloop = std::make_shared();
   auto backend = Netio3Backend::create(
     BackendType::LIBFABRIC,
     {
       .mode = NetworkMode::RDMA,
       .thread_safety = ThreadSafetyModel::SAFE,
       .on_connection_established_cb = [](const EndPointAddress& remote) {
         std::cout << std::format("Connection established with {}:{}\n", remote.address, remote.port);
       },
       .on_connection_closed_cb = [](const EndPointAddress& remote, const std::vector<std::uint64_t>& keys) {
         std::cout << std::format("Connection closed with {}:{}\n", remote.address, remote.port);
       },
       .on_connection_refused_cb = [](const EndPointAddress& remote) {
         std::cout << std::format("Connection refused by {}:{}\n", remote.address, remote.port);
       },
       .on_send_completed_cb = [](const EndPointAddress& remote, std::uint64_t key) {
         std::cout << std::format("Send operation to {}:{} with key {} completed\n", remote.address, remote.port, key);
       },
     },
     evloop
   );
   // Run the event loop on its own thread; the lambda keeps a copy of the shared pointer
   auto thread = std::jthread([evloop]() { evloop->run(); });
   const auto endpoint = EndPointAddress{ip, port};
   backend->open_send_endpoint(endpoint, {.buf_size = 4096, .num_buf = 16, .mr_start = nullptr});
   // Wait for connection
   auto* buf = backend->get_buffer(endpoint);
   if (buf != nullptr) {
     buf->write(data);
     const auto status = backend->send_buffer(endpoint, buf);
     switch (status) {
       // Handle status
       // Try again if NO_RESOURCES was returned or throw exception if FAILED was returned (for example)
     }
   }
   backend->close_send_endpoint(endpoint);
   evloop->stop();
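
The status handling that the sender example leaves as a comment could be factored into a small retry helper. The following is a minimal sketch and not part of netio3: ``send_with_retry``, its parameters, and the local ``NetioStatus`` stand-in (which only mirrors the three status codes described in this chapter) are assumptions made so that the snippet compiles on its own.

.. code-block:: cpp

   #include <cassert>
   #include <chrono>
   #include <stdexcept>
   #include <thread>

   // Local stand-in for the library's status type so that the sketch compiles
   // on its own; real code would use the NetioStatus provided by netio3.
   enum class NetioStatus { OK, NO_RESOURCES, FAILED };

   // Hypothetical helper (not part of netio3): calls try_send until it returns
   // OK, sleeping for backoff after every attempt that reports NO_RESOURCES.
   // Returns true on success and false if every attempt reported NO_RESOURCES.
   // Throws std::runtime_error on FAILED, because retrying is unlikely to help.
   template <typename TrySend>
   bool send_with_retry(TrySend&& try_send, int max_attempts,
                        std::chrono::microseconds backoff) {
     assert(max_attempts > 0);
     for (int attempt = 0; attempt < max_attempts; ++attempt) {
       switch (try_send()) {
         case NetioStatus::OK:
           return true;
         case NetioStatus::NO_RESOURCES:
           // Give pending send completions time to free resources
           std::this_thread::sleep_for(backoff);
           break;
         case NetioStatus::FAILED:
           throw std::runtime_error("send failed and retrying is unlikely to succeed");
       }
     }
     return false;
   }

In the sender example, ``try_send`` could be a lambda that calls ``get_buffer``, returns ``NetioStatus::NO_RESOURCES`` when it receives a ``nullptr``, and otherwise writes the data and returns the result of ``send_buffer``. Because the helper sleeps between attempts, it is only suitable when called from a thread other than the one running the event loop, which requires the thread-safe model. In the thread-unsafe model, a retry would instead have to be rescheduled on the event loop, for example with a timer.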