Sending and receiving messages

Sending

Sending messages is done with the netio3::NetioSender class. First you instantiate a netio3::NetioSender object, then open a connection to the netio3::EndPointAddress you want to communicate with. Once the connection is established, you can call one of the netio3::NetioSender::send_data() methods.

const NetioSenderConfig sender_config{
    .backend_type = NetworkType::ASYNCMSG,
    .backend_mode = NetworkMode::TCP,
    .buffersize = 1024,
    .nbuffers = 10
};
NetioSender sender (sender_config, evloop);

The netio3::NetioSender is configured with a netio3::NetioSenderConfig object which is passed to the constructor along with the event loop (see netio3::BaseEventLoop).

std::atomic_bool connection_cb = false;
bool connection_ready = false;
sender.set_on_connection_established ([&](const EndPointAddress&) {
    connection_ready = true;
    connection_cb.store(true);
    connection_cb.notify_all();
});

bool connection_failed = false;
sender.set_on_connection_refused ([&](const EndPointAddress&) {
    connection_failed = true;
    connection_cb.store(true);
    connection_cb.notify_all();
});

const EndPointAddress remoteEp{"127.0.0.1", 1337};
auto& connection = sender.open_connection(remoteEp);

// Wait for connection to establish
connection_cb.wait(false);

if (connection_failed) {
    // Tidy up and exit or throw
}

After constructing the netio3::NetioSender, the next step is to open a connection to the destination network endpoint. The netio3::NetioSender::open_connection() method returns a netio3::NetioSender::Connection object immediately but the connection is established asynchronously so may still fail. If you want to be informed of a successful connection, you can set a callback to be called on connection established. There is also a callback for connection refused. Here we set the connection callbacks to set a flag that we can wait for before attempting to send our message.

std::string text{"Hello world"};
auto message = std::span<uint8_t>(
    reinterpret_cast<uint8_t*>(text.data()),
    text.size());
const uint64_t tag=0x1234abcdfeedface;

status = sender.send_data(connection,
            tag,
            message);
if (status != NetioStatus::OK) {
   // Something went wrong

The message is represented as a std::span of unsigned 8-bit values.

The netio3::NetioSender class provides several methods for sending data. The main two options are netio3::NetioSender::send_data() which sends the data immediately and netio3::NetioSender::buffered_send_data() which will accumulate multiple packets in a buffer and send either when the buffer is full or a time-out is reached. In the example above, the ‘immediate’ version is used, sending a single span of data.

Note that there are send_data methods that take the endpoint address of an opened connection as the first argument so you don’t have to keep track of the connection between the open and the send but it saves a map lookup to use the connection object.

All netio3::NetioSender::send_data() methods include a tag in their parameter lists. This tag is used by the publish/subscribe mechanism but can be any arbitrary value in a simple send/receive set up.

If you need to be notified of the successful completion of the send, you can set an on send completed callback (see netio3::NetioSender::set_on_send_completed()).

Choosing a send_data method

There are 3 modes of communication, ‘send’, ‘buffered_send’ and ‘zero_copy_send’ each with a group of methods netio3::NetioSender::send_data(), netio3::NetioSender::buffered_send_data() and netio3::NetioSender::zero_copy_send_data().

Although in all cases the data is sent asynchronously, in the first 2 the data are copied into a local intermediate buffer and so the memory passed to the send_data method may be reused or deallocated immediately. In the zero-copy case, the memory containing the data being sent must remain valid until the user receives a callback indicating that the send is complete.

The plain send methods send a single message straight away. The data will be copied into a buffer so the message memory may be reused as soon as the method returns even though the data will be sent asynchronously.

NetioStatus netio3::NetioSender::send_data(const EndPointAddress &ep, uint64_t tag, std::span<const iovec> data, uint8_t user_status = 0)

The buffered send methods accumulate several messages into one buffer before sending, either when the buffer is full or the time since the first message was inserted reaches a predefined limit. If you want to force the sending of the buffer at any point, you may call the netio3::NetioSender::flush_buffer() method.

NetioStatus netio3::NetioSender::buffered_send_data(const EndPointAddress &ep, uint64_t tag, std::span<const iovec> data, uint8_t user_status = 0)

The zero-copy methods initiate the sending of data immediately without a copy. The completion of the sending is asynchronous so the memory the containing the data must not be modified until the send is complete. To track usage of the transfer, the zero-copy send methods accept a key parameter which is then returned by the send complete callback when the transfer is complete.

NetioStatus netio3::NetioSender::zero_copy_send_data(const EndPointAddress &ep, std::uint64_t tag, std::span<const iovec> iov, uint8_t user_status, std::uint64_t key)

Receiving messages

To receive messages, you first need to instantiate a NetioReceiver object and set it’s “on_data” callback to handle your messages. Then you need to set it to listen on a port.

const NetioReceiverConfig rec_config {
    .backend_type = NetworkType::ASYNCMSG,
    .backend_mode = NetworkMode::TCP,
    .thread_safety = ThreadSafetyModel::SAFE
};
NetioReceiver receiver(rec_config, evloop);

std::atomic_uint nmessages_received = 0;
receiver.set_on_data_cb([&](uint64_t tag,
                            std::span<const uint8_t> payload,
                            uint8_t status){
  const std::string msg(payload.begin(),payload.end());
  std::cout << std::format(
      "Received message with tag {:x}, size {}, status {}, message <{}>\n",
      tag, payload.size(), status, msg);
  nmessages_received++;
});

ConnectionParametersRecv connection_pars{.buf_size=512,
                                         .num_buf=4};
EndPointAddress local_ep{"127.0.0.1", 1337};
receiver.listen(local_ep, connection_pars);

The main work of the receiver is implemented in the “on_data” callback. There are two options for the callback, one just receiving a single message at a time (set via the netio3::NetioReceiver::set_on_data_cb() method) and the other receiving a complete buffer of possibly multiple messages (set via the netio3::NetioReceiver::set_on_buffer_cb() method). In this example we use the netio3::NetioReceiver::set_on_data_cb() method to set a callback (netio3::CbMessageReceived) to just print a summary and count the messages.

After setting the data receiving callback we have to set the receiver to listen on a network port for incoming connections. The receiver needs to be told how many buffers to allocate for receiving and how large they need to be. This is done via a netio3::ConnectionParametersRecv object.