Subscribing to felix-tohost
felix-tohost publishes streams of data, each identified by an FID. felix-client lets you subscribe to these streams to start receiving data. felix-client tries to minimize the number of network connections it opens to the server and re-uses existing connections where possible. The latency of the subscribe operation therefore depends on whether a new connection needs to be opened. felix-client obtains the parameters needed to open connections from the bus file. If a connection is lost, felix-client automatically attempts to re-establish it.
Synchronous and asynchronous subscription
There are two sets of functions to subscribe to (or unsubscribe from) felix-tohost. The synchronous functions block until the subscription is done. The asynchronous functions return immediately and the subscription is done in the background.
The synchronous functions take a timeout parameter and will block up to this timeout. If the timeout is reached and not all subscriptions are done, the function throws an exception.
The asynchronous functions are suffixed with _nb (non-blocking). They return immediately and the subscription completes in the background. The user can register a callback to be notified when the subscription (or unsubscription) is done. The callback is called in the context of the event loop thread.
-
typedef std::function<void(uint64_t fid)> FelixClientThreadInterface::OnConnectCallback
-
typedef std::function<void(uint64_t fid)> FelixClientThreadInterface::OnDisconnectCallback
Warning
Do not block in the callback. This can deadlock the event loop and potentially impact the sender as well through backpressure.
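One way to keep a callback short is to record the FID and leave any slow work to another thread. The following is a minimal sketch under that assumption; FidQueue and make_on_connect are hypothetical helpers, not part of felix-client, and the only thing taken from the API is the callback signature void(uint64_t fid).

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>

// Hypothetical helper (not part of felix-client): a small thread-safe queue
// that lets a callback hand an FID to another thread without doing slow work.
class FidQueue {
public:
    // Meant to be called from the event loop thread; holds the lock only briefly.
    void push(std::uint64_t fid) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            fids_.push_back(fid);
        }
        cv_.notify_one();
    }

    // Meant to be called from a worker thread; waits up to `timeout` for an FID.
    std::optional<std::uint64_t> pop(std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (!cv_.wait_for(lock, timeout, [this] { return !fids_.empty(); })) {
            return std::nullopt;  // nothing arrived in time
        }
        assert(!fids_.empty());
        const std::uint64_t fid = fids_.front();
        fids_.pop_front();
        return fid;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<std::uint64_t> fids_;
};

// Builds a callable with the same signature as OnConnectCallback.
// The queue must outlive the returned callback.
std::function<void(std::uint64_t)> make_on_connect(FidQueue& queue) {
    return [&queue](std::uint64_t fid) { queue.push(fid); };
}
```

A worker thread can then call pop in a loop and do logging, bookkeeping, or other potentially slow work outside the event loop.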
Generally, it is recommended to use the synchronous functions. They massively simplify the code and should be sufficient in most cases. Only use the asynchronous functions if you have a good reason to do so.
Note
The callbacks are only called for asynchronous subscriptions. For synchronous subscriptions, success is indicated by the function returning without exception.
Subscribing
There are two functions to subscribe to a stream of data. The first function subscribes to a single FID, the second one subscribes to a list of FIDs.
-
virtual void FelixClientThreadExtension520::subscribe(std::uint64_t fid, std::chrono::milliseconds timeout) = 0
Synchronously subscribe to an fid.
The remote endpoint is identified from the felix-bus and a connection is established if not already present. The function will block until the link is subscribed or the timeout is reached. If the function returns without throwing an exception, the connection is established. In case the subscription fails the exception will contain more information about the cause.
- Throws:
felix::BusException – If the information is not available in the bus
felix::FailedSubscribeException – If the subscription fails
felix::AlreadySubscribedException – If the link is already subscribed
- Parameters:
fid – The fid to subscribe to
timeout – The timeout for establishing the connection
-
virtual void FelixClientThreadExtension520::subscribe(const std::vector<uint64_t> &fids, std::chrono::milliseconds timeout) = 0
Synchronously subscribe to a vector of fids.
The remote endpoints are identified from the felix-bus and connections are established if not already present. The function will block until the links are subscribed or the timeout is reached. If the function returns without throwing an exception, the connections are established. In case the subscription fails the exception will contain more information about the cause.
- Throws:
felix::BusException – If the information is not available in the bus
felix::FailedSubscribeException – If the subscription to any link fails
felix::AlreadySubscribedException – If all links are already subscribed
- Parameters:
fids – The fids to subscribe to
timeout – The timeout for establishing the connection
There are three different error cases:
The lookup in the bus failed. The underlying reason is reported by the exception and can be retrieved by calling get_underlying_message.
The subscription failed or timed out. In this case, the exception reports the current state. There are four functions to retrieve the state:
  get_fids_success: returns the list of FIDs that were successfully subscribed to (you will receive data for these FIDs).
  get_fids_bad: returns the list of FIDs that were not subscribed to (you will not receive data for these FIDs).
  get_fids_failed: returns the list of FIDs for which the subscription attempt failed (increasing the timeout will not help; something went fundamentally wrong).
  get_fids_timeout: returns the list of FIDs that timed out (increasing the timeout might help). If a timed-out subscription succeeds after the function has returned, felix-client automatically attempts to unsubscribe from those links.
You are already subscribed to all provided FIDs.
Tip
If this function returns without exception, you are subscribed to all provided FIDs. No callbacks will be called for synchronous requests.
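The distinction between failed and timed-out FIDs suggests a simple retry policy: retry only the timed-out FIDs with a longer timeout and report the failed ones. The sketch below illustrates that idea. SubscribeOutcome, RetryPlan and plan_retry are hypothetical names; the sketch assumes the lists returned by get_fids_success, get_fids_failed and get_fids_timeout have been copied into plain vectors, and doubling the timeout is an arbitrary choice.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <vector>

// Hypothetical plain-data copy of what the exception getters report.
struct SubscribeOutcome {
    std::vector<std::uint64_t> success;  // copied from get_fids_success
    std::vector<std::uint64_t> failed;   // copied from get_fids_failed
    std::vector<std::uint64_t> timeout;  // copied from get_fids_timeout
};

// Hypothetical result type: what to try again and what to report.
struct RetryPlan {
    std::vector<std::uint64_t> retry;    // timed out: a longer timeout might help
    std::vector<std::uint64_t> give_up;  // failed: a longer timeout will not help
    std::chrono::milliseconds next_timeout{0};
};

// Doubles the timeout for the next attempt, capped at max_timeout.
RetryPlan plan_retry(const SubscribeOutcome& outcome,
                     std::chrono::milliseconds previous_timeout,
                     std::chrono::milliseconds max_timeout) {
    assert(previous_timeout.count() >= 0);
    RetryPlan plan;
    plan.retry = outcome.timeout;
    plan.give_up = outcome.failed;
    plan.next_timeout =
        std::min<std::chrono::milliseconds>(previous_timeout * 2, max_timeout);
    return plan;
}
```

The FIDs in plan.retry could then be passed to the vector overload of subscribe with plan.next_timeout.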
The asynchronous signatures are similar, but they do not take a timeout parameter. They take either a single FID or a list of FIDs.
-
virtual void FelixClientThreadExtension520::subscribe_nb(std::uint64_t fid) = 0
Asynchronously subscribe to an fid.
The remote endpoint is identified from the felix-bus and a connection is established if not already present. The function will return immediately. A success is communicated through the on_connect callback. If the subscription request fails, an exception is thrown. Other failures are not communicated (but the on_connect callback will not be called).
- Throws:
felix::BusException – If the information is not available in the bus
felix::FailedAsyncSubscribeException – If the subscription fails
felix::AlreadySubscribedException – If the link is already subscribed
- Parameters:
fid – The fid to subscribe to
-
virtual void FelixClientThreadExtension520::subscribe_nb(const std::vector<uint64_t> &fids) = 0
Asynchronously subscribe to a vector of fids.
The remote endpoints are identified from the felix-bus and connections are established if not already present. The function will return immediately. A success is communicated through the on_connect callback. If the subscription request fails, an exception is thrown. Other failures are not communicated (but the on_connect callback will not be called).
- Throws:
felix::BusException – If the information is not available in the bus
felix::FailedAsyncSubscribeException – If the subscription fails
felix::AlreadySubscribedException – If all links are already subscribed
- Parameters:
fids – The fids to subscribe to
There are still three different error cases:
The lookup in the bus failed. The underlying reason is reported by the exception and can be retrieved by calling get_underlying_message.
Sending the subscription request failed. The list of FIDs that failed can be retrieved by calling get_fids_bad.
You are already subscribed to all provided FIDs.
Note
Because the underlying subscription mechanism is asynchronous, there is no guarantee that the subscription will succeed. If a request was sent, the only way to notice that the subscription failed is the absence of the on_connect callback.
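Since a failed asynchronous subscription only shows up as a missing on_connect callback, the caller needs its own deadline. A possible approach, sketched below, is to track the pending FIDs and wait on a condition variable with a timeout. ConnectTracker is a hypothetical helper, not part of felix-client; its on_connect method is meant to be called from the registered on_connect callback.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

// Hypothetical helper (not part of felix-client): tracks which FIDs have
// been confirmed by the on_connect callback.
class ConnectTracker {
public:
    explicit ConnectTracker(const std::vector<std::uint64_t>& fids)
        : pending_(fids.begin(), fids.end()) {}

    // Call this from the on_connect callback; it only takes a short lock.
    void on_connect(std::uint64_t fid) {
        bool done = false;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            pending_.erase(fid);
            done = pending_.empty();
        }
        if (done) {
            cv_.notify_all();
        }
    }

    // Waits until every FID was confirmed or `timeout` elapsed.
    // Returns the FIDs that are still unconfirmed (empty on full success).
    std::set<std::uint64_t> wait(std::chrono::milliseconds timeout) {
        assert(timeout.count() >= 0);
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait_for(lock, timeout, [this] { return pending_.empty(); });
        return pending_;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::set<std::uint64_t> pending_;
};
```

An FID that is still pending after the deadline may nevertheless connect later, so the application has to decide whether to keep such a late subscription or to unsubscribe from it.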
Unsubscribing
Unsubscription happens either at destruction or explicitly. There is a synchronous and an asynchronous version.
-
virtual void FelixClientThreadExtension520::unsubscribe(std::uint64_t fid, std::chrono::milliseconds timeout) = 0
Synchronously unsubscribe from a tag.
This function unsubscribes from a tag. The function will block until the link is unsubscribed or the timeout is reached. If the function returns without throwing an exception, the link is unsubscribed. In case the unsubscription fails the exception will contain more information about the cause.
- Throws:
felix::NotSubscribedException – If the FID is not subscribed
felix::FailedUnsubscribeException – If the unsubscription fails
felix::UnsubscribeTimeoutException – If the unsubscription times out
- Parameters:
fid – The tag to unsubscribe from
timeout – The timeout for unsubscribing
-
virtual void FelixClientThreadExtension520::unsubscribe_nb(std::uint64_t fid) = 0
Asynchronously unsubscribe from a tag.
This function unsubscribes from a tag. The function will return immediately. A success is communicated through the on_disconnect callback. If the unsubscription request fails, an exception is thrown. Other failures are not communicated (but the on_disconnect callback will not be called).
- Throws:
felix::NotSubscribedException – If the FID is not subscribed
felix::FailedUnsubscribeException – If the unsubscription fails
- Parameters:
fid – The tag to unsubscribe from
Errors are again reported by exceptions. The following error cases are possible:
You are not subscribed to the given link.
Sending the unsubscription request failed.
(Only for synchronous requests) The unsubscription timed out.
Note
If an unsubscription request timed out but succeeds in the future, the unsubscription will be effective.
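When a subscription should end with a particular scope rather than with the lifetime of the client, the explicit unsubscribe call can be wrapped in a scope guard. The sketch below shows one way to do this. UnsubscribeGuard is a hypothetical helper, not part of felix-client. It swallows exceptions because destructors must not throw, which means unsubscription errors are not visible to the caller unless the action itself records them.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical helper (not part of felix-client): runs an unsubscribe
// action when the guard goes out of scope.
class UnsubscribeGuard {
public:
    explicit UnsubscribeGuard(std::function<void()> action)
        : action_(std::move(action)) {
        assert(action_ && "an unsubscribe action is required");
    }

    UnsubscribeGuard(const UnsubscribeGuard&) = delete;
    UnsubscribeGuard& operator=(const UnsubscribeGuard&) = delete;

    ~UnsubscribeGuard() {
        try {
            action_();
        } catch (...) {
            // Destructors must not throw. A real application would log
            // the failure here instead of silently dropping it.
        }
    }

private:
    std::function<void()> action_;
};
```

The action would typically be a lambda that calls the synchronous unsubscribe with the FID and a timeout.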
Losing connection
If a connection is lost (e.g. because felix-tohost went down), the client will automatically attempt to re-establish the connection. The user can register a callback to be notified when the connection is lost or re-established.
-
typedef std::function<void(const std::set<std::uint64_t> &fids)> FelixClientThreadExtension520::OnSubscriptionsLostCallback
-
typedef std::function<void(std::uint64_t fid)> FelixClientThreadExtension520::OnResubscriptionCallback
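The two callbacks can be combined to keep a current view of which FIDs are without a connection. The following sketch assumes only the two callback signatures shown above; LostLinkTracker is a hypothetical helper and is not part of felix-client. Both methods only take a short lock, so they are suitable for being called from the callbacks.

```cpp
#include <cstdint>
#include <mutex>
#include <set>

// Hypothetical helper (not part of felix-client): remembers which FIDs
// are currently without a connection.
class LostLinkTracker {
public:
    // Matches the parameter list of OnSubscriptionsLostCallback.
    void on_subscriptions_lost(const std::set<std::uint64_t>& fids) {
        std::lock_guard<std::mutex> lock(mutex_);
        lost_.insert(fids.begin(), fids.end());
    }

    // Matches the parameter list of OnResubscriptionCallback.
    void on_resubscription(std::uint64_t fid) {
        std::lock_guard<std::mutex> lock(mutex_);
        lost_.erase(fid);
    }

    // Returns a snapshot of the FIDs that are currently lost.
    std::set<std::uint64_t> lost() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return lost_;
    }

private:
    mutable std::mutex mutex_;
    std::set<std::uint64_t> lost_;
};
```

Application code can poll lost() to decide, for example, whether gaps in the received data are expected.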
Deprecated API
In the old API, these functions were synchronous whenever the config parameter FELIX_CLIENT_TIMEOUT was set to a value greater than 0. Otherwise they were asynchronous. Also, the old API always called callbacks (even for synchronous requests) and reported most errors as a felix::ResourceNotAvailableException without further context. In case of partial success, the state was not well defined.
Example
void on_data(const uint64_t fid, const std::span<const uint8_t> data, const uint8_t status) {
    std::println("Received data for fid {:#x}: {} bytes, status {}", fid, data.size(), status);
}

// FIDs used throughout this example
const std::vector<std::uint64_t> fids{0x1000000000000000, 0x1010000000000000};

// Create a subscription (fct is the felix-client instance)
try {
    fct.subscribe(fids, 1000ms);
} catch (const felix::FelixClientException& e) {
    std::println("Failed to subscribe: {}", e.what());
    std::println("FIDs success: {::#x}", e.get_fids_success());
    std::println("FIDs failed: {::#x}", e.get_fids_failed());
    std::println("FIDs timeout: {::#x}", e.get_fids_timeout());
    return;
}

// Receive data ...

// unsubscribe takes a single FID and a timeout, so unsubscribe one by one
try {
    for (const auto fid : fids) {
        fct.unsubscribe(fid, 1000ms);
    }
} catch (const felix::FelixClientException& e) {
    std::println("Failed to unsubscribe: {}", e.what());
    return;
}
(Incomplete) example using asynchronous connection establishment (exception handling omitted):
constexpr std::uint64_t fid = 0x1000000000000000;

std::atomic_bool connected{false};

// Must be registered as the on_connect callback (registration not shown)
void on_connect_callback(const std::uint64_t fid_connected) {
    std::println("Connected to FID: {:#x}", fid_connected);
    if (fid_connected == fid) {
        connected = true;
        connected.notify_all();  // wake up the thread blocked in wait()
    }
}

fct.subscribe_nb(fid);

// Blocks until connected is no longer false (std::atomic::wait, C++20)
connected.wait(false);
// Questions:
// - What about timeout?
// - If using a timeout, what happens if the connection is established after the timeout?
// - Track multiple FIDs