Publishing / Subscribing

Publishing and subscribing is done in terms of tags. A tags is a 64-bit value that identifies which stream of data is being published/subscribed to (in the case of FELIX a tag is a FelixID). A subscriber contacts a publisher on its subscription port and sends a subscription message for each tag that it wants data from. When a publisher publishes data for a given tag, it sends data to all of the subscribers who have subscribed to that tag.

The publish / subscribe classes are built on the backs of the sender and receiver classes. Internally netio3::NetioPublisher uses a netio3::NetioSender to publish its data and netio3::NetioSubscriber has a netio3::NetioReceiver to receive the subscribed data. A separate connection using the netio3-backend directly is used for subscription requests and acknowledgements.

As with the sender and receiver classes, the publish and subscribe classes can work in an immediate mode, where each message is sent in a separate packet, or a buffered mode where data are accumulated until a packet is full or the time since the first message in the packet reaches a threshold.

Note

The netio3::NetioPublisher class is actually an alias for a netio3::NetioPublisherTemplate template class with the type netio3::SubscriptionEndpointMap.

Publishing

const EndPointAddress local_ep{"192.168.200.10", 1337};

const NetioPublisherConfig config {
    .backend_type = NetworkType::LIBFABRIC,
    .backend_mode = NetworkMode::RDMA,
    .buffersize = 16384,
    .nbuffers = 16,
    .method = SendMethod::BUFFERED,
    .thread_safety = ThreadSafetyModel::SAFE
};

NetioPublisher publisher(config, evloop, local_ep);

The configuration of the publisher is described by a netio3::NetioPublisherConfig object. This selects the type of backend and mode of communication.

The NetioPublisher will listen on local_ep for subscription requests

Note

Subscription requests and acknowledgements are always sent with the the asyncmsg backend (netio3::asyncmsg::BackendAsyncmsg). This is independent of the backend used for the data transfer. Do not worry if you see messages in the logs from this backend when you select a different backend for your data.

Of course, there is no point in publishing unless someone has subscribed to the data. For our example program, let’s register a callback funstion to be executed when someone subscribes and wait for that to be called before our first publish attempt.

const uint64_t tag = 0xf007ba11;
std::atomic_bool subscriber = false;
publisher.set_on_subscription([&] (uint64_t subs_tag, const EndPointAddress&) {
  if (subs_tag == tag) {
    std::cout << "Subscribed to our tag\n";
    subscriber.store(true);
    subscriber.notify_all();
  }
});


// Wait for subscriber
subscriber.wait(false);

Now we can try publishing some data.

std::vector<uint8_t> buffer {1, 2, 3, 4};
std::vector<iovec> iov {{.iov_base = buffer.data(),
                         .iov_len = buffer.size()}};

while (subscriber.load()) {
    auto status = publisher.publish(tag, iov, false);

    while (status == NetioPublisherStatus::NO_RESOURCES) {
        status = publisher.publish(tag, iov, true);
    }

    buffer[0]++;
    std::this_thread::sleep_for(10ms);
}

Here we try publishing data for tag f007ba11, describing the data with an iovec until the subscriber unsubscribes or disconnects.

After each publish any publishes that failed with a NetioPublisherStatus::NO_RESOURCES status is retried in a second call to publish. The difference between this call and the original publish is that this will only send to subscribers who have not already been sent this buffer.

Note

The NetioPublisher::publish methods return a netio3::NetioPublisherStatus which has an extra possibile value w.r.t. netio3::NetioStatus.

Choosing a publish method

Like the netio3::NetioSender::send_data(), the NetioPublisher provides netio3::NetioPublisher::publish() methods for buffered or zero-copy sending of a single span of data or an iovec of addresses to gather data from.

Note

If the netio3::NetioPublisherConfig specifies zero-copy mode, you must use the netio3::NetioPublisher::publish() method with a key argument.

inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<const iovec> data, bool retry, uint8_t user_status = 0)
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<const uint8_t> data, bool retry, uint8_t user_status = 0)
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<const iovec> data, bool retry, uint8_t user_status, uint64_t key)
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<uint8_t> data, bool retry, uint8_t user_status, uint64_t key)

Subscribing to published data

Subscribing to data is done with the netio3::NetioSubscriber class. Once the subscription is set up, all the data reception is handled by the ‘on data’ callback.

const std::string local_address{"192.168.200.10"};

const NetioSubscriberConfig config {
    .backend_type = LIBFABRIC,
    .backend_mode = RDMA,
    .thread_safety = ThreadSafetyModel::SAFE
};

NetioSubscriber subscriber(config, evloop, local_address);

Here we instantiate a NetioSubscriber object with a NetioSubscriberConfig and our local address. The NetioSubscriberConfig struct is used to configure the subscriber. In this case, we are selecting the libfabric backend in RDMA mode.

std::atomic_int nmessages_received = 0;
subscriber.set_on_data_cb([&](std::uint64_t rcv_tag,
                              std::span<const std::uint8_t> payload,
                              std::uint8_t status) {
   // Here we decode the message and deal with the data
   nmessages_received++;
});

Next we configure the subscriber with a callback for when a subscription is successful and most importantly, a callback for when our subscibed data arrive. The data callback is passed messages each with its own tag and status. Alternatively, we could register the on_buffer callback to be given the raw contents of the buffer not split into individual messages.

// Address of publisher
EndPointAddress ep{"192.168.200.10", 1337};

const size_t RCV_BUFFER_SIZE = 2048;
const uint32_t NRCV_BUFFERS = 16;
const auto connection_params =
    ConnectionParametersRecv{RCV_BUFFER_SIZE, NRCV_BUFFERS};
const uint64_t tag = 0xf007ba11;

auto status = subscriber.subscribe(tag, ep, connection_params);
if (status != NetioStatus::OK) {
   // Handle error case
}

Then, attempt the subscription and wait for the on data callback to be called.

std::string input;
while (input.empty() || input[0] != 'q') {
    std::cout << "Enter q to quit ";
    std::getline(std::cin, input);
    std::cout << "Received total of " << nmessages_received.load() << " messages\n";
}

status = subscriber.unsubscribe(tag, ep);
if (status != NetioStatus::OK) {
  std::cout << "Error unsubscribing\n";
}
std::this_thread::sleep_for(10ms);

After we have finished receiving data, we unsubscribe from the publisher giving the same tag and enpoint we used in the subscribe call earlier.