Subscribing to felix-tohost

felix-tohost publishes streams of data identified by the FID. felix-client allows you to subscribe to these streams to start receiving data. felix-client attempts to minimize the number of network connections it opens to the server and re-uses existing connections where possible. The latency of the subscribe operation therefore depends on whether a new connection needs to be opened. felix-client receives the parameters needed to open connections from the bus file. If a connection is lost, felix-client will automatically attempt to re-establish it.

Synchronous and asynchronous subscription

There are two sets of functions to subscribe (or unsubscribe) to felix-tohost. The synchronous functions block until the subscription is done. The asynchronous functions return immediately and the subscription is done in the background.

The synchronous functions take a timeout parameter and will block up to this timeout. If the timeout is reached and not all subscriptions are done, the function throws an exception.

The asynchronous functions carry the suffix _nb (non-blocking). They return immediately and the subscription completes in the background. The user can register a callback to be notified when the subscription (or unsubscription) is done. The callback is called in the context of the event loop thread.

typedef std::function<void(uint64_t fid)> FelixClientThreadInterface::OnConnectCallback
typedef std::function<void(uint64_t fid)> FelixClientThreadInterface::OnDisconnectCallback

Warning

Do not block in the callback. This can deadlock the event loop and potentially impact the sender as well through backpressure.
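One way to keep a callback from blocking is to have it only record the FID and return, leaving any slow work to another thread. The following is a minimal sketch under that assumption. FidQueue and make_on_connect are hypothetical helpers for illustration and are not part of felix-client; only the callback signature is taken from the typedefs above.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>

// Hypothetical helper (not part of felix-client): a small thread-safe FID queue.
class FidQueue {
public:
  // Intended to be called from the event loop thread.
  // Holds the lock only long enough to append one element.
  void push(std::uint64_t fid) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      fids_.push_back(fid);
    }
    cv_.notify_one();
  }

  // Intended to be called from a worker thread.
  // Waits up to `timeout` for the next FID; returns std::nullopt on timeout.
  std::optional<std::uint64_t> pop(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    if (!cv_.wait_for(lock, timeout, [this] { return !fids_.empty(); })) {
      return std::nullopt;
    }
    const std::uint64_t fid = fids_.front();
    fids_.pop_front();
    return fid;
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::deque<std::uint64_t> fids_;
};

// Builds a callable with the same signature as OnConnectCallback.
// The queue must outlive the returned callback (captured by reference).
std::function<void(std::uint64_t)> make_on_connect(FidQueue& queue) {
  return [&queue](std::uint64_t fid) { queue.push(fid); };
}
```

A worker thread would then call pop in a loop and do logging, bookkeeping or other slow work there, so the event loop thread is never held up by it.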

Generally, it is recommended to use the synchronous functions. They massively simplify the code and should be sufficient in most cases. Only use the asynchronous functions if you have a good reason to do so.

Note

The callbacks are only called for asynchronous subscriptions. For synchronous subscriptions, success is indicated by the function returning without exception.

Subscribing

There are two functions to subscribe to a stream of data. The first function subscribes to a single FID, the second one subscribes to a list of FIDs.

virtual void FelixClientThreadExtension520::subscribe(std::uint64_t fid, std::chrono::milliseconds timeout) = 0

Synchronously subscribe to an fid.

The remote endpoint is identified from the felix-bus and a connection is established if not already present. The function will block until the link is subscribed or the timeout is reached. If the function returns without throwing an exception, the connection is established. In case the subscription fails the exception will contain more information about the cause.

Throws:
Parameters:
  • fid – The fid to subscribe to

  • timeout – The timeout for establishing the connection

virtual void FelixClientThreadExtension520::subscribe(const std::vector<uint64_t> &fids, std::chrono::milliseconds timeout) = 0

Synchronously subscribe to a vector of fids.

The remote endpoints are identified from the felix-bus and connections are established if not already present. The function will block until the links are subscribed or the timeout is reached. If the function returns without throwing an exception, the connections are established. In case the subscription fails the exception will contain more information about the cause.

Throws:
Parameters:
  • fids – The fids to subscribe to

  • timeout – The timeout for establishing the connection

There are three different error cases:

  1. The lookup in the bus failed. The underlying reason is reported by the exception and can be retrieved by calling get_underlying_message.

  2. The subscription failed or timed out. In this case, the exception reports the current state. There are four functions to retrieve the state:

    • get_fids_success: returns the list of FIDs that were successfully subscribed to (you will receive data for these FIDs).

    • get_fids_bad: returns the list of FIDs that were not subscribed to (you will not receive data for these FIDs).

    • get_fids_failed: returns the list of FIDs for which the subscription attempt failed (increasing the timeout will not help, something fundamentally wrong happened).

    • get_fids_timeout: returns the list of FIDs that timed out (increasing the timeout might help). If a timed-out subscription succeeds after the function has returned, felix-client will automatically attempt to unsubscribe from those links.

  3. You are already subscribed to all provided FIDs.

Tip

If this function returns without exception, you are subscribed to all provided FIDs. No callbacks will be called for synchronous requests.
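The distinction between failed and timed-out FIDs suggests a simple decision rule for the caller. The sketch below illustrates that rule only. SubscribeState, NextStep and decide are hypothetical names, the struct merely stands in for the values returned by the getters listed above, and the container type is an assumption.

```cpp
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the state reported by the exception.
// The real getters may return a different container type.
struct SubscribeState {
  std::vector<std::uint64_t> success;  // cf. get_fids_success
  std::vector<std::uint64_t> failed;   // cf. get_fids_failed
  std::vector<std::uint64_t> timeout;  // cf. get_fids_timeout
};

enum class NextStep { Done, RetryWithLongerTimeout, GiveUp };

// Decide how to react to a partially successful synchronous subscribe.
NextStep decide(const SubscribeState& state) {
  // A hard failure will not be cured by waiting longer.
  if (!state.failed.empty()) {
    return NextStep::GiveUp;
  }
  // Only timeouts remain: a longer timeout might help.
  if (!state.timeout.empty()) {
    return NextStep::RetryWithLongerTimeout;
  }
  return NextStep::Done;
}
```

Whether giving up entirely is appropriate when only some FIDs failed depends on the application; a caller could just as well continue with the FIDs reported as successful.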

The asynchronous signatures are similar. They do not take a timeout parameter. They either take a single FID or a list of FIDs.

virtual void FelixClientThreadExtension520::subscribe_nb(std::uint64_t fid) = 0

Asynchronously subscribe to an fid.

The remote endpoint is identified from the felix-bus and a connection is established if not already present. The function will return immediately. A success is communicated through the on_connect callback. If the subscription request fails, an exception is thrown. Other failures are not communicated (but the on_connect callback will not be called).

Throws:
Parameters:
  • fid – The fid to subscribe to

virtual void FelixClientThreadExtension520::subscribe_nb(const std::vector<uint64_t> &fids) = 0

Asynchronously subscribe to a vector of fids.

The remote endpoints are identified from the felix-bus and connections are established if not already present. The function will return immediately. A success is communicated through the on_connect callback. If the subscription request fails, an exception is thrown. Other failures are not communicated (but the on_connect callback will not be called).

Throws:
Parameters:
  • fids – The fids to subscribe to

There are still three different error cases:

  1. The lookup in the bus failed. The underlying reason is reported by the exception and can be retrieved by calling get_underlying_message.

  2. Sending the subscription request failed. The list of FIDs that failed can be retrieved by calling get_fids_bad.

  3. You are already subscribed to all provided FIDs.

Note

Because the underlying subscription mechanism is asynchronous, there are no guarantees that the subscription will be successful. If a request was sent, the only way to notice that the subscription failed is the absence of the on_connect callback.
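Since a failed asynchronous subscription shows up only as a callback that never arrives, the caller has to impose its own deadline. The following is a minimal sketch of such bookkeeping for several FIDs. PendingSubscriptions is a hypothetical helper and not part of felix-client; it assumes mark_connected is called from the on_connect callback.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

// Hypothetical helper (not part of felix-client): tracks which FIDs
// have not yet been confirmed by an on_connect callback.
class PendingSubscriptions {
public:
  explicit PendingSubscriptions(const std::vector<std::uint64_t>& fids)
      : pending_(fids.begin(), fids.end()) {}

  // Intended to be called from the on_connect callback.
  // Takes a short lock and returns; it never waits.
  void mark_connected(std::uint64_t fid) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      pending_.erase(fid);
    }
    cv_.notify_all();
  }

  // Waits until every FID is confirmed or `timeout` expires.
  // Returns the FIDs that are still unconfirmed (empty on full success).
  std::set<std::uint64_t> wait(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait_for(lock, timeout, [this] { return pending_.empty(); });
    return pending_;
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  std::set<std::uint64_t> pending_;
};
```

A caller would construct the tracker, call subscribe_nb with the same FIDs, and then call wait. A FID that is still pending at the deadline may connect later, so the application has to decide whether to keep waiting or to unsubscribe from it.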

Unsubscribing

Unsubscription either happens at destruction or explicitly. There is a synchronous and an asynchronous version.

virtual void FelixClientThreadExtension520::unsubscribe(std::uint64_t fid, std::chrono::milliseconds timeout) = 0

Synchronously unsubscribe from a tag.

This function unsubscribes from a tag. The function will block until the link is unsubscribed or the timeout is reached. If the function returns without throwing an exception, the link is unsubscribed. In case the unsubscription fails the exception will contain more information about the cause.

Throws:
Parameters:
  • fid – The tag to unsubscribe from

  • timeout – The timeout for unsubscribing

virtual void FelixClientThreadExtension520::unsubscribe_nb(std::uint64_t fid) = 0

Asynchronously unsubscribe from a tag.

This function unsubscribes from a tag. The function will return immediately. A success is communicated through the on_disconnect callback. If the unsubscription request fails, an exception is thrown. Other failures are not communicated (but the on_disconnect callback will not be called).

Throws:
Parameters:

  • fid – The tag to unsubscribe from

Errors are again reported by exceptions. The following error cases are possible:

  1. You are not subscribed to the given link

  2. Sending the unsubscription request failed.

  3. (Only for synchronous requests) The unsubscription timed out.

Note

If an unsubscription request timed out but succeeds in the future, the unsubscription will be effective.

Losing connection

If a connection is lost (e.g. because felix-tohost went down), the client will automatically attempt to re-establish the connection. The user can register a callback to be notified when the connection is lost or re-established.

typedef std::function<void(const std::set<std::uint64_t> &fids)> FelixClientThreadExtension520::OnSubscriptionsLostCallback
typedef std::function<void(std::uint64_t fid)> FelixClientThreadExtension520::OnResubscriptionCallback
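The two callbacks can be combined to keep a current view of which FIDs are without a connection. The sketch below assumes only the two signatures shown above. LostFidTracker is a hypothetical helper, and how the callbacks are registered with felix-client is not shown here.

```cpp
#include <cstdint>
#include <mutex>
#include <set>

// Hypothetical helper (not part of felix-client): remembers which FIDs
// are currently lost and not yet resubscribed.
class LostFidTracker {
public:
  // Same parameter list as OnSubscriptionsLostCallback.
  void on_lost(const std::set<std::uint64_t>& fids) {
    std::lock_guard<std::mutex> lock(mutex_);
    lost_.insert(fids.begin(), fids.end());
  }

  // Same parameter list as OnResubscriptionCallback.
  void on_resubscribed(std::uint64_t fid) {
    std::lock_guard<std::mutex> lock(mutex_);
    lost_.erase(fid);
  }

  // Copy of the current set, safe to inspect from any thread.
  std::set<std::uint64_t> snapshot() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return lost_;
  }

private:
  mutable std::mutex mutex_;
  std::set<std::uint64_t> lost_;
};
```

Both handlers only update a set under a short lock, which keeps them suitable for use as callbacks that must not block.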

Deprecated API

In the old API, these functions were synchronous whenever the config parameter FELIX_CLIENT_TIMEOUT was set to a value greater than 0. Otherwise they were asynchronous. Also, the old API always calls callbacks (even for synchronous requests) and reports most errors as a felix::ResourceNotAvailableException without further context. In case of partial success, the state was not well defined.

Example

void on_data(const uint64_t fid, const std::span<const uint8_t> data, const uint8_t status) {
  std::println("Received data for fid {:#x}: {} bytes, status {}", fid, data.size(), status);
}

// FIDs used in this example
const std::vector<std::uint64_t> fids{0x1000000000000000, 0x1010000000000000};

// Create a subscription (1000ms requires std::chrono_literals)
try {
  fct.subscribe(fids, 1000ms);
} catch (const felix::FelixClientException& e) {
  std::println("Failed to subscribe: {}", e.what());
  std::println("FIDs success: {::#x}", e.get_fids_success());
  std::println("FIDs failed: {::#x}", e.get_fids_failed());
  std::println("FIDs timeout: {::#x}", e.get_fids_timeout());
  return;
}

// Receive data ...

// Unsubscribe one FID at a time: the documented unsubscribe takes a single FID and a timeout
try {
  for (const auto fid : fids) {
    fct.unsubscribe(fid, 1000ms);
  }
} catch (const felix::FelixClientException& e) {
  std::println("Failed to unsubscribe: {}", e.what());
  return;
}

(Incomplete) example using asynchronous connection establishment (exception handling omitted):

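// Fixed-width type so the comparison in the callback is unsigned on both sides
constexpr std::uint64_t fid = 0x1000000000000000;

std::atomic_bool connected{false};

// Runs in the event loop thread: only sets a flag, never blocks
void on_connect_callback(const std::uint64_t fid_connected) {
  std::println("Connected to FID: {:#x}", fid_connected);
  if (fid_connected == fid) {
    connected = true;
    connected.notify_all();  // wake the thread blocked in connected.wait(false)
  }
}

fct.subscribe_nb(fid);

// Blocks until on_connect_callback sets the flag (no timeout)
connected.wait(false);
// Questions:
// - What about timeout?
// - If using a timeout, what happens if the connection is established after the timeout?
// - Track multiple FIDs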