LCOV - code coverage report
Current view: top level - felix-client/src - felix_client.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 899 1161 77.4 %
Date: 2025-06-10 03:23:28 Functions: 71 80 88.8 %

          Line data    Source code
       #include "felix/felix_client.hpp"
       #include "felix/felix_client_exception.hpp"
       #include "felix/felix_client_util.hpp"
       #include "felix/felix_client_thread.hpp"

       #include "felix/felix_fid.h"

       #include "clog.h"

       extern "C" {
           #include "jWrite.h"
       }

       #include <unistd.h>
       #include <uuid/uuid.h>

       #include <algorithm>
       #include <cerrno>
       #include <numeric>
       #include <regex>
       #include <set>
       #include <string>
       #include <unordered_set>
      21             : 
      22         673 : FelixClient::FelixClient(const std::string& local_ip_or_interface,
      23             :                          std::string bus_dir,
      24             :                          std::string bus_group_name,
      25             :                          unsigned log_level,
      26             :                          bool verbose_bus,
      27             :                          unsigned netio_pages,
      28         673 :                          unsigned netio_pagesize) :
      29         673 :             ready(false),
      30         673 :             timer_delay_millis(0),
      31         673 :             ctx({0}),
      32         673 :             user_timer({0}),
      33         673 :             unbuffered_netio_pages(netio_pages),
      34         673 :             unbuffered_netio_pagesize(netio_pagesize),
      35        1346 :             local_ip_address(get_ip_from_interface(local_ip_or_interface)) {
      36             : 
      37             :     // // handle here to make sure trace logging looks ok
      38             :     // local_ip_address = get_ip_from_interface(local_ip_or_interface);
      39             : 
      40         673 :     if (local_ip_address == "") {
      41           0 :         clog_fatal("Could not deduce the local ip address from %s", local_ip_or_interface.c_str());
      42           0 :         throw FelixClientException("felix-client: Could not deduce the local ip address from local_ip_or_interface");
      43             :     }
      44             : 
      45         673 :     clog_info("Local IP: %s", local_ip_address.c_str());
      46             : 
      47         673 :     clog_info("Initializing netio-next");
      48         673 :     netio_init(&ctx);
      49             : 
      50         673 :     clog_set_level(log_level);
      51             : 
      52             :     // init callback
      53         673 :     ctx.evloop.cb_init = on_init_eventloop;
      54         673 :     ctx.evloop.data = this;
      55             : 
      56         673 :     contextHandler = std::make_unique<FelixClientContextHandler>();
      57             : 
      58             : 
      59         673 :     send_signal = netio_signal();
      60             : 
      61         673 :     netio_signal_init(&ctx.evloop, &send_signal);
      62         673 :     send_signal.data = this;
      63         673 :     send_signal.cb = on_send_signal;
      64             : 
      65             : 
      66         673 :     sub_signal = netio_signal();
      67         673 :     netio_signal_init(&ctx.evloop, &sub_signal);
      68         673 :     sub_signal.data = this;
      69         673 :     sub_signal.cb = on_sub_signal;
      70             : 
      71             :     // setup timer for reconnection/resubscription
      72         673 :     netio_timer_init(&ctx.evloop, &subscribe_timer);
      73         673 :     subscribe_timer.cb = on_timer;
      74         673 :     subscribe_timer.data = this;
      75         673 :     terminating_felix_client = false;
      76         673 :     contextHandler->terminatingFelixClient = false;
      77             : 
      78         673 :     uint64_t actual_delay = 1000;
      79         673 :     netio_timer_start_ms(&subscribe_timer, actual_delay);
      80             : 
      81         673 :     user_timer_init();
      82             : 
      83         673 :     clog_info("Initializing felix-bus");
      84        1346 :     bus.set_path(bus_dir);
      85             : 
      86         673 :     bus.set_groupname(bus_group_name);
      87         673 :     bus.set_verbose(verbose_bus);
      88         673 : }
      89             : 
      90           0 : std::vector<int> FelixClient::parseCpuRange(const std::string &range) {
      91           0 :     static const std::regex re(R"(((\d+)-(\d+))|(\d+))");
      92           0 :     std::smatch sm;
      93           0 :     std::vector<int> data;
      94           0 :     for (std::string s(range); std::regex_search(s, sm, re); s = sm.suffix()) {
      95           0 :         if (sm[1].str().empty()) {
      96           0 :             data.push_back(std::stoi(sm[0]));
      97             :         } else {
      98           0 :             for (int i = std::stoi(sm[2]); i <= std::stoi(sm[3]); ++i) {
      99           0 :                 data.push_back(i);
     100             :             }
     101             :         }
     102           0 :     }
     103           0 :     return data;
     104           0 : }
     105             : 
     106           0 : void FelixClient::set_thread_affinity(const std::string& affinity) {
     107           0 :     this->affinity = affinity;
     108           0 : }
     109             : 
     110         547 : FelixClient::~FelixClient(){
     111         547 :     clog_info("Cleaning up FelixClient.");
     112         781 :     for(auto& it : unbuf_mem_regions){
     113         234 :         clog_debug("Removing mem reg: 0x%x. There are %d mem regions remaining.", it.second.mr, unbuf_mem_regions.size());
     114         234 :         it.second.size = 0;
     115         234 :         free(it.second.data);
     116             :     }
     117        3260 : }
     118             : 
     119             : // event loop control
     120         673 : void FelixClient::run() {
     121         673 :     if (affinity != "") {
     122           0 :         std::vector<int> cpus = parseCpuRange(affinity);
     123           0 :         if (!cpus.empty()) {
     124           0 :             cpu_set_t pmask;
     125           0 :             CPU_ZERO(&pmask);
     126           0 :             for (int cpu : cpus) {
     127           0 :                 CPU_SET(cpu, &pmask);
     128             :             }
     129           0 :             int s = sched_setaffinity(0, sizeof(pmask), &pmask);
     130           0 :             if (s != 0) {
     131           0 :                 clog_error("Setting affinity to CPU rage [%s] failed with error = %d", affinity.c_str(), s);
     132             :             }
     133             :         }
     134           0 :     }
     135         673 :     pthread_setname_np(pthread_self(), "felix-client-thread");
     136             : 
     137         673 :     evloop_thread_id = std::this_thread::get_id();
     138         673 :     clog_info("In run: 0x%x", evloop_thread_id);
     139         673 :     clog_info("Running netio_run");
     140         673 :     netio_run(&ctx.evloop);
     141         559 :     clog_info("Finished netio_run");
     142         559 : }
     143             : 
     144             : /**
     145             :  * @brief Stops the eventloop and clean up resources
     146             :  *
     147             :  * Since the memory management in netio-next relies on connections and endpoints being closed properly,
     148             :  * this function takes care to execute this procedure in the right order and then stopping the eventloop afterwards.
     149             :  * Also netio-next and its data structures use c-style memory allocation, so resources need to be freed manually.
     150             :  *
     151             :  * First, all subscriptions are unsubscribed, then all send_sockets close their connections and they are eventually deleted.
     152             :  * This has to go through the eventloop. The timeouts ensure that everything is closed even if the eventloop or some other
     153             :  * network resources get stuck.
     154             :  *
     155             :  * When using felix-client-thread/felix-interface, this fuction is called automatically in the destructor.
     156             :  */
     157         595 : void FelixClient::stop() {
     158         595 :     clog_info("Stopping netio");
     159         595 :     terminating_felix_client = true;
     160         595 :     contextHandler->terminatingFelixClient = true;
     161         595 :     netio_timer_close(&ctx.evloop, &subscribe_timer);
     162         595 :     netio_timer_close(&ctx.evloop, &user_timer);
     163             : 
     164             :     //unsubscribing all elinks
     165         595 :     std::vector<uint64_t> fids_to_unsubscribe = contextHandler->getFidsToUnsubscribe();
     166         595 :     clog_debug("Closing %d subscriptions", fids_to_unsubscribe.size());
     167        2628 :     for(auto fid : fids_to_unsubscribe){
     168        2033 :         if(evloop_thread_id == std::this_thread::get_id()){
     169          49 :             send_unsubscribe_msg(fid);
     170             :         } else {
     171        1984 :             unsubscribe(fid);
     172             :         }
     173             :     }
     174             : 
     175             :     uint timeout = 1000;
     176             :     uint t_expired = 0;
     177             :     uint span = 25U;
     178             : 
     179             : 
     180        2704 :     while (!contextHandler->areAllFidsUnsubscribed() && (t_expired <= timeout)){
     181        2109 :         std::this_thread::sleep_for(std::chrono::milliseconds(span));
     182        2109 :         t_expired += span;
     183             : 
     184             :     }
     185         595 :     if (timeout <= t_expired){
     186          48 :         clog_error("Timeout during stopping netio, not all subscriptions have been closed. %d subscriptions remain open.", contextHandler->getFidsToUnsubscribe().size());
     187             :     }
     188             : 
     189             : 
     190         595 :     std::vector<netio_send_socket*> send_sockets = contextHandler->getSendSockets();
     191         595 :     total_number_send_sockets.store(send_sockets.size());
     192         595 :     clog_debug("Closing netio %d send sockets.", total_number_send_sockets.load());
     193        1096 :     for(auto& socket : send_sockets){
     194         501 :         netio_disconnect(socket);
     195             :     }
     196             : 
     197             :     t_expired = 0;
     198        1092 :     while ((total_number_send_sockets.load(std::memory_order_relaxed) != 0) && (t_expired <= timeout)){
     199         497 :         std::this_thread::sleep_for(std::chrono::milliseconds(span));
     200         497 :         t_expired += span;
     201             :     }
     202         595 :     if (timeout <= t_expired){
     203           0 :         clog_error("Timeout during stopping netio, not all send sockets have been disconnected, %d remain open.", total_number_send_sockets.load());
     204             :     }
     205         595 :     netio_signal_close(&ctx.evloop, &send_signal);
     206         595 :     netio_signal_close(&ctx.evloop, &sub_signal);
     207         595 :     netio_terminate_signal(&ctx.evloop);
     208         595 : }
     209             : 
     210             : 
     211             : /**
     212             :  * @brief Sends a single message
     213             :  *
     214             :  * This function is executed by the eventloop, which ensures there are no data races for completion objects/buffers in netio-next.
     215             :  *
     216             :  * In buffered and unbuffered mode, the message is copied either in the buffer or in a memory region.
     217             :  * Hence, after this function returns it is safe for the user to reuse the memory of this message.
     218             :  *
     219             :  * @param fid: FID
     220             :  * @param header: ToFlxHeader, contains elink and msg size
     221             :  * @param data: pointer to data to send
     222             :  * @param size: size of message
     223             :  * @param flush: for buffered mode: should the buffer be flushed after this message
     224             :  * @param prom_send_done: promise to notify user thread that the message was sent.
     225             :  */
     226         593 : int FelixClient::send_evloop(netio_tag_t fid, ToFlxHeader header, const uint8_t* data, size_t size, bool flush, std::promise<uint8_t> prom_send_done){
     227         593 :     if(contextHandler->getSocketState(fid) == DISCONNECTED){
     228           0 :         clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
     229           0 :         std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     230           0 :         prom_send_done.set_exception(e);
     231           0 :         return 1;
     232           0 :     }
     233             : 
     234         593 :     int status = NETIO_STATUS_OK;
     235             : 
     236         593 :     if (contextHandler->getType(fid) == BUFFERED) {
     237         317 :         if(size > contextHandler->getPagesize(fid)){
     238           0 :             clog_error("Trying to send message that exceed the page size.");
     239           0 :             return 1;
     240             :         }
     241         317 :         netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
     242         317 :         if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
     243         317 :         size_t len = sizeof(header) + size;
     244         317 :         std::vector<uint8_t> buf;
     245         317 :         buf.reserve(len);
     246         317 :         memcpy(buf.data(), &header, sizeof(header));
     247         317 :         memcpy(buf.data() + sizeof(header), data, size);
     248         317 :         clog_debug("Sending (buffered) for 0x%x, i.e. tag %d, socket 0x%p", fid, header.elink, buffered_socket);
     249         317 :         status = netio_buffered_send(buffered_socket, (void*)buf.data(), len);
     250         317 :         clog_debug("Status %d", status);
     251             : 
     252         317 :         if (flush) {
     253         317 :             netio_buffered_flush(buffered_socket);
     254             :         }
     255             : 
     256         317 :     } else {
     257         276 :         if(size > UNBUFFERED_MR_SIZE){
     258           0 :             clog_error("Trying to send message that exceed the page size.");
     259           0 :             return 1;
     260             :         }
     261         276 :         netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
     262         276 :         std::string addr = contextHandler->getAddress(fid);
     263         276 :         clog_debug("Begin sending process. Buffer ptr: 0x%p", unbuf_mem_regions[addr].data);
     264         276 :         memcpy(unbuf_mem_regions[addr].data, &header, sizeof(header));
     265         276 :         memcpy((uint8_t*)unbuf_mem_regions[addr].data + sizeof(header), data, size);
     266         276 :         clog_debug("Memcopy done. Total size: %d (header: %d + size: %d)", sizeof(header) + size, sizeof(header), size);
     267         276 :         if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
     268         276 :         status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, sizeof(header) + size, (uint64_t)&unbuf_mem_regions[addr]);
     269         276 :     }
     270         593 :     prom_send_done.set_value(status);
     271         593 :     return status;
     272             : }
     273             : 
     274             : /**
     275             :  * @brief Sends a vector of multiple messages
     276             :  *
     277             :  * This function is executed by the eventloop, which ensures there are no data races for completion objects/buffers in netio-next.
     278             :  * It works similar to the previous send_eventloop function for a single message, but takes instead a vector of messages that will be sent in one shot.
     279             :  * In unbuffered mode the data is copied directly into the memory region of the corresponding socket while buffered mode uses an additional step.
     280             :  * THe function does not have the flush parameter in buffered mode which cause the buffere to be flushed after every message
     281             :  * (This behavior was requested to speed up the sending of padded messages for e.g. programming FW in the FE electronics.)
     282             :  *
     283             :  * @param fid: FID
     284             :  * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
     285             :  * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
     286             :  * @param prom_send_done: promise to notify user thread that the message was sent
     287             :  */
     288         303 : int FelixClient::send_evloop(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes, std::promise<uint8_t> prom_send_done){
     289             : 
     290         303 :     if(contextHandler->getSocketState(fid) == DISCONNECTED){
     291           0 :         clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
     292           0 :         std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     293           0 :         prom_send_done.set_exception(e);
     294           0 :         return 1;
     295           0 :     }
     296             : 
     297         303 :     ToFlxHeader header;
     298         606 :     size_t totalSize = accumulate(sizes.begin(), sizes.end(), 0) + sizeof(header) * msgs.size();
     299         303 :     size_t currentSize = 0;
     300             : 
     301             : 
     302         303 :     int status = NETIO_STATUS_OK;
     303             : 
     304         303 :     if (contextHandler->getType(fid) == BUFFERED) {
     305         133 :         if(totalSize > contextHandler->getPagesize(fid)){
     306           0 :             clog_error("Trying to send message that exceed the page size.");
     307           0 :             return 1;
     308             :         }
     309         133 :         netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
     310         133 :         clog_debug("Socket = 0x%x with state: %d", buffered_socket, contextHandler->getSocketState(fid));
     311         133 :         if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
     312             : 
     313         133 :         std::vector<uint8_t> buf;
     314         133 :         buf.reserve(totalSize);
     315             : 
     316         407 :         for(unsigned i = 0; i < msgs.size(); i++){
     317             : 
     318         274 :             header.length = sizes[i];
     319         274 :             header.reserved = 0;
     320         274 :             header.elink = get_elink(fid);
     321             : 
     322         274 :             memcpy(buf.data() + currentSize, &header, sizeof(header));
     323         274 :             memcpy(buf.data() + currentSize + sizeof(header), msgs[i], sizes[i]);
     324             : 
     325         274 :             currentSize += sizeof(header) + sizes[i];
     326             :         }
     327             : 
     328         133 :         clog_debug("Sending (buffered) for 0x%x", fid);
     329         133 :         clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
     330         133 :         {
     331         133 :             status = netio_buffered_send(buffered_socket, (void*)buf.data(), totalSize);
     332         133 :             clog_debug("Status %d", status);
     333         133 :             netio_buffered_flush(buffered_socket);
     334             :         }
     335         133 :     } else {
     336             :         // Unbuffered
     337         170 :         if(totalSize > UNBUFFERED_MR_SIZE){
     338           0 :             clog_error("Trying to send message that exceed the page size.");
     339           0 :             return 1;
     340             :         }
     341         170 :         std::string addr = contextHandler->getAddress(fid);
     342         170 :         netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
     343         170 :         if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
     344             : 
     345         514 :         for(unsigned i = 0; i < msgs.size(); i++){
     346             : 
     347         344 :             header.length = sizes[i];
     348         344 :             header.reserved = 0;
     349         344 :             header.elink = get_elink(fid);
     350             : 
     351         344 :             memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize, &header, sizeof(header));
     352         344 :             memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize + sizeof(header), msgs[i], sizes[i]);
     353             : 
     354         344 :             currentSize += sizeof(header) + sizes[i];
     355             :         }
     356             : 
     357         170 :         clog_debug("Sending (unbuffered) for 0x%x", fid);
     358         170 :         clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
     359             : 
     360         170 :         {
     361         170 :             status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, totalSize, (uint64_t)&unbuf_mem_regions[addr]);
     362             :         }
     363             : 
     364         170 :     }
     365         303 :     prom_send_done.set_value(status);
     366         303 :     return status;
     367             : 
     368             : }
     369             : 
     370             : /**
     371             :  * @brief Function run by the eventloop thread to establish send connection
     372             :  *
     373             :  * This function will either create a new send_socket and atmept to connect it or just check the current state of this connection.
     374             :  * If the connection is up or there is an error, it will notify the calling user thread by returning the corresponding error code via the std::promise.
     375             :  * If the connection is still about to be established, the signal is fired again and the state si checked again in the new function call.
     376             :  *
     377             :  *
     378             :  * @param fid: FID
     379             :  * @param prom_sub_done: promise to notify user thread when subscription is done
     380             :  */
     381         641 : void FelixClient::establish_send_connection(uint64_t fid, std::promise<uint8_t> prom_sub_done){
     382         641 :     if (contextHandler->getType(fid) == BUFFERED) {
     383         347 :         netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
     384         347 :         clog_debug("Buffered socket = 0x%p with state: %d", buffered_socket, contextHandler->getSocketState(fid));
     385         347 :         if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
     386         347 :             prom_sub_done.set_value(0);
     387         347 :             return;
     388             :         } else {
     389           0 :             clog_error("Buffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x state is %d", std::this_thread::get_id(), fid, contextHandler->getSocketState(fid));
     390           0 :             std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     391           0 :             prom_sub_done.set_exception(e);
     392           0 :             return;
     393           0 :         }
     394             :     } else {
     395         294 :         netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
     396         294 :         clog_debug("Unbuffered socket = 0x%x with state: %d", unbuffered_socket, contextHandler->getSocketState(fid));
     397         294 :         if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
     398         294 :             prom_sub_done.set_value(0);
     399         294 :             return;
     400             :         } else {
     401           0 :             clog_error("Unbuffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
     402           0 :             std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     403           0 :             prom_sub_done.set_exception(e);
     404           0 :             return;
     405           0 :         }
     406             :     }
     407             : 
     408             : }
     409             : 
     410             : 
     411             : /**
     412             :  * @brief Sends a single message (exposed by the API)
     413             :  *
     414             :  * This function might be executed by the user thread when calling the API send_data() or by the eventloop when send_data() is called from within a callback.
     415             :  * First, it is checked which thread is executing this function since this impacts guarantees for thread safety.
     416             :  * If the function is called by the eventloop thread, send_data_intern() is called and then the function returns.
     417             :  * If called by the user thread and if there is no send connection to felix-toflx, the function will attempt to create one.
     418             :  * To avoid data races, this needs to be executed by the eventloop. The corresponding signal is fired and the function is blocked with a future/promise pair until the connection is up and can be used for sending. However, this is only necessary when a new connection needs to be established.
     419             :  * When the connection is available, the send signal is fired and the user thread is again blocked until the send is executed by the eventloop.
     420             :  *
     421             :  * The two future/promise pairs also have a timeout of 2s, respective 5s to avaoid deadlock situations when the eventloop thread is blocked by the user inside a callback.
     422             :  *
     423             :  * @param fid: FID
     424             :  * @param data: pointer to data to send
     425             :  * @param size: size of message
     426             :  * @param flush: for buffered mode: should the buffer be flushed after this message
     427             :  */
     428         611 : void FelixClient::send_data(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
     429         611 :     if(terminating_felix_client.load()){
     430           0 :         clog_debug("Already terminating, should not send data");
     431           0 :         return;
     432             :     }
     433             : 
     434         611 :     try{
     435         611 :         contextHandler->createOrUpdateInfo(fid, &bus, false);
     436           2 :     } catch (std::exception& e) {
     437           2 :         clog_error("Problem while reading bus: %s", e.what());
     438           2 :         throw FelixClientSendConnectionException();
     439           2 :     }
     440             : 
     441         609 :     clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
     442             : 
     443         609 :     if (!contextHandler->exists(fid)) {
     444           0 :         clog_error("Invalid Bus entry");
     445           0 :         throw FelixClientSendConnectionException();
     446             :     }
     447             : 
     448         609 :     if(evloop_thread_id == std::this_thread::get_id()){
     449           0 :         send_data_intern(fid, data, size, flush);
     450           0 :         return;
     451             :     }
     452             : 
     453             :     //If there is no connection, we have to establish a new one through the evloop
     454         609 :     if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
     455         519 :         std::promise<uint8_t> prom_sub_done;
     456         519 :         std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
     457         519 :         contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
     458         519 :         netio_signal_fire(&send_signal);
     459         519 :         try{
     460         519 :             std::chrono::milliseconds timeout (5000);
     461         519 :             if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
     462           0 :                 clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
     463           0 :                 throw FelixClientResourceNotAvailableException();
     464             :             }
     465         519 :             future_sub_done.get();
     466           0 :         } catch(std::exception& e){
     467           0 :             clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     468           0 :             throw;
     469           0 :         }
     470         519 :     }
     471             : 
     472         609 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     473         513 :         if(!contextHandler->waitConnected(fid, 2000)){
     474          48 :             clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
     475          48 :             throw FelixClientSendConnectionException();
     476             :         }
     477             :     }
     478             : 
     479         561 :     ToFlxHeader header;
     480         561 :     header.length = size;
     481         561 :     header.reserved = 0;
     482         561 :     header.elink = get_elink(fid);
     483         561 :     int status = NETIO_STATUS_OK;
     484             : 
     485             :     //send
     486         561 :     std::promise<uint8_t> prom_send_done;
     487         561 :     std::future<uint8_t> future_send_done = prom_send_done.get_future();
     488             : 
     489         561 :     contextHandler->pushSendTask(SendDataEvent{fid, header, data, size, flush, std::move(prom_send_done)});
     490         561 :     netio_signal_fire(&send_signal);
     491             : 
     492             :     // wait for promise
     493             : 
     494         561 :     try{
     495         561 :         std::chrono::milliseconds timeout (5000);
     496         561 :         if (future_send_done.wait_for(timeout)==std::future_status::timeout){
     497           0 :             clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
     498           0 :             throw FelixClientException("Send timeout expired");
     499             :         }
     500         561 :         status = future_send_done.get();
     501           0 :     } catch(std::exception& e){
     502           0 :         clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     503           0 :         throw;
     504           0 :     }
     505             : 
     506         561 :     switch(status) {
     507         561 :         case NETIO_STATUS_OK: return;
     508           0 :         case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     509           0 :         case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     510           0 :         case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     511           0 :         default: throw FelixClientException("Unknown status code: " + status);
     512             :     }
     513         561 : }
     514             : 
     515             : 
     516             : /**
     517             :  * @brief Sends a vector of multiple messages
     518             :  *
     519             :  * This function provides the same functionality as the previous one for single messages but takes a vector of messages as input and
     520             :  * sends them in a single mesaage to felix-toflx.
     521             :  * Another difference to the single message version is that the toflx header is not already added here but by the send_evloop() function.
     522             :  *
     523             :  * @param fid: FID
     524             :  * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
     525             :  * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
     526             :  * @param prom_send_done: promise to notify user thread that the message was sent
     527             :  */
     528         297 : void FelixClient::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
     529         297 :     if(terminating_felix_client.load()){
     530           0 :         clog_debug("Already terminating, should not send data");
     531           0 :         return;
     532             :     }
     533             : 
     534         297 :     try{
     535         297 :         contextHandler->createOrUpdateInfo(fid, &bus, false);
     536           0 :     } catch (std::exception& e) {
     537           0 :         clog_error("Problem while reading bus: %s", e.what());
     538           0 :         throw FelixClientSendConnectionException();
     539           0 :     }
     540             : 
     541         297 :     if (!contextHandler->exists(fid)) {
     542           0 :         clog_error("Invalid Bus entry ");
     543           0 :         throw FelixClientSendConnectionException();
     544             :     }
     545             : 
     546         297 :     clog_info("Address %s", contextHandler->getAddress(fid).c_str());
     547         297 :     if (msgs.size() != sizes.size()){
     548           0 :         FelixClientException("Error! Number of messages differs from number of sizes");
     549             :     }
     550             : 
     551         297 :     if(evloop_thread_id == std::this_thread::get_id()){
     552           0 :         send_data_intern(fid, msgs, sizes);
     553           0 :         return;
     554             :     }
     555             : 
     556         297 :     if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTIONG?
     557          16 :         std::promise<uint8_t> prom_sub_done;
     558          16 :         std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
     559          16 :         contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
     560          16 :         netio_signal_fire(&send_signal);
     561          16 :         try{
     562          16 :             std::chrono::milliseconds timeout (5000);
     563          16 :             if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
     564           0 :                 clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
     565           0 :                 throw FelixClientResourceNotAvailableException();
     566             :             }
     567          16 :             future_sub_done.get();
     568           0 :         } catch(std::exception& e){
     569           0 :             clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     570           0 :             throw;
     571           0 :         }
     572          16 :     }
     573             : 
     574         297 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     575          16 :         if(!contextHandler->waitConnected(fid, 2000)){
     576           0 :             clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
     577           0 :             throw FelixClientSendConnectionException();
     578             :         }
     579             :     }
     580             : 
     581         297 :     std::promise<uint8_t> prom_send_done;
     582         297 :     std::future<uint8_t> future_send_done = prom_send_done.get_future();
     583         297 :     contextHandler->pushSendTask(SendDataVectorEvent{fid, msgs, sizes, std::move(prom_send_done)});
     584         297 :     netio_signal_fire(&send_signal);
     585             : 
     586         297 :     int status = NETIO_STATUS_OK;
     587             :     // wait for promise
     588             : 
     589         297 :     try{
     590         297 :         std::chrono::milliseconds timeout (5000);
     591         297 :         if (future_send_done.wait_for(timeout)==std::future_status::timeout){
     592           0 :             clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
     593           0 :             throw FelixClientException("Send timeout expired");
     594             :         }
     595         297 :         status = future_send_done.get();
     596           0 :     } catch(std::exception& e){
     597           0 :         clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     598           0 :         throw;
     599           0 :     }
     600             : 
     601         297 :     switch(status) {
     602         297 :     case NETIO_STATUS_OK: return;
     603           0 :     case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     604           0 :     case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     605           0 :     case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     606           0 :     default: throw FelixClientException("Unknown status code: " + status);
     607             :     }
     608         297 : }
     609             : 
     610             : /**
     611             :  * @brief Sends a single message and is guaranteed to be non-blocking (exposed by the API)
     612             :  *
     613             :  * This functionality was requested by the DCS team who need send_data() to be guranteed non-blocking to avaoid deadlocks in the OPC/SCA server.
     614             :  * The function behaves similar to the regular send_data() with two main differences:
     615             :  *  1) It will not attempt to establish a send connection if it is not already up. This is now in the responsability of the user since this cannot be done in a non=blocking way.
     616             :  *  2)  The data is copied before the function returns and it does not provide any information if the send was successful or not.
     617             :  *      The copy is needed because the user would not now afterwards when it is save to reuse the memory.
     618             :  *
     619             :  * @param fid: FID
     620             :  * @param data: pointer to data to send
     621             :  * @param size: size of message
     622             :  * @param flush: for buffered mode: should the buffer be flushed after this message
     623             :  */
     624          32 : void FelixClient::send_data_nb(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
     625          32 :     if(terminating_felix_client.load()){
     626           0 :         clog_debug("Already terminating, should not send data");
     627           0 :         return;
     628             :     }
     629             : 
     630          32 :     if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
     631           0 :         clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
     632           0 :         throw FelixClientSendConnectionException();
     633             :     }
     634             : 
     635          32 :     std::vector<uint8_t> data_copy(data, data+size);
     636             : 
     637          32 :     ToFlxHeader header;
     638          32 :     header.length = size;
     639          32 :     header.reserved = 0;
     640          32 :     header.elink = get_elink(fid);
     641             : 
     642             :     //send
     643          32 :     std::promise<uint8_t> prom_send_done;
     644             : 
     645          32 :     contextHandler->pushSendTask(SendDataNbEvent{fid, header, std::move(data_copy), size, flush, std::move(prom_send_done)});
     646          32 :     netio_signal_fire(&send_signal);
     647          32 : }
     648             : 
     649             : /**
     650             :  * @brief Sends a vector of multiple messages which is guaranteed to be non-blocking (exposed by the API)
     651             :  *
     652             :  * Analog to the other send_data() implementations, this function provides the same functionality for a vector of messages instead of a singel message.
     653             :  * The main differences compared to the single message version is that it will not attempt to establish a send connection if it is not already up. This is now in the responsability of the user since this cannot be done in a non-blocking way.
     654             :  * And of course it does not provide any information to the user if the send was successful or not.
     655             :  *
     656             :  * @param fid: FID
     657             :  * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
     658             :  * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
     659             :  * @param prom_send_done: promise to notify user thread that the message was sent
     660             :  */
     661           6 : void FelixClient::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
     662           6 :     if(terminating_felix_client.load()){
     663           0 :         clog_debug("Already terminating, should not send data");
     664           0 :         return;
     665             :     }
     666             : 
     667           6 :     if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
     668           0 :         clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
     669           0 :         throw FelixClientSendConnectionException();
     670             :     }
     671             : 
     672           6 :     if (msgs.size() != sizes.size()){
     673           0 :         FelixClientException("Error! Number of messages differs from number of sizes");
     674             :     }
     675             : 
     676           6 :     std::vector<std::vector<uint8_t>> data;
     677           6 :     std::vector<const uint8_t*> msgs_copy;
     678          22 :     for (size_t i = 0; i < msgs.size(); i++){
     679          16 :         data.emplace_back(msgs[i], msgs[i]+sizes[i]);
     680          16 :         msgs_copy.push_back(&data[i][0]);
     681             :     }
     682             : 
     683           6 :     std::vector<uint64_t> size_copy(sizes);
     684             : 
     685           6 :     std::promise<uint8_t> prom_send_done;
     686           6 :     contextHandler->pushSendTask(SendDataVectorNbEvent{fid, std::move(data), std::move(msgs_copy), std::move(size_copy), std::move(prom_send_done)});
     687           6 :     netio_signal_fire(&send_signal);
     688           6 : }
     689             : 
     690             : 
     691             : /**
     692             :  * @brief Send_data() function that must be used by the eventloop thread
     693             :  *
     694             :  * Similar to the non-blocking send, the send_data() function call that is executed by the eventloop thread has to be non-blocking.
     695             :  * But there is no need to a timely return, so this function will still try to establish a connection and then call itself again through the eventloop until the connection is up. The std::pormise that is passed the establish_send_connection() and send_evloop() funcitons are just dummy objects htat are not actually checked.
     696             :  * Also, it exploits the fact that it is executed by the eventloop and there is no need for thread safety measuremnts.
     697             :  * So, send_eventloop() is directly called form this function without firing another signal.
     698             :  *
     699             :  * @param fid: FID
     700             :  * @param data: pointer to data to send
     701             :  * @param size: size of message
     702             :  * @param flush: for buffered mode: should the buffer be flushed after this message
     703             :  */
     704           0 : void FelixClient::send_data_intern(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
     705             : 
     706           0 :     ToFlxHeader header;
     707           0 :     header.length = size;
     708           0 :     header.reserved = 0;
     709           0 :     header.elink = get_elink(fid);
     710           0 :     int status = NETIO_STATUS_OK;
     711           0 :     std::promise<uint8_t> dummy_prom_sub;
     712           0 :     std::promise<uint8_t> dummy_prom_send;
     713             : 
     714           0 :     if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
     715           0 :         try{
     716           0 :             establish_send_connection(fid, std::move(dummy_prom_sub));
     717           0 :         } catch(std::exception& e){
     718           0 :             clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     719           0 :             throw;
     720           0 :         }
     721             :     }
     722             : 
     723           0 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     724           0 :         contextHandler->pushSendTask(SendDataInternEvent{fid, header, data, size, flush, std::move(dummy_prom_send),1});
     725           0 :         netio_signal_fire(&send_signal);
     726             :         return;
     727             :     }
     728             : 
     729           0 :     try{
     730           0 :         status = send_evloop(fid, header, data, size, flush, std::move(dummy_prom_send));
     731           0 :     } catch(std::exception& e){
     732           0 :         clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     733           0 :         throw;
     734           0 :     }
     735             : 
     736           0 :     switch(status) {
     737             :         case NETIO_STATUS_OK: return;
     738           0 :         case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     739           0 :         case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     740           0 :         case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     741           0 :         default: throw FelixClientException("Unknown status code: " + status);
     742             :     }
     743           0 : }
     744             : 
     745             : 
     746             : /**
     747             :  * @brief Send_data() function for vector of messages that must be used by the eventloop thread
     748             :  *
     749             :  * The same changes described in the single message version with respect to the regular send_data() function apply here as well.
     750             :  *
     751             :  * @param fid: FID
     752             :  * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
     753             :  * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
     754             :  * @param prom_send_done: promise to notify user thread that the message was sent
     755             :  */
     756           0 : void FelixClient::send_data_intern(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
     757             : 
     758           0 :     int status = NETIO_STATUS_OK;
     759           0 :     std::promise<uint8_t> dummy_prom_sub;
     760           0 :     std::promise<uint8_t> dummy_prom_send;
     761             : 
     762           0 :     if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
     763           0 :         try{
     764           0 :             establish_send_connection(fid, std::move(dummy_prom_sub));
     765           0 :         } catch(std::exception& e){
     766           0 :             clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     767           0 :             throw;
     768           0 :         }
     769             :     }
     770             : 
     771           0 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     772           0 :         contextHandler->pushSendTask(SendDataInternVectorEvent{fid, msgs, sizes, std::move(dummy_prom_send), 1});
     773           0 :         netio_signal_fire(&send_signal);
     774             :         return;
     775             :     }
     776             : 
     777           0 :     try{
     778           0 :         status = send_evloop(fid, msgs, sizes, std::move(dummy_prom_send));
     779           0 :     } catch(std::exception& e){
     780           0 :         clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     781           0 :         throw;
     782           0 :     }
     783             : 
     784           0 :     switch(status) {
     785             :         case NETIO_STATUS_OK: return;
     786           0 :         case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     787           0 :         case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     788           0 :         case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     789           0 :         default: throw FelixClientException("Unknown status code: " + status);
     790             :     }
     791           0 : }
     792             : 
     793             : /**
     794             :  * @brief Generic callback for all send related function calls
     795             :  *
     796             :  * This design allows to reuse the same signal (which must have a single callback) for all send related functions.
     797             :  * The function pops the next task from the send_task_queue in the contextHandler and decides based on the object type which function to call and which parameter to pass.
     798             :  *
     799             :  */
     800        1471 : void FelixClient::exec_send(){
     801        1471 :     send_var_t task;
     802        1471 :     contextHandler->pullSendTask(task);
     803             : 
     804        1471 :     std::visit(overloaded {
     805           0 :         [](std::monostate const&) {
     806           0 :             clog_info("Nothing to send. Send task queue is empty.");
     807             :         },
     808         561 :         [this](SendDataEvent& sendData) {
     809         561 :             this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
     810         561 :         },
     811          32 :         [this](SendDataNbEvent& sendNbData) {
     812          32 :             this->send_evloop(sendNbData.fid, sendNbData.header, &sendNbData.data[0], sendNbData.size, sendNbData.flush, std::move(sendNbData.prom_send_done));
     813          32 :         },
     814         297 :         [this](SendDataVectorEvent& sendDataVector) {
     815         297 :             this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
     816         297 :         },
     817           6 :         [this](SendDataVectorNbEvent& sendDataNbVector) {
     818           6 :             this->send_evloop(sendDataNbVector.fid, sendDataNbVector.msgs, sendDataNbVector.sizes, std::move(sendDataNbVector.prom_send_done));
     819           6 :         },
     820         575 :         [this](SendConnectEvent& sendConnect) {
     821         575 :             this->establish_send_connection(sendConnect.fid, std::move(sendConnect.prom_sub_done));
     822         575 :         },
     823           0 :         [this](SendDataInternEvent& sendData) {
     824           0 :             if(contextHandler->getSocketState(sendData.fid) == CONNECTED){
     825           0 :                 this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
     826             :             }else{
     827           0 :                 if(sendData.count< maxSendAttempts){
     828           0 :                     contextHandler->pushSendTask(SendDataInternEvent{sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done), ++sendData.count});
     829             :                 }
     830             :             }
     831           0 :         },
     832           0 :         [this](SendDataInternVectorEvent& sendDataVector) {
     833           0 :             if(contextHandler->getSocketState(sendDataVector.fid) == CONNECTED){
     834           0 :                 this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
     835             :             }else{
     836           0 :                 if(sendDataVector.count < maxSendAttempts){
     837           0 :                     contextHandler->pushSendTask(SendDataInternVectorEvent{sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done), ++sendDataVector.count});
     838             :                 }
     839             :             }
     840           0 :         },
     841             :     }, task);
     842        1471 : }
     843             : 
     844             : 
     845             : 
     846           0 : void FelixClient::send_data(netio_tag_t fid, struct iovec* iov, unsigned n, bool flush) {
     847           0 :     clog_info("send_msg(netio_tag_t fid, struct iovec* iov, unsigned n): not implemented");
     848           0 :     throw FelixClientSendConnectionException();
     849             : }
     850             : 
     851             : /**
     852             :  * @brief Generic callback for all subscribe related function calls
     853             :  *
     854             :  * This design allows to reuse the same signal (which must have a single callback) for all subscribe related functions.
     855             :  * The function pops the next task from the sub_task_queue in the contextHandler and decides based on the object type which function to call and which parameter to pass.
     856             :  *
     857             :  */
     858        4287 : void FelixClient::exec_sub(){
     859        4287 :     sub_var_t task;
     860        4287 :     contextHandler->pullSubTask(task);
     861             : 
     862        4287 :     std::visit(overloaded {
     863           0 :         [](std::monostate const&) {
     864           0 :             clog_info("Nothing to send. Send task queue is empty.");
     865             :         },
     866        2118 :         [this](UnsubEvent& unsub) {
     867        2118 :             this->send_unsubscribe_msg(unsub.fid, std::move(unsub.prom_unsub_done));
     868        2118 :         },
     869        2169 :         [this](SubscribeEvent const& sub) {
     870        2169 :             this->subscribe_evloop(sub.fid);
     871             :         },
     872             :     }, task);
     873        4287 : }
     874             : 
     875             : /**
     876             :  * @brief Initializes a new send connection
     877             :  *
     878             :  * This function ust be called before using the send_data_nb() functions since they will not establish a connection on the first function call.
     879             :  * This function will not block until the connection is up.
     880             :  *
     881             :  * TODO: This function should propably be blocking.
     882             :  *
     883             :  * @param fid: FID
     884             :  */
     885          40 : void FelixClient::init_send_data(netio_tag_t fid) {
     886          40 :     clog_info("init_send_data for FID 0x%x", fid);
     887          40 :     if(terminating_felix_client.load()){
     888           0 :         clog_debug("Already terminating, should not send data");
     889           0 :         return;
     890             :     }
     891             : 
     892          40 :     try{
     893          40 :         contextHandler->createOrUpdateInfo(fid, &bus, false);
     894           0 :     } catch (std::exception& e) {
     895           0 :         clog_error("Problem while reading bus: %s", e.what());
     896           0 :         throw FelixClientSendConnectionException();
     897           0 :     }
     898             : 
     899          40 :     clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
     900             : 
     901          40 :     if (!contextHandler->exists(fid)) {
     902           0 :         clog_error("Invalid Bus entry");
     903           0 :         throw FelixClientSendConnectionException();
     904             :     }
     905             : 
     906             :     //If there is no connection, we have to establish a new one through the evloop
     907          40 :     if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
     908          40 :         std::promise<uint8_t> prom_sub_done;
     909          40 :         contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
     910          40 :         netio_signal_fire(&send_signal);
     911          40 :     }
     912             : 
     913             :     return;
     914             : }
     915             : 
     916         392 : void FelixClient::init_subscribe(netio_tag_t fid) {
     917         392 :     clog_info("init_subscribe not implemented, subscribe will do the job");
     918         392 :     return;
     919             : }
     920             : 
     921        1471 : void FelixClient::on_send_signal(void* ptr){
     922        1471 :     clog_trace("On send signal");
     923        1471 :     static_cast<FelixClient*>(ptr)->exec_send();
     924        1471 : }
     925             : 
     926        4287 : void FelixClient::on_sub_signal(void* ptr){
     927        4287 :     clog_debug("On sub signal");
     928        4287 :     static_cast<FelixClient*>(ptr)->exec_sub();
     929        4287 : }
     930             : 
     931             : /**
     932             :  * @brief Subscribes felix-client to a vector of FIDs
     933             :  *
     934             :  * This function performs a number of checks on the right conditions to subscribe and then forwards this task to the eventloop thread.
     935             :  *
     936             :  * If no timeout is specified or if it is equal to 0, this function will not block and return immediately.
     937             :  * Otherwise the function will block until either all subscriptions were successful or the timeout expires.
     938             :  * Note that it will also return FELIX_CLIENT_STATUS_TIMEOUT if just a single subscription was not successful.
     939             :  *
     940             :  * Also, if subscribe() is called by the eventloop thread (for instance from within a callback), all functions must be non-blocking.
     941             :  * Hence, the function will return FELIX_CLIENT_STATUS_TIMEOUT right away, if a timeout was specified, since the thread cannot be blocked and process the subscription at the same time.
     942             :  * If the function is called by the user thread, the task is passed via a subscribe signal to the eventloop thread to avoid race conditions.
     943             :  *
     944             :  * @param fids: vector of FIDs to subscribe
     945             :  * @param timeoutms: timeout in ms in which the subscription needs to be successful.
     946             :  * @param for_register: indicates if subscription is for send_cmd function.
     947             :  */
     948         651 : int FelixClient::subscribe(const std::vector<netio_tag_t>& fids, uint timeoutms, bool for_register) {
     949         651 :     std::vector<netio_tag_t> all_fids; // FIDs that passed every check below and are actually subscribed
     950         651 :     clog_debug("Subscribing to %d FIDs", fids.size());
     951         651 :     size_t alreadySubscribedCounter = 0; // size_t instead of uint8_t: an 8-bit counter wraps at 256 and breaks the size comparisons below
     952        2858 :     for(auto& fid : fids){
     953             :         // Already subscribed: counted so that an all-subscribed request can report ALREADY_DONE
     954        2207 :         if (!contextHandler->canSubscribe(fid)) {
     955          28 :             clog_info("Already subscribed to 0x%x", fid);
     956          28 :             alreadySubscribedCounter++;
     957          28 :             continue;
     958             :         }
     959        2179 :         if (fid & 0x8000){ // skipped without being counted; see the final size check
     960           0 :             clog_info("Can't subscribe to toflx fid: 0x%x", fid);
     961           0 :             continue;
     962             :         }
     963             : 
     964        2179 :         try{
     965        2179 :             contextHandler->createOrUpdateInfo(fid, &bus, for_register);
     966        2169 :             struct timespec t0;
     967        2169 :             clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
     968        4338 :             contextHandler->setSubscriptionTimes(fid, "subscribe", t0); // start time, read again in subscribe_evloop()
     969        2169 :             contextHandler->setSubscriptionState(fid, SUBING);
     970          10 :         } catch(...){ // any failure while preparing the FID only skips this FID
     971          10 :             continue;
     972          10 :         }
     973        2169 :         all_fids.push_back(fid);
     974        2169 :         contextHandler->addToFidsToResubscribe(fid);
     975             :     }
     976         651 :     if (fids.size() == alreadySubscribedCounter){ // also true for an empty input vector
     977             :         return FELIX_CLIENT_STATUS_ALREADY_DONE;
     978             :     }
     979             : 
     980         643 :     if(evloop_thread_id == std::this_thread::get_id()){ // eventloop thread: subscribe directly
     981           0 :         for (auto& fid : all_fids){
     982           0 :             if (!subscribe_evloop(fid)){ // first failure ends the loop; remaining FIDs are not attempted
     983           0 :                 if (timeoutms == 0) {
     984         651 :                     return FELIX_CLIENT_STATUS_OK;
     985             :                 }
     986             :                 return FELIX_CLIENT_STATUS_TIMEOUT;
     987             :             }
     988             :         }
     989             :     } else { // any other thread: queue one SubscribeEvent per FID and fire the subscribe signal
     990        2812 :         for (auto& fid : all_fids){
     991        2169 :             contextHandler->pushSubTask(SubscribeEvent{fid});
     992        2169 :             netio_signal_fire(&sub_signal);
     993             :         }
     994             :     }
     995             : 
     996         643 :     if (timeoutms == 0) { // non-blocking mode: do not check the outcome
     997             :        return FELIX_CLIENT_STATUS_OK;
     998             :     }
     999         436 :     std::vector<netio_tag_t> not_subscribed_fids;
    1000             : 
    1001         436 :     if(!contextHandler->isSubscribed(all_fids, timeoutms)){ // presumably waits up to timeoutms - confirm in the context handler
    1002        1160 :         for (auto& fid : all_fids){
    1003        1148 :             if(!contextHandler->isSubscribed(fid)){
    1004           0 :                 not_subscribed_fids.push_back(fid);
    1005           0 :                 contextHandler->setSubscriptionState(fid, TIMEOUT);
    1006             :             }
    1007             :         }
    1008             :     }
    1009             : 
    1010         436 :     if (not_subscribed_fids.empty()) {
    1011         436 :         if(fids.size() > all_fids.size() + alreadySubscribedCounter){ // at least one FID was skipped in the loop above
    1012             :             return FELIX_CLIENT_STATUS_TIMEOUT;
    1013             :         }
    1014         426 :         return FELIX_CLIENT_STATUS_OK;
    1015             :     }
    1016             :     return FELIX_CLIENT_STATUS_TIMEOUT;
    1017             : 
    1018         651 : }
    1019             : 
    1020             : /**
    1021             :  * @brief Subscribes a single FID
    1022             :  *
    1023             :  * The function creates a vector with one element and calls the other subscribe function that takes a vector of FIDs as input to avoid code duplication.
    1024             :  *
    1025             :  * @param fid: FID
    1026             :  * @param timeoutms: timeout in ms in which the subscription needs to be successful.
    1027             :  * @param for_register: indicates if subscription is for send_cmd function.
    1028             :  */
    1029         589 : int FelixClient::subscribe(netio_tag_t fid, uint timeoutms, bool for_register) {
    1030         589 :     if(terminating_felix_client.load()){ // while terminating, nothing is subscribed and OK is reported
    1031           0 :         clog_debug("Already terminating, should not subscribe anymore");
    1032           0 :         return FELIX_CLIENT_STATUS_OK;
    1033             :     }
    1034         589 :     std::vector<uint64_t> fids{fid};
    1035         589 :     return subscribe(fids, timeoutms, for_register); // returns the status of the vector overload unchanged
    1036         589 : }
    1037             : 
    1038             : /**
    1039             :  * @brief Actually subscribes a FID (executed by the eventloop)
    1040             :  *
    1041             :  * Since the subscription process includes sending a subscription message to the felix-tohost application, this function needs to be executed by the eventloop as well.
    1042             :  * In addition to the actual subscription process, the time it takes to establish the connection is also measured.
    1043             :  * If the subscription message cannot be sent because of NETIO_STATUS_AGAIN, the FID is queued again as a subscribe task and the subscribe signal is fired.
    1044             :  * At the end the current status is checked. Since there is no netio callback for established connections but felix-client
    1045             :  * triggers the user on_connection_established for every FID, there are basically two cases:
    1046             :  * 1) The connection is already up. In this case we assume that the subscription will be successful and call the on_connection_established callback within this function.
    1047             :  * 2) The connection has to be established first, then the netio on_connection_established callback will trigger the user callback for all subscribed FIDs.
    1048             :  *
    1049             :  * @param fid: FID
    1050             :  */
    1051        2278 : bool FelixClient::subscribe_evloop(netio_tag_t fid) { // false: info update threw, or netio returned AGAIN/ERROR; true otherwise
    1052        2278 :     try{
    1053        2278 :         contextHandler->createOrUpdateInfo(fid, &bus);
    1054        2274 :         struct timespec t1;
    1055        2274 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
    1056        4548 :         contextHandler->setSubscriptionTimes(fid, "subscribe_evloop", t1); // second timestamp, used for the duration logs below
    1057           4 :     } catch(std::exception& e){
    1058           4 :         return false;
    1059           4 :     }
    1060        2274 :     int ret;
    1061        2274 :     clog_trace("type (0=buffered, 1=unbuffered) %d", contextHandler->getType(fid));
    1062             : 
    1063        2274 :     if (contextHandler->getType(fid) == BUFFERED) {
    1064        1185 :         netio_subscribe_socket* buffered_socket = get_buffered_subscribe_socket(fid);
    1065        1185 :         clog_debug("(Re-)subscribing (buffered) for fid 0x%x and socket 0x%p", fid, buffered_socket);
    1066        1185 :         ret = netio_subscribe(buffered_socket, fid);
    1067        1185 :         clog_debug("(Re-)subscribing (buffered) for 0x%x with send socket has been done", fid);
    1068             : 
    1069             :     } else {
    1070        1089 :         netio_unbuffered_subscribe_socket* unbuffered_socket = get_unbuffered_subscribe_socket(fid);
    1071        1089 :         clog_debug("(Re-)subscribing (unbuffered) for 0x%x and socket 0x%p", fid, unbuffered_socket);
    1072        1089 :         clog_debug("Subscribed to unbuffered subscribe socket. Netio_page_size: %d netio pages: %d", unbuffered_socket->recv_socket.attr.buffer_size, unbuffered_socket->recv_socket.attr.num_buffers);
    1073        1089 :         ret = netio_unbuffered_subscribe(unbuffered_socket, fid);
    1074             :     }
    1075        2274 :     if(ret == NETIO_STATUS_AGAIN){ // retry later: queue the FID again and wake the eventloop
    1076           0 :         contextHandler->updateFidsWhenUnsub(fid);
    1077           0 :         contextHandler->pushSubTask(SubscribeEvent{fid});
    1078           0 :         netio_signal_fire(&sub_signal);
    1079           0 :         return false;
    1080        2274 :     } else if(ret == NETIO_STATUS_ERROR){
    1081             :         return false;
    1082             :     }
    1083        2274 :     if(contextHandler->getSocketState(fid) != CONNECTED && contextHandler->getSocketState(fid) != CONNECTING){ // any other state is moved to CONNECTING
    1084           0 :         contextHandler->setSocketState(fid, CONNECTING);
    1085             :     }
    1086        2274 :     if(contextHandler->getSocketState(fid) == CONNECTED){ // case 1 of the description: connection already up
    1087         392 :         contextHandler->setSubscriptionState(fid, SUB);
    1088         392 :         contextHandler->removeFromFidsToResubscribe(fid);
    1089         392 :         struct timespec t2;
    1090         392 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
    1091         392 :         std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
    1092             : 
    1093         784 :         double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6 // seconds -> microseconds
    1094         784 :                                     + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3; // nanoseconds -> microseconds
    1095         392 :         clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
    1096             : 
    1097         784 :         double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
    1098         784 :                                             + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
    1099         392 :         clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
    1100         392 :         if (cb_on_connect) { // user callback is optional
    1101         392 :             (cb_on_connect)(fid);
    1102             :         }
    1103         392 :     }
    1104             :     return true;
    1105             : }
    1106             : 
    1107             : /**
    1108             :  * @brief Sends the actual unsubscribe message (executed by the eventloop thread)
    1109             :  *
    1110             :  * Analog to the other sending functions, sending an unsubscribe message has also to go through the eventloop.
    1111             :  *
    1112             :  * @param fid: FID
    1113             :  * @param prom_unsub_done: std::promise fulfilled exactly once with the result: 0 = unsubscribed, 1 = nothing to do (unknown socket or already UNSUB), 2 = netio reported an error
    1114             :  */
    1115        2167 : void FelixClient::send_unsubscribe_msg(uint64_t fid, std::promise<uint8_t> prom_unsub_done){
    1116        2167 :     void* socket;
    1117        2167 :     int ret = 0;
    1118        2167 :     if (contextHandler->getType(fid) == BUFFERED) {
    1119        1126 :         struct netio_subscribe_socket* buffered_socket;
    1120        1126 :         buffered_socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
    1121        1126 :         socket = static_cast<void*>(buffered_socket);
    1122        1126 :         if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
    1123        1126 :         clog_info("Unsubscribing (buffered) for 0x%x from %s and socket 0x%p", fid, get_address(buffered_socket).c_str(), buffered_socket);
    1124        1126 :         ret = netio_unsubscribe(buffered_socket, fid);
    1125             :     } else {
    1126        1041 :         struct netio_unbuffered_subscribe_socket* unbuffered_socket;
    1127        1041 :         unbuffered_socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
    1128        1041 :         socket = static_cast<void*>(unbuffered_socket);
    1129        1041 :         if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
    1130        1041 :         clog_info("Unsubscribing (unbuffered) for 0x%x from %s", fid, get_address(unbuffered_socket).c_str());
    1131        1041 :         ret = netio_unbuffered_unsubscribe(unbuffered_socket, fid);
    1132             :     }
    1133        2167 :     if(ret == NETIO_STATUS_AGAIN){ // retry later: hand the promise over to a new UnsubEvent
    1134           0 :         contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
    1135           0 :         netio_signal_fire(&sub_signal);
    1136           0 :         return;
    1137        2167 :     } else if(ret == NETIO_STATUS_ERROR){
    1138           0 :         clog_error("Unsubscribing from 0x%x returned NETIO_STATUS_ERROR", fid); // the promise is fulfilled once, at the end of the function
    1139             :     }
    1140        2167 :     contextHandler->updateFidsWhenUnsub(fid);
    1141        2167 :     contextHandler->setSubscriptionState(fid, UNSUB);
    1142        2167 :     if(contextHandler->areAllFidsUnsubscribed(socket)){
    1143        1923 :         contextHandler->setSocketToAllUnsubscribed(socket);
    1144             :     }
    1145        2167 :     prom_unsub_done.set_value(ret == NETIO_STATUS_ERROR ? 2 : 0); // single set_value(): setting 2 and then 0 threw std::future_error
    1146             : }
    1147             : 
    1148             : /**
    1149             :  * @brief Unsubscribe function that is exposed to the user via felix-client-thread
    1150             :  *
    1151             :  * If the function is called by the eventloop thread, it will send the unsubscribe message directly, otherwise it will trigger
    1152             :  * the subscribe signal for the eventloop to execute this task.
    1153             :  * In the latter case, the function will block for up to 5s for the unsubscribe to be performed.
    1154             :  *
    1155             :  * @param fid: FID
    1156             :  */
    1157        2520 : int FelixClient::unsubscribe(netio_tag_t fid) {
    1158        2520 :     clog_info("Unsubscribing from 0x%x", fid);
    1159        2520 :     if (fid & 0x8000){ // FIDs with bit 0x8000 set are rejected as toflx FIDs by subscribe(), so there is nothing to undo
    1160             :         return FELIX_CLIENT_STATUS_OK;
    1161             :     }
    1162             : 
    1163        2118 :     if(evloop_thread_id == std::this_thread::get_id()){
    1164           0 :         send_unsubscribe_msg(fid); // one-argument call: the promise parameter presumably has a default in the header - confirm
    1165           0 :         return FELIX_CLIENT_STATUS_OK;
    1166             :     }
    1167        2118 :     if(!contextHandler->exists(fid)){ // unknown FID: nothing to unsubscribe
    1168             :         return FELIX_CLIENT_STATUS_OK;
    1169             :     }
    1170             : 
    1171        2118 :     std::promise<uint8_t> prom_unsub_done;
    1172        2118 :     std::future<uint8_t> future_unsub_done = prom_unsub_done.get_future();
    1173        2118 :     uint8_t status = FELIX_CLIENT_STATUS_OK;
    1174        2118 :     if (contextHandler->getSubscriptionState(fid) == SUB || contextHandler->getSubscriptionState(fid) == SUBING){ // other states return OK without waiting
    1175        2118 :         contextHandler->setSubscriptionState(fid, UNSUBING);
    1176        2118 :         contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
    1177        2118 :         netio_signal_fire(&sub_signal);
    1178             : 
    1179        2118 :         try{
    1180        2118 :             std::chrono::milliseconds timeout (5000);
    1181        2118 :             if (future_unsub_done.wait_for(timeout)==std::future_status::timeout){
    1182           0 :                 clog_error("Unsubscribe timeout expired, stop blocking to prevent deadlock.");
    1183           0 :                 return FELIX_CLIENT_STATUS_TIMEOUT;
    1184             :             }
    1185        2118 :             status = future_unsub_done.get();
    1186           0 :         } catch(std::exception& e){ // logged and rethrown to the caller
    1187           0 :             clog_info("Exception while unsubscribing: %s FID: 0x%x", e.what(), fid);
    1188           0 :             throw;
    1189           0 :         }
    1190             :     }
    1191        2118 :     return status; // value set by send_unsubscribe_msg(): 0 = OK, 1 = already done, 2 = netio error
    1192        2118 : }
    1193             : 
    1194             : /**
    1195             :  * @brief Forward a user function to be executed by the eventloop
    1196             :  *
    1197             :  * It uses a netio-signal to execute the user function within the eventloop. The signal is allocated, registered, and destroyed for every function call.
    1198             :  * So using it extensively might impact the performance of felix-client.
    1199             :  *
    1200             :  * @param user_function: void user function that is run by the eventloop.
    1201             :  */
    1202         313 : void FelixClient::exec(const UserFunction &user_function ) {
    1203         313 :     netio_signal* signal = (struct netio_signal*)malloc(sizeof(struct netio_signal)); // c style; not freed here, presumably released in on_signal - confirm
    1204         313 :     SignalData* data = new SignalData(); // not deleted here either, presumably owned by on_signal - confirm
    1205         313 :     data->signal = signal;
    1206         313 :     data->user_function = user_function; // copy of the user function travels with the signal
    1207         313 :     data->evloop = &ctx.evloop;
    1208             : 
    1209         313 :     netio_signal_init(&ctx.evloop, signal);
    1210         313 :     signal->data = data;
    1211         313 :     signal->cb = on_signal;
    1212             : 
    1213         313 :     netio_signal_fire(signal); // triggers on_signal with the data set above
    1214         313 : }
    1215             : 
    1216             : 
    1217         104 : netio_tag_t FelixClient::get_ctrl_fid(netio_tag_t fid) { // Builds the control FID that belongs to fid (used by send_cmd)
    1218         104 :     uint8_t sid = 0;
    1219         104 :     uint8_t to_flx = 1; // only difference to get_subscribe_fid(), which passes 0
    1220         104 :     uint8_t virt = 1;
    1221         312 :     netio_tag_t ctrl_fid = get_fid_from_ids(get_did(fid), // did, cid and vid are taken over from fid
    1222             :                                             get_cid(fid),
    1223             :                                             COMMAND_REPLY_LINK, // fixed link id
    1224             :                                             sid,
    1225         104 :                                             get_vid(fid),
    1226             :                                             to_flx,
    1227             :                                             virt);
    1228         104 :     return ctrl_fid;
    1229             : }
    1230             : 
    1231          66 : netio_tag_t FelixClient::get_subscribe_fid(netio_tag_t fid) { // Builds the FID that send_cmd subscribes to for replies
    1232          66 :     uint8_t sid = 0;
    1233          66 :     uint8_t to_flx = 0; // only difference to get_ctrl_fid(), which passes 1
    1234          66 :     uint8_t virt = 1;
    1235         198 :     netio_tag_t subscribe_fid = get_fid_from_ids(get_did(fid), // did, cid and vid are taken over from fid
    1236             :                                                  get_cid(fid),
    1237             :                                                  COMMAND_REPLY_LINK, // fixed link id
    1238             :                                                  sid,
    1239          66 :                                                  get_vid(fid),
    1240             :                                                  to_flx,
    1241             :                                                  virt);
    1242          66 :     return subscribe_fid;
    1243             : }
    1244             : 
    1245             : 
    1246          40 : FelixClientThread::Status FelixClient::send_cmd(const std::vector<uint64_t>& fids,
    1247             :                                                          FelixClientThread::Cmd cmd,
    1248             :                                                          const std::vector<std::string>& cmd_args,
    1249             :                                                          std::vector<FelixClientThread::Reply>& replies) {
    1250          40 :     clog_debug("send_cmd");
    1251          40 :     if(terminating_felix_client.load()){
    1252           0 :         clog_debug("Already terminating not sending command");
    1253           0 :         return FelixClientThread::Status::OK;
    1254             :     }
    1255             : 
    1256             :     // No commands to be send, all OK
    1257          40 :     if(fids.size() == 0) {
    1258             :         return FelixClientThread::Status::OK;
    1259             :     }
    1260             : 
    1261          40 :     std::string message;
    1262          40 :     FelixClientThread::Status status = FelixClientThread::Status::OK;
    1263             : 
    1264             :     // check the command and command arguments
    1265          40 :     switch(cmd) {
    1266           0 :         case FelixClientThread::NOOP:
    1267           0 :             if (cmd_args.size() != 0) {
    1268           0 :                 message = std::string("Too many arguments for NOOP, needs 0 while %d given.", cmd_args.size());
    1269           0 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1270           0 :                 break;
    1271             :             }
    1272             :             break;
    1273          28 :         case FelixClientThread::GET:
    1274          28 :             if (cmd_args.size() != 1) {
    1275           2 :                 message = std::string("Not enough arguments for GET, needs 1 while %d given.", cmd_args.size());
    1276           2 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1277           2 :                 break;
    1278             :             }
    1279             :             break;
    1280           4 :         case FelixClientThread::SET:
    1281           4 :             if (cmd_args.size() != 2) {
    1282           0 :                 message = std::string("Not enough arguments for SET, needs 2 while %d given.", cmd_args.size());
    1283           0 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1284           0 :                 break;
    1285             :             }
    1286             :             break;
    1287           6 :         case FelixClientThread::ECR_RESET:
    1288           6 :             if (cmd_args.size() != 1) {
    1289           0 :                 message = std::string("Not enough arguments for ECR_RESET, needs 1 while", cmd_args.size());
    1290           0 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1291           0 :                 break;
    1292             :             }
    1293             :             break;
    1294           2 :         default:
    1295           4 :             message = "Cmd not available: " + FelixClientThread::to_string(cmd);
    1296           2 :             status = FelixClientThread::Status::ERROR_INVALID_CMD;
    1297           2 :             break;
    1298             :     }
    1299             : 
    1300             : 
    1301             :     // deduce set of ctrl_fids from list of normal fids
    1302          80 :     std::unordered_map<netio_tag_t, netio_tag_t> ctrl_by_subscribe_fid;
    1303          80 :     std::set<uint64_t> sub_fids_set;
    1304          40 :     clog_info("Looking up %d subscribe FIDs", fids.size());
    1305         106 :     for(auto const& fid : fids) {
    1306          66 :         netio_tag_t ctrl_fid = get_ctrl_fid(fid);
    1307          66 :         netio_tag_t sub_fid = get_subscribe_fid(ctrl_fid);
    1308          66 :         sub_fids_set.insert(sub_fid);
    1309          66 :         std::error_code ec;
    1310          66 :         felixbus::FelixBusInfo bus_info = bus.get_info(sub_fid, ec);
    1311             : 
    1312          66 :         if( bus_info.ip != "" ){
    1313          62 :             ctrl_by_subscribe_fid[sub_fid] = ctrl_fid;
    1314             :         }
    1315             :         else{
    1316           4 :             clog_error("Discarding ctrl fid 0x%x because reply fid 0x%x does not appear in the bus", ctrl_fid, sub_fid);
    1317           4 :             FelixClientThread::Reply reply;
    1318           4 :             reply.ctrl_fid = ctrl_fid;
    1319           4 :             reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
    1320           4 :             reply.message = "Invalid fid, not found in bus";
    1321           4 :             reply.value = 0;
    1322           4 :             replies.push_back(reply);
    1323           4 :         }
    1324          66 :     }
    1325             : 
    1326          80 :     std::vector<uint64_t> sub_fids;
    1327          40 :     sub_fids.assign(sub_fids_set.begin(), sub_fids_set.end());
    1328             : 
    1329          40 :     clog_info("Subscribing to %d FIDs", ctrl_by_subscribe_fid.size());
    1330          40 :     uint subscribe_timeout = 5000;
    1331          40 :     bool for_register = true;
    1332          40 :     int ret = subscribe(sub_fids, subscribe_timeout, for_register);
    1333          40 :     if(ret == FELIX_CLIENT_STATUS_TIMEOUT){
    1334           4 :         auto it = ctrl_by_subscribe_fid.begin();
    1335           6 :         while (it != ctrl_by_subscribe_fid.end()){
    1336           2 :             if(!contextHandler->isSubscribed(it->first)){
    1337           0 :                 FelixClientThread::Reply reply;
    1338           0 :                 reply.ctrl_fid = it->second;
    1339           0 :                 reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
    1340           0 :                 reply.message = "Could not subscribe: timeout";
    1341           0 :                 reply.value = 0;
    1342           0 :                 replies.push_back(reply);
    1343           0 :                 it = ctrl_by_subscribe_fid.erase(it);
    1344           0 :             } else {
    1345           2 :                 it++;
    1346             :             }
    1347             :         }
    1348             :     }
    1349             : 
    1350          40 :     clog_info("Sending command");
    1351             : 
    1352             :     // continue with the connected ctrl_fids and send them the command
    1353          80 :     std::unordered_map<std::string, netio_tag_t> ctrl_fid_by_uuid;
    1354          82 :     for(auto const& sub_ctrl_fids : ctrl_by_subscribe_fid) {
    1355          42 :         uuid_t uuid_dec;
    1356          42 :         char uuid[36 + 1];
    1357          42 :         uuid_generate(uuid_dec);
    1358          42 :         uuid_unparse_upper(uuid_dec, uuid);
    1359             : 
    1360          42 :         clog_info("Preparing cmd...");
    1361          42 :         char json[256];
    1362          42 :         struct jWriteControl jwc;
    1363          42 :         jwOpen(&jwc, json, 256 - 1, JW_ARRAY, JW_COMPACT);
    1364          42 :         jwArr_object(&jwc);
    1365          42 :         jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::UUID).c_str(), uuid);
    1366          84 :         jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::CMD).c_str(), FelixClientThread::to_string(cmd).c_str());
    1367          42 :         jwObj_array(&jwc, FelixClientThread::to_string(FelixClientThread::CMD_ARGS).c_str());
    1368          90 :         for(auto const& cmd_arg : cmd_args) {
    1369          48 :             jwArr_string(&jwc, cmd_arg.c_str());
    1370             :         }
    1371          42 :         jwEnd(&jwc);
    1372          42 :         jwEnd(&jwc);
    1373          42 :         jwClose(&jwc);
    1374          42 :         int json_length = strlen(json);
    1375          42 :         clog_debug("len=%d; json=%d", json_length, json);
    1376             : 
    1377          42 :         try {
    1378          42 :             clog_info("Sending cmd to fid 0x%x", sub_ctrl_fids.second);
    1379          42 :             send_data(sub_ctrl_fids.second, (const uint8_t*)json, json_length, true);
    1380          40 :             uuids.emplace(uuid);
    1381          42 :             ctrl_fid_by_uuid.emplace(uuid, sub_ctrl_fids.second);
    1382           2 :         } catch (FelixClientException& error) {
    1383           2 :             clog_info("Could not send cmd for FID 0x%x because: %s", sub_ctrl_fids.second, error.what());
    1384             :             // sending timed out or other problem
    1385           2 :             FelixClientThread::Reply reply;
    1386           2 :             reply.ctrl_fid = sub_ctrl_fids.second;
    1387           2 :             reply.status = FelixClientThread::Status::ERROR_NO_CONNECTION;
    1388           2 :             reply.message = "Could not send cmd";
    1389           2 :             reply.value = 0;
    1390           2 :             replies.push_back(reply);
    1391           2 :         }
    1392             : 
    1393             :     }
    1394             : 
    1395             :     // wait for replies (or timeout)
    1396          40 :     clog_info("Waiting for %d replies...", ctrl_fid_by_uuid.size());
    1397          80 :     for(auto const &it : ctrl_fid_by_uuid) {
    1398          40 :         clog_debug("%s 0x%x", it.first.c_str(), it.second);
    1399             :     }
    1400             : 
    1401          40 :     uint timeoutms = 5000;
    1402          40 :     uint timeout = 0;
    1403          40 :     uint span = std::min(timeoutms, 250U);
    1404          40 :     uint recv_replies = 0;
    1405         232 :     while ((ctrl_fid_by_uuid.size() != recv_replies) && (timeout <= timeoutms)) {
    1406         192 :         recv_replies = 0;
    1407         392 :         for (auto& uuid_it : ctrl_fid_by_uuid){
    1408         200 :             if (reply_by_uuid.find(uuid_it.first) != reply_by_uuid.end()) {
    1409          40 :                 recv_replies++;
    1410             :             }
    1411             :         }
    1412         192 :         std::this_thread::sleep_for(std::chrono::milliseconds(span));
    1413         192 :         timeout += span;
    1414             :     }
    1415          40 :     clog_info("Finished waiting");
    1416             : 
    1417          80 :     for (auto& ctrl_fid_uuid : ctrl_fid_by_uuid){
    1418             :         // take it out of the list of uuids
    1419          40 :         std::string uuid = ctrl_fid_uuid.first;
    1420          40 :         uuids.erase(uuid);
    1421             : 
    1422          40 :         auto reply_it = reply_by_uuid.find(uuid);
    1423          40 :         if(reply_it != reply_by_uuid.end()){
    1424          36 :             clog_info("uuid: %s", ctrl_fid_uuid.first.c_str());
    1425          36 :             FelixClientThread::Reply reply = (*reply_it).second;
    1426          36 :             replies.push_back(reply);
    1427             : 
    1428          36 :             clog_info("Found reply for %s", ctrl_fid_uuid.first.c_str());
    1429          36 :             clog_info(" %s", FelixClientThread::to_string(reply).c_str());
    1430             : 
    1431          36 :             reply_by_uuid.erase(reply_it);
    1432          36 :         } else {
    1433           4 :             netio_tag_t ctrl_fid = ctrl_fid_uuid.second;
    1434           4 :             clog_debug("UUID %s timed out for ctrl_fid 0x%x", uuid.c_str(), ctrl_fid);
    1435             : 
    1436           4 :             FelixClientThread::Reply reply;
    1437           4 :             reply.ctrl_fid = ctrl_fid;
    1438           4 :             reply.status = FelixClientThread::Status::ERROR_NO_REPLY;
    1439           4 :             reply.message = "Did not receive a reply";
    1440           4 :             reply.value = 0;
    1441           4 :             replies.push_back(reply);
    1442           4 :         } //TODO: What happens to replies that arrive aftert the timeout and staty in the map
    1443          40 :         clog_debug("send_cmd done");
    1444          40 :     }
    1445             : 
    1446             :     // calculate summary_status, no replies is an ERROR
    1447          40 :     FelixClientThread::Status summary_status = replies.size() > 0 ? FelixClientThread::Status::OK : FelixClientThread::Status::ERROR;
    1448          40 :     summary_status = std::max(summary_status, status);
    1449          86 :     for(auto const& reply : replies) {
    1450          60 :         summary_status = std::max(summary_status, reply.status);
    1451             :     }
    1452          40 :     clog_info("Calculated summary status %d:%s from %d replies", summary_status, FelixClientThread::to_string(summary_status).c_str(), replies.size());
    1453             : 
    1454          40 :     return summary_status;
    1455          40 : }
    1456             : 
    1457         673 : void FelixClient::callback_on_init( OnInitCallback on_init ) {
    1458         673 :     clog_info("Registering on_init");
    1459         673 :     cb_on_init = on_init;
    1460         673 : }
    1461             : 
    1462         236 : void FelixClient::callback_on_data( OnDataCallback on_data ) {
    1463         236 :     clog_info("Registering on_data");
    1464         236 :     cb_on_data = on_data;
    1465         236 : }
    1466             : 
    1467           2 : void FelixClient::callback_on_buffer( OnBufferCallback on_buffer ) {
    1468           2 :     clog_info("Registering on_buffer");
    1469           2 :     cb_on_buffer = on_buffer;
    1470           2 : }
    1471             : 
    1472         665 : void FelixClient::callback_on_connect( OnConnectCallback on_connect ) {
    1473         665 :     clog_info("Registering on_connect");
    1474         665 :     cb_on_connect = on_connect;
    1475         665 : }
    1476             : 
    1477         657 : void FelixClient::callback_on_disconnect( OnDisconnectCallback on_disconnect ) {
    1478         657 :     clog_info("Registering on_disconnect");
    1479         657 :     cb_on_disconnect = on_disconnect;
    1480         657 : }
    1481             : 
    1482          36 : void FelixClient::callback_on_user_timer(  OnUserTimerCallback on_user_timer_cb ) {
    1483          36 :     clog_info("Registering on_user_timer");
    1484          36 :     cb_on_user_timer = on_user_timer_cb;
    1485          36 : }
    1486             : 
    1487         673 : void FelixClient::on_init_eventloop(void* ptr) {
    1488         673 :     clog_info("On init");
    1489         673 :     FelixClient* client = static_cast<FelixClient*>(ptr);
    1490         673 :     client->ready = true;
    1491         673 :     std::function<void()> callback = client->cb_on_init;
    1492         673 :     if (callback) {
    1493         673 :         (callback)();
    1494             :     }
    1495         673 : }
    1496             : 
    1497             : /**
    1498             :  * @brief Callback when a connection to a remote endpoint was successfully established
    1499             :  *
    1500             :  * When a socket is in TIMEOUT state (which means that the subscription timed out), an unsubscribe is triggered because subscribe() already returned
    1501             :  * an error to the user.
    1502             :  * Otherwise, the callback sets the correct state for the socket and will trigger the user on_connection_established callback for every FID that is associated with this connection.
    1503             :  *
    1504             :  * @param socket: pointer to socket that established the connection. This could be a (un-)buffered send or subscribe socket.
    1505             :  */
    1506         761 : void FelixClient::on_connection_established(void* socket) {
    1507         761 :     clog_debug("on_connection_established");
    1508         761 :     if (contextHandler->exists(socket)){
    1509         761 :         contextHandler->setSocketState(socket, CONNECTED);
    1510        3069 :         for(auto fid : contextHandler->getFidsBySocket(socket)){
    1511        2308 :             if (contextHandler->exists(fid)){
    1512        2308 :                 if(contextHandler->getConnectionType(fid) == PUBSUB){
    1513        1785 :                     if(contextHandler->getSubscriptionState(fid) == TIMEOUT){ //FID has timed out before and we do not want to notify the user nor need this subscription
    1514           0 :                         contextHandler->setSubscriptionState(fid, UNSUBING);
    1515           0 :                         send_unsubscribe_msg(fid);
    1516             :                         //contextHandler->removeFromFidsToResubscribe(fid);
    1517           0 :                         return;
    1518             :                     }
    1519        1785 :                     struct timespec t2;
    1520        1785 :                     clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
    1521        1785 :                     std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
    1522        3570 :                     double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
    1523        3570 :                                             + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
    1524        1785 :                     clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
    1525        3570 :                     double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
    1526        3570 :                                                     + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
    1527        1785 :                     clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
    1528        1785 :                 }
    1529        2308 :                 contextHandler->removeFromFidsToResubscribe(fid);
    1530        2308 :                 contextHandler->setSubscriptionState(fid, SUB);
    1531        2308 :                 if (cb_on_connect) {
    1532        2300 :                     (cb_on_connect)(fid);
    1533             :                 }
    1534        2308 :                 clog_debug("Tag: 0x%x out of %d FIDs has status: CONNECTED", fid, contextHandler->getFidsBySocket(socket).size());
    1535             :             }
    1536         761 :         }
    1537             :     }
    1538             : }
    1539             : 
    1540         236 : void FelixClient::on_connection_established(struct netio_send_socket* socket) {
    1541         236 :     clog_info("Unbuffered send connection established");
    1542         236 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1543         236 : }
    1544             : 
    1545         271 : void FelixClient::on_connection_established(struct netio_buffered_send_socket* socket) {
    1546         271 :     clog_info("Buffered send connection established");
    1547         271 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1548         271 : }
    1549             : 
    1550         165 : void FelixClient::on_connection_established(struct netio_subscribe_socket* socket) {
    1551         165 :     clog_info("Buffered connection established to %s", get_address(socket).c_str());
    1552         165 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1553         165 : }
    1554             : 
    1555          89 : void FelixClient::on_connection_established(struct netio_unbuffered_subscribe_socket* socket) {
    1556          89 :     clog_info("Unbuffered connection established to %s", get_address(socket).c_str());
    1557          89 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1558          89 : }
    1559             : 
    1560             : /**
    1561             :  * @brief Callback when a connection to a remote endpoint was closed
    1562             :  *
    1563             :  * Since send_socket objects are not deleted right away when the connection is closed, a counter is decremented to keep track if all send connections where closed in the stop() function.
    1564             :  *
    1565             :  * @param socket: pointer to socket that is associated with this endpoint. This could be a (un-)buffered send or subscribe socket.
    1566             :  */
    1567         725 : void FelixClient::on_connection_closed(void* socket) {
    1568         725 :     clog_debug("FelixClient::on_connection_closed");
    1569         725 :     contextHandler->setSocketState(socket, DISCONNECTED);
    1570         725 :     auto fids = contextHandler->getFidsBySocket(socket);
    1571         725 :     if(contextHandler->getConnectionType(socket) == SENDRECV){
    1572         507 :         decrement_num_send_sockets();
    1573             :     }
    1574         725 :     if(!terminating_felix_client){
    1575         100 :         contextHandler->removeSocket(socket);
    1576             :     }
    1577             : 
    1578        1266 :     for(auto fid : fids ){
    1579         541 :         if (cb_on_disconnect) {
    1580         529 :             (cb_on_disconnect)(fid);
    1581             :         }
    1582             :     }
    1583             : 
    1584         725 : }
    1585             : 
    1586         236 : void FelixClient::on_connection_closed(struct netio_send_socket* socket) {
    1587         236 :     clog_info("Unbuffered send connection closed");
    1588         236 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1589             :     //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
    1590         236 : }
    1591             : 
    1592         271 : void FelixClient::on_connection_closed(struct netio_buffered_send_socket* socket) {
    1593         271 :     clog_info("Buffered send connection closed");
    1594         271 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1595             :     //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
    1596         271 : }
    1597             : 
    1598         136 : void FelixClient::on_connection_closed(struct netio_subscribe_socket* socket) {
    1599         136 :     clog_info("Buffered connection closed to %s", get_address(socket).c_str());
    1600         136 :     netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::BSUB);
    1601         136 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1602         136 : }
    1603             : 
    1604          82 : void FelixClient::on_connection_closed(struct netio_unbuffered_subscribe_socket* socket) {
    1605          82 :     clog_info("Unbuffered connection closed to %s", get_address(socket).c_str());
    1606          82 :     netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::USUB);
    1607          82 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1608          82 : }
    1609             : 
    1610             : /**
    1611             :  * @brief Callback when a connection to a remote endpoint was refused
    1612             :  *
    1613             :  * The socket is then removed via exec() since an immediate removal can trigger a crash the the socket is still accessed after the removal by netio-next.
    1614             :  *
    1615             :  * @param socket: pointer to socket that is associated with this endpoint. This could be a (un-)buffered send or subscribe socket.
    1616             :  */
    1617         207 : void FelixClient::on_error_connection_refused(void* socket) {
    1618         207 :     clog_debug("on_error_connection_refused");
    1619         207 :     contextHandler->setSocketState(socket, DISCONNECTED);
    1620         414 :     exec([this, socket]{this->contextHandler->removeSocket(socket, true);});
    1621         207 :     clog_debug("Socket deleted from socket map.");
    1622             : 
    1623         207 : }
    1624             : 
    1625          46 : void FelixClient::on_error_connection_refused(struct netio_send_socket* socket) {
    1626          46 :     clog_info("Unbuffered send connection refused");
    1627          46 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1628          46 : }
    1629             : 
    1630          64 : void FelixClient::on_error_connection_refused(struct netio_buffered_send_socket* socket) {
    1631          64 :     clog_info("Buffered send connection refused");
    1632          64 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1633          64 : }
    1634             : 
    1635          53 : void FelixClient::on_error_connection_refused(struct netio_subscribe_socket* socket) {
    1636          53 :     clog_info("Buffered connection refused to %s", get_address(socket).c_str());
    1637          53 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1638             : 
    1639          53 : }
    1640             : 
    1641          44 : void FelixClient::on_error_connection_refused(struct netio_unbuffered_subscribe_socket* socket) {
    1642          44 :     clog_info("Unbuffered connection refused to %s", get_address(socket).c_str());
    1643          44 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1644             : 
    1645          44 : }
    1646             : 
    1647         443 : void FelixClient::on_send_completed(struct netio_send_socket* socket, uint64_t key) {
    1648         443 :     clog_debug("Unbuffered send completed for 0x%x", key);
    1649         443 : }
    1650             : 
    1651          38 : void FelixClient::on_register_msg_received(void* socket, netio_tag_t tag, void* data_ptr, size_t size) {
    1652          38 :     if (size > 0) {
    1653          38 :         uint8_t* data = (uint8_t*)(data_ptr);
    1654          38 :         data++;
    1655          38 :         size--;
    1656             : 
    1657             :         // handle subscriptions for felix-register
    1658          38 :         clog_info("Answer on register command");
    1659             : 
    1660          38 :         netio_tag_t ctrl_fid = get_ctrl_fid(tag);
    1661          38 :         clog_info(" for ctrl_fid 0x%x", ctrl_fid);
    1662             : 
    1663          38 :         simdjson::dom::element ops = parser.parse(simdjson::padded_string((const char*)data, size));
    1664             :         // FIXME check that this works for more than one reply
    1665         152 :         for (simdjson::dom::object op : ops) {
    1666          38 :             FelixClientThread::Reply reply;
    1667          76 :             std::string uuid = std::string(op[FelixClientThread::to_string(FelixClientThread::UUID)]);
    1668          38 :             clog_info("   uuid %s", uuid.c_str());
    1669             : 
    1670             :             // only add if we are still looking for the uuid (timeout...)
    1671          38 :             if (uuids.find(uuid) != uuids.end()) {
    1672          36 :                 reply.ctrl_fid = ctrl_fid;
    1673          72 :                 int64_t s = op[FelixClientThread::to_string(FelixClientThread::STATUS)];
    1674          36 :                 reply.status = static_cast<FelixClientThread::Status>(s);
    1675          36 :                 reply.message = std::string(op[FelixClientThread::to_string(FelixClientThread::MESSAGE)]);
    1676          72 :                 reply.value = uint64_t(op[FelixClientThread::to_string(FelixClientThread::VALUE)]);
    1677             : 
    1678          74 :                 reply_by_uuid[uuid] = reply;
    1679             :             } else {
    1680           2 :                 clog_info("   uuid not found, answer ignored", uuid.c_str());
    1681             :             }
    1682          38 :         }
    1683             :     }
    1684          38 : }
    1685             : 
    1686          38 : void FelixClient::on_register_msg_received(struct netio_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
    1687          38 :     static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
    1688          38 : }
    1689             : 
    1690           0 : void FelixClient::on_register_msg_received(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
    1691           0 :     static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
    1692           0 : }
    1693             : 
    1694             : 
    1695             : /**
    1696             :  * @brief Callback that is triggered by the subscription timer
    1697             :  *
    1698             :  * The timer is hard-coded to 1s in the constructor. The timer is started when there are subscriptions that are not up and is stopped again when all FIDs are subscribed.
    1699             :  * It is particularly used when the server side (felix-tohost/felix-toflx) is restarted. This way felix-client attempts to resubscribe all FIDs periodically and the user does not have to take care of it.
    1700             :  * It does not only handle subscriptions to felix-tohost but will also re-establish send connections to felix-toflx
    1701             :  *
    1702             :  * @param ptr: pointer to felix-client object itself
    1703             :  */
    1704        3762 : void FelixClient::on_timer(void* ptr) {
    1705             : 
    1706        3762 :     clog_debug("On timer called. Attempting to resubscribe");
    1707        3762 :     uint count = 0;
    1708        3762 :     FelixClient* client = static_cast<FelixClient*>(ptr);
    1709        3762 :     if(client->terminating_felix_client.load()){
    1710           0 :         clog_debug("Already terminating no reason to resubscribe");
    1711           0 :         return;
    1712             :     }
    1713        3762 :     std::vector<uint64_t> fids_to_resubscribe = client->contextHandler->getFidsToResubscribe();
    1714        3762 :     clog_debug("Resubscribing for %d", fids_to_resubscribe.size());
    1715        3945 :     for(auto& fid : fids_to_resubscribe) {
    1716         183 :         if(client->contextHandler->getSocketState(fid) == DISCONNECTED){
    1717         175 :             if(fid & 0x8000){
    1718          66 :                 try{
    1719          66 :                     std::promise<uint8_t> prom_sub;
    1720          66 :                     std::future<uint8_t> future_sub_done = prom_sub.get_future();
    1721          66 :                     client->establish_send_connection(fid, std::move(prom_sub));
    1722          66 :                     if(!future_sub_done.get()) {
    1723          66 :                         count++;
    1724             :                     }
    1725          66 :                 } catch(...){
    1726           0 :                     continue;
    1727           0 :                 }
    1728             :             } else {
    1729         109 :                 if (!client->contextHandler->canSubscribe(fid)) {
    1730           0 :                     clog_debug("Already attempting to subscribe to 0x%x", fid);
    1731           0 :                     continue;
    1732             :                 }
    1733         109 :                 if (client->subscribe_evloop(fid)) {
    1734         105 :                     count++;
    1735             :                 }
    1736             :             }
    1737             :         }
    1738         183 :         clog_trace("Socket has state: %d", client->contextHandler->getSocketState(fid));
    1739         183 :         clog_trace("FID 0x%x is in state: %d", fid, client->contextHandler->getSubscriptionState(fid));
    1740             :     }
    1741             : 
    1742        3762 :     if (count > 0) {
    1743         114 :         clog_debug("Subscription timer: resubscribing to %d tags", count);
    1744             :     }
    1745        3762 : }
    1746             : 
    1747             : 
    1748         313 : void FelixClient::on_signal(void* ptr) {
    1749         313 :     clog_debug("On signal");
    1750         313 :     struct SignalData* data = (struct SignalData*)ptr;
    1751         313 :     UserFunction user_function = data->user_function;
    1752         313 :     (user_function)();
    1753             : 
    1754         313 :     netio_signal_close(data->evloop, data->signal);
    1755         313 :     free(data->signal);
    1756         626 :     delete(data);
    1757         313 : }
    1758             : 
    1759        1698 : std::string FelixClient::get_address(netio_subscribe_socket *socket) {
    1760        3396 :     return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port);
    1761             : }
    1762             : 
    1763        1389 : std::string FelixClient::get_address(netio_unbuffered_subscribe_socket *socket) {
    1764        2778 :     return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port);
    1765             : }
    1766             : 
    1767             : 
    1768        1185 : struct netio_subscribe_socket* FelixClient::get_buffered_subscribe_socket(uint64_t fid) {
    1769        1185 :     bool needInititialization = contextHandler->addOrCreateSocket(fid);
    1770        1185 :     struct netio_subscribe_socket* socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
    1771        1185 :     clog_debug("get_buffered_subscribe_socket for fid 0x%x and socket 0x%p", fid, socket);
    1772        1185 :     if(needInititialization){
    1773         218 :         initialize_buffered_subscribe_socket(socket, fid);
    1774             :     }
    1775        1185 :     return socket;
    1776             : }
    1777             : 
    1778             : 
    1779             : /**
    1780             :  * @brief initializes a buffered subscribe socket
    1781             :  *
    1782             :  * Right now we allocate double the number of pages that are published in the bus to better overcome some fluctuations in the network load.
    1783             :  *
    1784             :  * @param socket: pointer to netio_subscribe_socket
    1785             :  * @param fid: FID associated with this subscribe socket
    1786             :  */
    1787         218 : void FelixClient::initialize_buffered_subscribe_socket(struct netio_subscribe_socket* socket, uint64_t fid){
    1788         218 :     clog_debug("Setup socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1789         218 :     struct netio_buffered_socket_attr attr;
    1790         218 :     attr.num_pages = 2 * contextHandler->getPages(fid); //Hard-coded double the number of pages for FLX-2041, todo: make a parameter later
    1791         218 :     attr.pagesize = contextHandler->getPagesize(fid);
    1792         218 :     attr.watermark = contextHandler->getWatermark(fid);
    1793         436 :     netio_subscribe_socket_init(socket, &ctx, &attr,
    1794         218 :         local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1795             : 
    1796         218 :     socket->cb_connection_closed = on_connection_closed;
    1797         218 :     socket->cb_connection_established = on_connection_established;
    1798         218 :     socket->cb_error_connection_refused = on_error_connection_refused;
    1799         218 :     if (contextHandler->isForRegister(fid)) {
    1800          34 :         socket->cb_msg_received = on_register_msg_received;
    1801             :     } else {
    1802         184 :         socket->cb_msg_received = on_msg_received;
    1803             :     }
    1804             : 
    1805         218 :     if(buffer_callback){
    1806           2 :         socket->cb_buf_received = on_buf_received;
    1807             :     }
    1808             : 
    1809         218 :     clog_debug("Setup socket for %s", get_address(socket).c_str());
    1810             :     // store object pointer (this) in socket, for use in c-style callbacks
    1811         218 :     socket->usr = this;
    1812         218 : }
    1813             : 
    1814        1089 : struct netio_unbuffered_subscribe_socket* FelixClient::get_unbuffered_subscribe_socket(uint64_t fid) {
    1815             :     // Create socket if it does not exist, otherwise lookup
    1816        1089 :     bool needInititialization = contextHandler->addOrCreateSocket(fid);
    1817             : 
    1818        1089 :     struct netio_unbuffered_subscribe_socket* socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
    1819        1089 :     clog_debug("get_unbuffered_subscribe_socket");
    1820        1089 :     if(needInititialization){
    1821         133 :         initialize_unbuffered_subscribe_socket(socket, fid);
    1822             :     }
    1823             : 
    1824             : 
    1825        1089 :     return socket;
    1826             : }
    1827             : 
    1828             : /**
    1829             :  * @brief initializes an unbuffered subscribe socket
    1830             :  *
    1831             :  * If the 'unbuffered_netio_pages' parameter is specified by the user, the respective number and page_size will be allocated for the buffers.
    1832             :  * If they are not specified or 0, the default number ans size are taken from the bus.
    1833             :  * Hence, the user can manually overwrite the the default settings in thus bus. However, this is not recommended if there are no good reason for this.
    1834             :  * Especially the page size needs to be at least as big as the one on the felix-tohost side.
    1835             :  *
    1836             :  * @param socket: pointer to netio_unbuffered_subscribe_socket
    1837             :  * @param fid: FID associated with this subscribe socket
    1838             :  */
    1839         133 : void FelixClient::initialize_unbuffered_subscribe_socket(netio_unbuffered_subscribe_socket* socket, uint64_t fid){
    1840         133 :     clog_debug("Setup socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1841             : 
    1842             :     // FLX-1222 use params
    1843             :     // FLX-1222 allocate somewhere globally, per socket
    1844             :     //buffer size not specified, taken by the bus
    1845         133 :     if (unbuffered_netio_pages  == 0){
    1846          11 :         clog_trace("Creating unbuffered subscribe socket. Buffer not specified, using bus parameters. Pages: %d pagesize: %d", contextHandler->getPages(fid), contextHandler->getPagesize(fid));
    1847          22 :         netio_unbuffered_subscribe_socket_init(socket, &ctx,
    1848          22 :         local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
    1849          11 :         contextHandler->getPagesize(fid), contextHandler->getPages(fid));
    1850             :     } else {
    1851             :         //buffer size passed as parameter
    1852         122 :         clog_trace("Creating unbuffered subscribe socket. Buffer parameter specified. Pages: %d pagesize: %d", unbuffered_netio_pages, unbuffered_netio_pagesize);
    1853         122 :         netio_unbuffered_subscribe_socket_init(socket, &ctx,
    1854         244 :         local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
    1855         122 :         unbuffered_netio_pagesize, unbuffered_netio_pages); //correct even if unbuffered_netio_pagesize/-pages are needed?
    1856             :     }
    1857             : 
    1858         133 :     socket->cb_connection_closed = on_connection_closed;
    1859         133 :     socket->cb_connection_established = on_connection_established;
    1860         133 :     socket->cb_error_connection_refused = on_error_connection_refused;
    1861         133 :     if (contextHandler->isForRegister(fid)) {
    1862           0 :         socket->cb_msg_received = on_register_msg_received;
    1863             :     } else {
    1864         133 :         socket->cb_msg_received = on_msg_received;
    1865             :     }
    1866             : 
    1867         133 :     clog_debug("Setup socket for %s", get_address(socket).c_str());
    1868             : 
    1869         133 :     socket->usr = this;
    1870         133 : }
    1871             : 
    1872             : 
    1873         797 : struct netio_buffered_send_socket* FelixClient::get_buffered_send_socket(uint64_t fid) {
    1874             :     // Create socket if it does not exist, otherwise lookup
    1875         797 :     bool needInititialization = false;
    1876         797 :     if (contextHandler->getSocket<struct netio_buffered_send_socket*>(fid) == NULL){
    1877         347 :         needInititialization = contextHandler->addOrCreateSocket(fid);
    1878             :     }
    1879             : 
    1880         797 :     struct netio_buffered_send_socket* socket = contextHandler->getSocket<struct netio_buffered_send_socket*>(fid);
    1881             : 
    1882         797 :     if (needInititialization)
    1883             :     {
    1884         335 :         struct netio_buffered_socket_attr attr;
    1885         335 :         attr.num_pages = contextHandler->getPages(fid);
    1886         335 :         attr.pagesize = contextHandler->getPagesize(fid);
    1887         335 :         attr.watermark = contextHandler->getWatermark(fid);
    1888         335 :         attr.timeout_ms = 5;
    1889         335 :         netio_buffered_send_socket_init(socket, &ctx, &attr);
    1890         335 :         socket->cb_connection_closed = on_connection_closed;
    1891         335 :         socket->cb_connection_established = on_connection_established;
    1892         335 :         socket->cb_error_connection_refused = on_error_connection_refused;
    1893             : 
    1894             :         // store object pointer (this) in socket, for use in c-style callbacks
    1895         335 :         socket->usr = this;
    1896         670 :         netio_buffered_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1897             :     }
    1898         797 :     return socket;
    1899             : }
    1900             : 
    1901             : 
    1902         740 : struct netio_send_socket* FelixClient::get_unbuffered_send_socket(uint64_t fid) { // Returns the netio_send_socket that contextHandler holds for fid; when addOrCreateSocket reports that setup is needed, the socket is initialized, connected and given a registered send buffer first.
    1903             :     // Look up the socket for fid; only when none is registered yet, ask contextHandler to add or create one.
    1904         740 :     bool needInititialization = false; // stays false when a socket is already registered for fid
    1905         740 :     if (contextHandler->getSocket<struct netio_send_socket*>(fid) == NULL){
    1906         294 :         needInititialization = contextHandler->addOrCreateSocket(fid); // its return value decides whether the setup block below runs
    1907             :     }
    1908             : 
    1909         740 :     struct netio_send_socket* socket = contextHandler->getSocket<struct netio_send_socket*>(fid); // fetched again because addOrCreateSocket may have just registered it
    1910             : 
    1911         740 :     if (needInititialization)
    1912             :     {
    1913         282 :         netio_unbuffered_send_socket_init(socket, &ctx); // initialize the socket against this client's netio context (ctx)
    1914             :     // Build a zero-filled buffer of UNBUFFERED_MR_SIZE bytes and store its descriptor under the address contextHandler reports for fid.
    1915         282 :         struct netio_buffer buf;
    1916         282 :         buf.size = UNBUFFERED_MR_SIZE;
    1917         282 :         buf.data = malloc(UNBUFFERED_MR_SIZE); // NOTE(review): result is not checked before the memset below - confirm whether allocation failure must be handled
    1918         282 :         memset(buf.data, 0, UNBUFFERED_MR_SIZE);
    1919         282 :         buf.mr = nullptr; // presumably filled in later by netio_register_send_buffer - TODO confirm
    1920         564 :         unbuf_mem_regions[contextHandler->getAddress(fid)] = buf; // stored by value; NOTE(review): no free() of buf.data is visible in this function - confirm where it is released
    1921             :     // Install the FelixClient callbacks on the socket.
    1922         282 :         socket->cb_connection_closed = on_connection_closed;
    1923         282 :         socket->cb_connection_established = on_connection_established;
    1924         282 :         socket->cb_error_connection_refused = on_error_connection_refused;
    1925         282 :         socket->cb_send_completed = on_send_completed;
    1926             :         // store object pointer (this) in socket, for use in c-style callbacks
    1927         282 :         socket->usr = this;
    1928         282 :         netio_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid)); // connect to the ip/port that contextHandler reports for fid
    1929         564 :         clog_debug("Connected. Now registering memory region: 0x%p", &unbuf_mem_regions[contextHandler->getAddress(fid)]); // NOTE(review): %p usually prints its own 0x prefix (glibc), so this likely logs "0x0x..." - confirm
    1930         846 :         netio_register_send_buffer(socket, &unbuf_mem_regions[contextHandler->getAddress(fid)], 0); // registers the entry stored in unbuf_mem_regions, not the local buf copy
    1931             :     }
    1932         740 :     return socket;
    1933             : }
    1934             : 
    1935             : 
    1936             : 
    1937         709 : void FelixClient::user_timer_init(){ // Initializes user_timer on this client's event loop and points its callback at on_user_timer.
    1938         709 :     netio_timer_init(&ctx.evloop, &user_timer);
    1939         709 :     user_timer.cb = on_user_timer; // static trampoline defined in this class
    1940         709 :     user_timer.data = this; // object pointer that on_user_timer casts back to FelixClient*
    1941         709 : }
    1942             : 
    1943          37 : void FelixClient::user_timer_start(unsigned long interval){ // Starts user_timer; interval is passed unchanged to netio_timer_start_ms (milliseconds, judging by the _ms suffix - TODO confirm).
    1944          37 :     netio_timer_start_ms(&user_timer, interval);
    1945          37 : }
    1946             : 
    1947           0 : void FelixClient::user_timer_stop(){ // Stops user_timer by calling netio_timer_stop on it.
    1948           0 :     netio_timer_stop(&user_timer);
    1949           0 : }
    1950             : 
    1951         382 : void FelixClient::on_user_timer(void* ptr) { // C-style timer callback: forwards to cb_on_user_timer of the FelixClient that ptr points to.
    1952         382 :     FelixClient* client = static_cast<FelixClient*>(ptr); // ptr is expected to be the value stored in user_timer.data by user_timer_init (i.e. this) - TODO confirm netio passes it through
    1953         382 :     client->cb_on_user_timer();
    1954         382 : }
    1955             : 
    1956             : 
    1957         507 : void FelixClient::decrement_num_send_sockets(){ // Decrements total_number_send_sockets unless it currently reads as zero.
    1958         507 :     if(total_number_send_sockets.load() > 0){ // NOTE(review): the check and the decrement below are two separate operations - confirm callers are serialized, otherwise concurrent calls could decrement past zero
    1959         501 :         total_number_send_sockets--;
    1960             :     }
    1961         507 : }

Generated by: LCOV version 1.0