Publishing / Subscribing
Publishing and subscribing is done in terms of tags. A tag is a
64-bit value that identifies which stream of data is being
published/subscribed to (in the case of FELIX a tag is a FelixID).
A subscriber contacts a publisher on its subscription port and sends
a subscription message for each tag that it wants data from. When a
publisher publishes data for a given tag, it sends data to all of the
subscribers who have subscribed to that tag.
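The bookkeeping described above can be pictured as a map from tag to the set of subscribers. The following is only a conceptual sketch: `TagRegistry` and `SubscriberId` are hypothetical names invented for illustration and are not part of netio3, which tracks real network endpoints rather than strings.

```cpp
#include <cstdint>
#include <map>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for a subscriber's endpoint; not a netio3 type.
using SubscriberId = std::string;

// Toy registry that records which subscribers asked for which tag.
class TagRegistry {
public:
    void subscribe(std::uint64_t tag, const SubscriberId& who) {
        m_subs[tag].insert(who);
    }

    void unsubscribe(std::uint64_t tag, const SubscriberId& who) {
        auto it = m_subs.find(tag);
        if (it == m_subs.end()) {
            return;
        }
        it->second.erase(who);
        if (it->second.empty()) {
            m_subs.erase(it);  // drop tags that nobody listens to any more
        }
    }

    // Returns the subscribers that would receive data published for `tag`.
    std::vector<SubscriberId> recipients(std::uint64_t tag) const {
        auto it = m_subs.find(tag);
        if (it == m_subs.end()) {
            return {};
        }
        return {it->second.begin(), it->second.end()};
    }

private:
    std::map<std::uint64_t, std::set<SubscriberId>> m_subs;
};
```

Publishing for a tag with no entry in the map reaches nobody, which is why the publisher example later waits for a subscription before sending.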
The publish / subscribe classes are built on top of the sender
and receiver classes. Internally, netio3::NetioPublisher uses a netio3::NetioSender to publish its data
and netio3::NetioSubscriber uses a netio3::NetioReceiver to receive the subscribed data.
A separate connection using the netio3-backend directly is used for
subscription requests and acknowledgements.
As with the sender and receiver classes, the publish and subscribe classes can work in an immediate mode, where each message is sent in a separate packet, or a buffered mode where data are accumulated until a packet is full or the time since the first message in the packet reaches a threshold.
Note
The netio3::NetioPublisher class is actually an
alias for a netio3::NetioPublisherTemplate
template class with the type
netio3::SubscriptionEndpointMap.
Publishing
// Address and port on which the publisher listens for subscription requests
const EndPointAddress local_ep{"192.168.200.10", 1337};
const NetioPublisherConfig config {
    .backend_type = NetworkType::LIBFABRIC,
    .backend_mode = NetworkMode::RDMA,
    .buffersize = 16384,
    .nbuffers = 16,
    .method = SendMethod::BUFFERED,
    .thread_safety = ThreadSafetyModel::SAFE
};
// evloop is assumed to be an event loop created earlier (not shown here)
NetioPublisher publisher(config, evloop, local_ep);
The configuration of the publisher is described by a
netio3::NetioPublisherConfig object. This selects the type of
backend and mode of communication.
The NetioPublisher will listen on local_ep for subscription requests.
Note
Subscription requests and acknowledgements are always sent
with the asyncmsg
backend (netio3::asyncmsg::BackendAsyncmsg). This is
independent of the backend used for the data transfer. Do not worry
if you see messages in the logs from this backend when you select a
different backend for your data.
Of course, there is no point in publishing unless someone has subscribed to the data. For our example program, let’s register a callback function to be executed when someone subscribes, and wait for it to be called before our first publish attempt.
const uint64_t tag = 0xf007ba11;
std::atomic_bool subscriber = false;
publisher.set_on_subscription([&] (uint64_t subs_tag, const EndPointAddress&) {
    if (subs_tag == tag) {
        std::cout << "Subscribed to our tag\n";
        subscriber.store(true);
        subscriber.notify_all();  // wake the thread blocked in wait() below
    }
});
// Wait for subscriber: blocks until the flag is no longer false
subscriber.wait(false);
Now we can try publishing some data.
using namespace std::chrono_literals;  // for the 10ms literal

std::vector<uint8_t> buffer {1, 2, 3, 4};
std::vector<iovec> iov {{.iov_base = buffer.data(),
                         .iov_len = buffer.size()}};
while (subscriber.load()) {
    // First attempt: retry = false
    auto status = publisher.publish(tag, iov, false);
    while (status == NetioPublisherStatus::NO_RESOURCES) {
        // Retry: retry = true
        status = publisher.publish(tag, iov, true);
    }
    buffer[0]++;
    std::this_thread::sleep_for(10ms);
}
Here we try publishing data for tag 0xf007ba11, describing the data
with an iovec, until the subscriber unsubscribes or disconnects.
After each publish, any sends that failed with a NetioPublisherStatus::NO_RESOURCES status are retried in a further call to publish with the retry argument set to true. Unlike the original call, the retry only sends to subscribers who have not already been sent this buffer.
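The retry behaviour can be modelled as a per-subscriber "pending" flag that is set for everyone on a fresh publish and cleared as each send succeeds. The sketch below is a toy model written for this explanation: `FanOut`, `Status` and `TrySend` are hypothetical names, and the real publisher's internal bookkeeping may differ.

```cpp
#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

enum class Status { OK, NO_RESOURCES };

// Hypothetical model of a fan-out publish with retry; not netio3 code.
class FanOut {
public:
    // Each subscriber is modelled as a function returning true if the send succeeded.
    using TrySend = std::function<bool()>;

    explicit FanOut(std::vector<TrySend> subscribers)
        : m_subscribers(std::move(subscribers)),
          m_pending(m_subscribers.size(), false) {}

    Status publish(bool retry) {
        if (!retry) {
            // A fresh buffer must go to every subscriber.
            m_pending.assign(m_subscribers.size(), true);
        }
        bool all_sent = true;
        for (std::size_t i = 0; i < m_subscribers.size(); ++i) {
            if (!m_pending[i]) {
                continue;  // already received this buffer, skip on retry
            }
            if (m_subscribers[i]()) {
                m_pending[i] = false;
            } else {
                all_sent = false;
            }
        }
        return all_sent ? Status::OK : Status::NO_RESOURCES;
    }

private:
    std::vector<TrySend> m_subscribers;
    std::vector<bool> m_pending;
};
```

In this model, calling `publish(false)` again instead of `publish(true)` would send the buffer a second time to subscribers that had already received it, which is the duplication the retry flag is there to avoid.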
Note
The NetioPublisher::publish methods return a
netio3::NetioPublisherStatus, which has an extra
possible value compared with netio3::NetioStatus.
Choosing a publish method
Like netio3::NetioSender::send_data(),
netio3::NetioPublisher::publish() is provided in several overloads,
covering buffered or zero-copy sending of either a single span of data or
an iovec of addresses to gather data from.
Note
If the netio3::NetioPublisherConfig specifies
zero-copy mode, you must use the
netio3::NetioPublisher::publish() method with a
key argument.
-
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<const iovec> data, bool retry, uint8_t user_status = 0)
-
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<const uint8_t> data, bool retry, uint8_t user_status = 0)
-
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<const iovec> data, bool retry, uint8_t user_status, uint64_t key)
-
inline NetioPublisherStatus netio3::NetioPublisherTemplate::publish(uint64_t tag, std::span<uint8_t> data, bool retry, uint8_t user_status, uint64_t key)
Subscribing to published data
Subscribing to data is done with the
netio3::NetioSubscriber class. Once the subscription is
set up, all the data reception is handled by the ‘on data’ callback.
const std::string local_address{"192.168.200.10"};
const NetioSubscriberConfig config {
    .backend_type = NetworkType::LIBFABRIC,
    .backend_mode = NetworkMode::RDMA,
    .thread_safety = ThreadSafetyModel::SAFE
};
// evloop is assumed to be an event loop created earlier (not shown here)
NetioSubscriber subscriber(config, evloop, local_address);
Here we instantiate a NetioSubscriber object with a
NetioSubscriberConfig and our local address. The
NetioSubscriberConfig struct is used to configure the
subscriber. In this case, we are selecting the libfabric backend in
RDMA mode.
std::atomic_int nmessages_received = 0;
subscriber.set_on_data_cb([&](std::uint64_t rcv_tag,
                              std::span<const std::uint8_t> payload,
                              std::uint8_t status) {
    // Here we decode the message and deal with the data
    nmessages_received++;
});
Next we configure the subscriber with a callback for when a subscription is successful and, most importantly, a callback for when our subscribed data arrive. The data callback is passed one message at a time, each with its own tag and status. Alternatively, we could register the on_buffer callback to be given the raw contents of the buffer, not split into individual messages.
// Address of publisher
EndPointAddress ep{"192.168.200.10", 1337};
const size_t RCV_BUFFER_SIZE = 2048;
const uint32_t NRCV_BUFFERS = 16;
const auto connection_params =
    ConnectionParametersRecv{RCV_BUFFER_SIZE, NRCV_BUFFERS};
const uint64_t tag = 0xf007ba11;
auto status = subscriber.subscribe(tag, ep, connection_params);
if (status != NetioStatus::OK) {
    // Handle error case
}
Then, attempt the subscription and wait for the on data callback to be called.
using namespace std::chrono_literals;  // for the 10ms literal

std::string input;
while (input.empty() || input[0] != 'q') {
    std::cout << "Enter q to quit ";
    std::getline(std::cin, input);
    std::cout << "Received total of " << nmessages_received.load() << " messages\n";
}
status = subscriber.unsubscribe(tag, ep);
if (status != NetioStatus::OK) {
    std::cout << "Error unsubscribing\n";
}
std::this_thread::sleep_for(10ms);
After we have finished receiving data, we unsubscribe from the publisher, giving the same tag and endpoint we used in the subscribe call earlier.