.. _publishing:

Publishing / Subscribing
========================

Publishing and subscribing is done in terms of tags. A tag is a 64-bit value that identifies which stream of data is being published or subscribed to (in the case of FELIX, a tag is a ``FelixID``). A subscriber contacts a publisher on its subscription port and sends a subscription message for each tag that it wants data from. When a publisher publishes data for a given tag, it sends the data to all of the subscribers that have subscribed to that tag.

The publish / subscribe classes are built on top of the sender and receiver classes. Internally :cpp:type:`netio3::NetioPublisher` uses a :cpp:type:`netio3::NetioSender` to publish its data and :cpp:class:`netio3::NetioSubscriber` has a :cpp:class:`netio3::NetioReceiver` to receive the subscribed data. A separate connection using the netio3-backend directly is used for subscription requests and acknowledgements.

As with the sender and receiver classes, the publish and subscribe classes can work in an immediate mode, where each message is sent in a separate packet, or in a buffered mode, where data are accumulated until a packet is full or the time since the first message in the packet reaches a threshold.

.. note::
   The :cpp:type:`netio3::NetioPublisher` class is actually an alias for a :cpp:class:`netio3::NetioPublisherTemplate` template class with the type :cpp:class:`netio3::SubscriptionEndpointMap`.

Publishing
^^^^^^^^^^

.. code-block:: cpp

   const EndPointAddress local_ep{"192.168.200.10", 1337};
   const NetioPublisherConfig config {
     .backend_type = NetworkType::LIBFABRIC,
     .backend_mode = NetworkMode::RDMA,
     .buffersize = 16384,
     .nbuffers = 16,
     .method = SendMethod::BUFFERED,
     .thread_safety = ThreadSafetyModel::SAFE
   };
   NetioPublisher publisher(config, evloop, local_ep);

The configuration of the publisher is described by a :cpp:class:`netio3::NetioPublisherConfig` object. This selects the type of backend and mode of communication.
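
The configuration above selects ``SendMethod::BUFFERED``. As a rough illustration of the flush rule described earlier (send a packet when it is full, or when the time since its first message reaches a threshold), here is a minimal, self-contained sketch. The ``BufferedBatcher`` class, its members and the way time is passed in are hypothetical and only model the idea; this is not how netio3 implements buffering.

.. code-block:: cpp

   #include <chrono>
   #include <cstddef>
   #include <cstdint>
   #include <span>
   #include <utility>
   #include <vector>

   // Hypothetical model of buffered sending; not the netio3 implementation.
   class BufferedBatcher {
   public:
     using Clock = std::chrono::steady_clock;

     BufferedBatcher(std::size_t buffersize, Clock::duration timeout)
       : m_buffersize{buffersize}, m_timeout{timeout} {}

     // Add one message at time `now`. If it does not fit into the packet
     // being filled, that packet is flushed first and a new one is started.
     // Messages larger than a whole packet are rejected.
     bool add(std::span<const std::uint8_t> msg, Clock::time_point now) {
       if (msg.size() > m_buffersize) {
         return false;
       }
       if (m_current.size() + msg.size() > m_buffersize) {
         flush();
       }
       if (m_current.empty()) {
         m_first = now;  // the timer starts with the first message of a packet
       }
       m_current.insert(m_current.end(), msg.begin(), msg.end());
       if (m_current.size() == m_buffersize) {
         flush();  // the packet is exactly full
       }
       return true;
     }

     // Flush a partially filled packet once its first message is old enough.
     void poll(Clock::time_point now) {
       if (!m_current.empty() && now - m_first >= m_timeout) {
         flush();
       }
     }

     // Packets "sent" so far, in order.
     const std::vector<std::vector<std::uint8_t>>& sent() const { return m_sent; }

   private:
     void flush() {
       if (!m_current.empty()) {
         m_sent.push_back(std::move(m_current));
         m_current.clear();
       }
     }

     std::size_t m_buffersize;
     Clock::duration m_timeout;
     Clock::time_point m_first{};
     std::vector<std::uint8_t> m_current;
     std::vector<std::vector<std::uint8_t>> m_sent;
   };

In this model, small messages share a packet until either limit is hit, which trades a bounded amount of latency (the timeout) for fewer, fuller packets. Immediate mode corresponds to flushing after every call to ``add``.
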
The NetioPublisher will listen on ``local_ep`` for subscription requests.

.. note::
   Subscription requests and acknowledgements are always sent with the asyncmsg backend (:cpp:class:`netio3::asyncmsg::BackendAsyncmsg`). This is independent of the backend used for the data transfer. Do not worry if you see messages in the logs from this backend when you select a different backend for your data.

Of course, there is no point in publishing unless someone has subscribed to the data. For our example program, let's register a callback function to be executed when someone subscribes, and wait for that to be called before our first publish attempt.

.. code-block:: cpp

   const uint64_t tag = 0xf007ba11;
   std::atomic_bool subscriber = false;
   publisher.set_on_subscription([&] (uint64_t subs_tag, const EndPointAddress&) {
     if (subs_tag == tag) {
       std::cout << "Subscribed to our tag\n";
       subscriber.store(true);
       subscriber.notify_all();
     }
   });
   // Wait for subscriber
   subscriber.wait(false);

Now we can try publishing some data.

.. code-block:: cpp

   std::vector<std::uint8_t> buffer {1, 2, 3, 4};
   std::vector<iovec> iov {{.iov_base = buffer.data(), .iov_len = buffer.size()}};
   while (subscriber.load()) {
     auto status = publisher.publish(tag, iov, false);
     // Retry until every subscriber has been sent this buffer
     while (status == NetioPublisherStatus::NO_RESOURCES) {
       status = publisher.publish(tag, iov, true);
     }
     buffer[0]++;
     std::this_thread::sleep_for(10ms);
   }

Here we publish data for tag ``0xf007ba11``, describing the data with an iovec, until the subscriber unsubscribes or disconnects. Any publish that fails with a `NetioPublisherStatus::NO_RESOURCES` status is retried with a further call to publish that has the ``retry`` argument set to ``true``. The difference between this call and the original publish is that the retry only sends to subscribers that have not already been sent this buffer.

.. note::
   The NetioPublisher::publish methods return a :cpp:enum:`netio3::NetioPublisherStatus`, which has an extra possible value compared with :cpp:enum:`netio3::NetioStatus`.

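
To illustrate why the retry call differs from a plain second publish, the following self-contained sketch models the behaviour described above: a first attempt targets every subscriber, while a retry targets only those that missed the previous attempt. ``ToyPublisher``, ``ToySubscriber`` and ``ToyStatus`` are hypothetical names invented for this example, and the ``free_buffers`` counter is an assumed stand-in for whatever resource runs out; none of this is the netio3 implementation.

.. code-block:: cpp

   #include <utility>
   #include <vector>

   enum class ToyStatus { OK, NO_RESOURCES };

   // Hypothetical stand-in for one subscriber connection.
   struct ToySubscriber {
     int free_buffers;           // sending fails while this is zero
     std::vector<int> received;  // payloads delivered so far
   };

   // Hypothetical model of publish/retry; not the netio3 implementation.
   class ToyPublisher {
   public:
     void add_subscriber(ToySubscriber* sub) { m_subscribers.push_back(sub); }

     // retry == false: try to send to every subscriber.
     // retry == true:  try only the subscribers that missed the last attempt.
     ToyStatus publish(int payload, bool retry) {
       if (!retry) {
         m_pending = m_subscribers;
       }
       std::vector<ToySubscriber*> still_pending;
       for (ToySubscriber* sub : m_pending) {
         if (sub->free_buffers > 0) {
           --sub->free_buffers;
           sub->received.push_back(payload);
         } else {
           still_pending.push_back(sub);  // remember for the next retry
         }
       }
       m_pending = std::move(still_pending);
       return m_pending.empty() ? ToyStatus::OK : ToyStatus::NO_RESOURCES;
     }

   private:
     std::vector<ToySubscriber*> m_subscribers;
     std::vector<ToySubscriber*> m_pending;
   };

In this model, repeating the call with ``retry`` set to ``false`` would deliver the same payload a second time to every subscriber that had already received it, which is the duplication the retry flag avoids.
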
Choosing a publish method
"""""""""""""""""""""""""

Like :cpp:func:`netio3::NetioSender::send_data`, the NetioPublisher provides :cpp:func:`netio3::NetioPublisher::publish` methods for buffered or zero-copy sending of a single span of data or an iovec of addresses to gather data from.

.. note::
   If the :cpp:struct:`netio3::NetioPublisherConfig` specifies zero-copy mode, you must use the :cpp:func:`netio3::NetioPublisher::publish` method with a key argument.

.. doxygenfunction:: netio3::NetioPublisherTemplate::publish(uint64_t tag,std::span data, bool retry, uint8_t user_status = 0)
   :no-link:
   :outline:

.. doxygenfunction:: netio3::NetioPublisherTemplate::publish(uint64_t tag,std::span data, bool retry, uint8_t user_status = 0)
   :no-link:
   :outline:

.. doxygenfunction:: netio3::NetioPublisherTemplate::publish(uint64_t tag,std::span data, bool retry, uint8_t user_status, uint64_t key)
   :no-link:
   :outline:

.. doxygenfunction:: netio3::NetioPublisherTemplate::publish(uint64_t tag,std::span data,bool retry,uint8_t user_status,uint64_t key)
   :no-link:
   :outline:

.. _subscribing:

Subscribing to published data
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Subscribing to data is done with the :cpp:class:`netio3::NetioSubscriber` class. Once the subscription is set up, all the data reception is handled by the 'on data' callback.

.. code-block:: cpp

   const std::string local_address{"192.168.200.10"};
   const NetioSubscriberConfig config {
     .backend_type = NetworkType::LIBFABRIC,
     .backend_mode = NetworkMode::RDMA,
     .thread_safety = ThreadSafetyModel::SAFE
   };
   NetioSubscriber subscriber(config, evloop, local_address);

Here we instantiate a NetioSubscriber object with a NetioSubscriberConfig and our local address. The :cpp:class:`NetioSubscriberConfig` struct is used to configure the subscriber. In this case, we are selecting the `libfabric` backend in RDMA mode.

.. code-block:: cpp

   std::atomic_int nmessages_received = 0;
   subscriber.set_on_data_cb([&](std::uint64_t rcv_tag, std::span<const std::uint8_t> payload, std::uint8_t status) {
     // Here we decode the message and deal with the data
     nmessages_received++;
   });

Next we configure the subscriber's callbacks. The most important one, shown above, is the callback for when our subscribed data arrive; a callback for when a subscription is successful can also be registered. The data callback is passed individual messages, each with its own tag and status. Alternatively, we could register the `on_buffer` callback to be given the raw contents of the buffer, not split into individual messages.

.. code-block:: cpp

   // Address of publisher
   EndPointAddress ep{"192.168.200.10", 1337};
   const size_t RCV_BUFFER_SIZE = 2048;
   const uint32_t NRCV_BUFFERS = 16;
   const auto connection_params = ConnectionParametersRecv{RCV_BUFFER_SIZE, NRCV_BUFFERS};
   const uint64_t tag = 0xf007ba11;
   auto status = subscriber.subscribe(tag, ep, connection_params);
   if (status != NetioStatus::OK) {
     // Handle error case
   }

Then we attempt the subscription and wait for the on data callback to be called.

.. code-block:: cpp

   std::string input;
   while (input.empty() || input[0] != 'q') {
     std::cout << "Enter q to quit ";
     std::getline(std::cin, input);
     std::cout << "Received total of " << nmessages_received.load() << " messages\n";
   }
   status = subscriber.unsubscribe(tag, ep);
   if (status != NetioStatus::OK) {
     std::cout << "Error unsubscribing\n";
   }
   std::this_thread::sleep_for(10ms);

After we have finished receiving data, we unsubscribe from the publisher, giving the same tag and endpoint we used in the subscribe call earlier.
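
One way to picture why the unsubscribe call needs the same tag and endpoint as the subscribe call is to think of the subscriber's bookkeeping as a table keyed by publisher endpoint, each entry holding the tags subscribed there. The sketch below is a minimal, self-contained model of that idea. ``ToyEndpoint`` and ``ToySubscriptionTable`` are hypothetical names for this example only; how netio3 actually tracks subscriptions is not described here and may differ.

.. code-block:: cpp

   #include <cstddef>
   #include <cstdint>
   #include <map>
   #include <set>
   #include <string>
   #include <tuple>

   // Hypothetical endpoint type; netio3 has its own EndPointAddress.
   struct ToyEndpoint {
     std::string address;
     std::uint16_t port;
     bool operator<(const ToyEndpoint& other) const {
       return std::tie(address, port) < std::tie(other.address, other.port);
     }
   };

   // Hypothetical bookkeeping of (tag, publisher) subscriptions.
   class ToySubscriptionTable {
   public:
     // Returns false if this (tag, publisher) pair is already subscribed.
     bool subscribe(std::uint64_t tag, const ToyEndpoint& publisher) {
       return m_tags[publisher].insert(tag).second;
     }

     // Returns false if no subscription exists for exactly this pair.
     bool unsubscribe(std::uint64_t tag, const ToyEndpoint& publisher) {
       const auto it = m_tags.find(publisher);
       if (it == m_tags.end() || it->second.erase(tag) == 0) {
         return false;
       }
       if (it->second.empty()) {
         m_tags.erase(it);  // last tag for this publisher has gone
       }
       return true;
     }

     // Number of publishers with at least one active subscription.
     std::size_t publishers() const { return m_tags.size(); }

   private:
     std::map<ToyEndpoint, std::set<std::uint64_t>> m_tags;
   };

In this model the same tag can be subscribed at several publishers independently, so the tag alone does not identify a subscription; only the pair does.
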