LCOV - code coverage report
Current view: top level - felix-client/src - felix_client.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 902 1157 78.0 %
Date: 2025-08-12 04:15:35 Functions: 72 81 88.9 %

          Line data    Source code
#include "felix/felix_client.hpp"
#include "felix/felix_client_exception.hpp"
#include "felix/felix_client_util.hpp"
#include "felix/felix_client_thread.hpp"

#include "felix/felix_fid.h"

#include "clog.h"

#include "netio/netio.h"
#include "netio/netio_tcp.h"

extern "C" {
    #include "jWrite.h"
}

#include <algorithm>
#include <cerrno>
#include <cstring>
#include <numeric>
#include <regex>
#include <set>
#include <string>
#include <unordered_set>
#include <vector>

#include <unistd.h>
#include <uuid/uuid.h>
      24             : 
      25         419 : FelixClient::FelixClient(const std::string& local_ip_or_interface,
      26             :                          std::string bus_dir,
      27             :                          std::string bus_group_name,
      28             :                          unsigned log_level,
      29             :                          bool verbose_bus,
      30             :                          unsigned netio_pages,
      31         419 :                          unsigned netio_pagesize) :
      32         419 :             ready(false),
      33         419 :             timer_delay_millis(0),
      34         419 :             ctx({0}),
      35         419 :             user_timer({0}),
      36         419 :             unbuffered_netio_pages(netio_pages),
      37         419 :             unbuffered_netio_pagesize(netio_pagesize),
      38         838 :             local_ip_address(get_ip_from_interface(local_ip_or_interface)) {
      39             : 
      40             :     // // handle here to make sure trace logging looks ok
      41             :     // local_ip_address = get_ip_from_interface(local_ip_or_interface);
      42             : 
      43         419 :     if (local_ip_address == "") {
      44           0 :         clog_fatal("Could not deduce the local ip address from %s", local_ip_or_interface.c_str());
      45           0 :         throw FelixClientException("felix-client: Could not deduce the local ip address from local_ip_or_interface");
      46             :     }
      47             : 
      48         419 :     clog_info("Local IP: %s", local_ip_address.c_str());
      49             : 
      50         419 :     clog_info("Initializing netio-next");
      51         419 :     netio_init(&ctx);
      52             : 
      53         419 :     clog_set_level(log_level);
      54         419 :     netio_set_debug_level(log_level);
      55             : 
      56             :     // init callback
      57         419 :     ctx.evloop.cb_init = on_init_eventloop;
      58         419 :     ctx.evloop.data = this;
      59             : 
      60         419 :     contextHandler = std::make_unique<FelixClientContextHandler>();
      61             : 
      62             : 
      63         419 :     send_signal = netio_signal();//(struct netio_signal*)malloc(sizeof(struct netio_signal)); // c style
      64             : 
      65         419 :     netio_signal_init(&ctx.evloop, &send_signal);
      66         419 :     send_signal.data = this;
      67         419 :     send_signal.cb = on_send_signal;
      68             : 
      69             : 
      70         419 :     sub_signal = netio_signal();
      71         419 :     netio_signal_init(&ctx.evloop, &sub_signal);
      72         419 :     sub_signal.data = this;
      73         419 :     sub_signal.cb = on_sub_signal;
      74             : 
      75             :     // setup timer for reconnection/resubscription
      76         419 :     netio_timer_init(&ctx.evloop, &subscribe_timer);
      77         419 :     subscribe_timer.cb = on_timer;
      78         419 :     subscribe_timer.data = this;
      79         419 :     terminating_felix_client = false;
      80         419 :     contextHandler->terminatingFelixClient = false;
      81             : 
      82         419 :     user_timer_init();
      83             : 
      84         419 :     clog_info("Initializing felix-bus");
      85         838 :     bus.set_path(bus_dir);
      86             : 
      87         419 :     bus.set_groupname(bus_group_name);
      88         419 :     bus.set_verbose(verbose_bus);
      89         419 : }
      90             : 
      91           0 : std::vector<int> FelixClient::parseCpuRange(const std::string &range) {
      92           0 :     static const std::regex re(R"(((\d+)-(\d+))|(\d+))");
      93           0 :     std::smatch sm;
      94           0 :     std::vector<int> data;
      95           0 :     for (std::string s(range); std::regex_search(s, sm, re); s = sm.suffix()) {
      96           0 :         if (sm[1].str().empty()) {
      97           0 :             data.push_back(std::stoi(sm[0]));
      98             :         } else {
      99           0 :             for (int i = std::stoi(sm[2]); i <= std::stoi(sm[3]); ++i) {
     100           0 :                 data.push_back(i);
     101             :             }
     102             :         }
     103           0 :     }
     104           0 :     return data;
     105           0 : }
     106             : 
     107           0 : void FelixClient::set_thread_affinity(const std::string& affinity) {
     108           0 :     this->affinity = affinity;
     109           0 : }
     110             : 
     111         308 : FelixClient::~FelixClient(){
     112         308 :     clog_info("Cleaning up FelixClient.");
     113         431 :     for(auto& it : unbuf_mem_regions){
     114         123 :         clog_debug("Removing mem reg: 0x%x. There are %d mem regions remaining.", it.second.mr, unbuf_mem_regions.size());
     115         123 :         it.second.size = 0;
     116         123 :         free(it.second.data);
     117             :     }
     118         308 : }
     119             : 
     120             : // event loop control
     121         419 : void FelixClient::run() {
     122         419 :     if (affinity != "") {
     123           0 :         std::vector<int> cpus = parseCpuRange(affinity);
     124           0 :         if (!cpus.empty()) {
     125           0 :             cpu_set_t pmask;
     126           0 :             CPU_ZERO(&pmask);
     127           0 :             for (int cpu : cpus) {
     128           0 :                 CPU_SET(cpu, &pmask);
     129             :             }
     130           0 :             int s = sched_setaffinity(0, sizeof(pmask), &pmask);
     131           0 :             if (s != 0) {
     132           0 :                 clog_error("Setting affinity to CPU rage [%s] failed with error = %d", affinity.c_str(), s);
     133             :             }
     134             :         }
     135           0 :     }
     136         419 :     pthread_setname_np(pthread_self(), "felix-client-thread");
     137             : 
     138         419 :     evloop_thread_id = std::this_thread::get_id();
     139         419 :     clog_info("In run: 0x%x", evloop_thread_id);
     140         419 :     clog_info("Running netio_run");
     141         419 :     netio_run(&ctx.evloop);
     142         318 :     clog_info("Finished netio_run");
     143         318 : }
     144             : 
     145         423 : void FelixClient::stop() {
     146         423 :     clog_info("Stopping netio");
     147         423 :     terminating_felix_client = true;
     148         423 :     contextHandler->terminatingFelixClient = true;
     149         423 :     netio_timer_close(&ctx.evloop, &subscribe_timer);
     150         423 :     netio_timer_close(&ctx.evloop, &user_timer);
     151             : 
     152             :     //unsubscribing all elinks
     153         423 :     std::vector<uint64_t> fids_to_unsubscribe = contextHandler->getFidsToUnsubscribe();
     154         423 :     clog_debug("Closing %d subscriptions", fids_to_unsubscribe.size());
     155        2323 :     for(auto fid : fids_to_unsubscribe){
     156        1900 :         if(evloop_thread_id == std::this_thread::get_id()){
     157          35 :             send_unsubscribe_msg(fid);
     158             :         } else {
     159        1865 :             unsubscribe(fid);
     160             :         }
     161             :     }
     162             : 
     163             :     uint timeout = 1000;
     164             :     uint t_expired = 0;
     165             :     uint span = 25U;
     166             : 
     167             : 
     168        6170 :     while (!contextHandler->areAllFidsUnsubscribed() && (t_expired <= timeout)){
     169        5747 :         std::this_thread::sleep_for(std::chrono::milliseconds(span));
     170        5747 :         t_expired += span;
     171             : 
     172             :     }
     173         423 :     if (timeout <= t_expired){
     174         139 :         clog_error("Timeout during stopping netio, not all subscriptions have been closed. %d subscriptions remain open.", contextHandler->getFidsToUnsubscribe().size());
     175             :     }
     176             : 
     177             : 
     178         423 :     std::vector<netio_send_socket*> send_sockets = contextHandler->getSendSockets();
     179         423 :     total_number_send_sockets.store(send_sockets.size());
     180         423 :     clog_debug("Closing netio %d send sockets.", total_number_send_sockets.load());
     181         687 :     for(auto& socket : send_sockets){
     182         264 :         netio_disconnect(socket);
     183             :     }
     184             : 
     185             :     t_expired = 0;
     186         685 :     while ((total_number_send_sockets.load(std::memory_order_relaxed) != 0) && (t_expired <= timeout)){
     187         262 :         std::this_thread::sleep_for(std::chrono::milliseconds(span));
     188         262 :         t_expired += span;
     189             :     }
     190         423 :     if (timeout <= t_expired){
     191           0 :         clog_error("Timeout during stopping netio, not all send sockets have been disconnected, %d remain open.", total_number_send_sockets.load());
     192             :     }
     193         423 :     netio_signal_close(&ctx.evloop, &send_signal);
     194         423 :     netio_signal_close(&ctx.evloop, &sub_signal);
     195         423 :     netio_terminate_signal(&ctx.evloop);
     196         423 : }
     197             : 
     198       14207 : int FelixClient::send_evloop(netio_tag_t fid, ToFlxHeader header, const uint8_t* data, size_t size, bool flush, std::promise<uint8_t> prom_send_done){
     199       14207 :     if(contextHandler->getSocketState(fid) == DISCONNECTED){
     200           0 :         clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
     201           0 :         std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     202           0 :         prom_send_done.set_exception(e);
     203           0 :         return 1;
     204           0 :     }
     205             : 
     206       14207 :     int status = NETIO_STATUS_OK;
     207             : 
     208       14207 :     if (contextHandler->getType(fid) == BUFFERED) {
     209       14059 :         netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
     210       14059 :         if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
     211       14059 :         size_t len = sizeof(header) + size;
     212       14059 :         uint8_t* buf = (uint8_t*)malloc(len);
     213       14059 :         memcpy(buf, &header, sizeof(header));
     214       14059 :         memcpy(buf + sizeof(header), data, size);
     215       14059 :         clog_debug("Sending (buffered) for 0x%x, i.e. tag %d, socket 0x%p", fid, header.elink, buffered_socket);
     216       14059 :         status = netio_buffered_send(buffered_socket, (void*)buf, len);
     217             : 
     218       14059 :         clog_debug("Status %d", status);
     219       14059 :         free(buf);
     220             : 
     221       14059 :         if (flush) {
     222       14059 :             netio_buffered_flush(buffered_socket);
     223             :         }
     224             : 
     225             :     } else {
     226             : 
     227         148 :         netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
     228         148 :         std::string addr = contextHandler->getAddress(fid);
     229         148 :         clog_debug("Begin sending process. Buffer ptr: 0x%p", unbuf_mem_regions[addr].data);
     230         148 :         memcpy(unbuf_mem_regions[addr].data, &header, sizeof(header));
     231         148 :         memcpy((uint8_t*)unbuf_mem_regions[addr].data + sizeof(header), data, size);
     232         148 :         clog_debug("Memcopy done. Total size: %d (header: %d + size: %d)", sizeof(header) + size, sizeof(header), size);
     233         148 :         if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
     234         148 :         status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, sizeof(header) + size, (uint64_t)&unbuf_mem_regions[addr]);
     235         148 :     }
     236       14207 :     prom_send_done.set_value(status);
     237       14207 :     return status;
     238             : }
     239             : 
     240             : 
     241             : 
     242             : 
     243             : 
     244             : 
     245             : 
     246           7 : int FelixClient::send_evloop(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes, std::promise<uint8_t> prom_send_done){
     247             : 
     248           7 :     if(contextHandler->getSocketState(fid) == DISCONNECTED){
     249           0 :         clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
     250           0 :         std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     251           0 :         prom_send_done.set_exception(e);
     252           0 :         return 1;
     253           0 :     }
     254             : 
     255           7 :     int status = NETIO_STATUS_OK;
     256             : 
     257           7 :     if (contextHandler->getType(fid) == BUFFERED) {
     258           3 :         netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
     259           3 :         clog_debug("Socket = 0x%x with state: %d", buffered_socket, contextHandler->getSocketState(fid));
     260           3 :         if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
     261             :         //Adding up messages
     262           3 :         ToFlxHeader header;
     263           6 :         size_t totalSize = accumulate(sizes.begin(), sizes.end(), 0) + sizeof(header) * msgs.size();
     264           3 :         uint8_t* buf = (uint8_t*)malloc(totalSize);
     265           3 :         size_t currentSize = 0;
     266             : 
     267          12 :         for(unsigned i = 0; i < msgs.size(); i++){
     268             : 
     269           9 :             header.length = sizes[i];
     270           9 :             header.reserved = 0;
     271           9 :             header.elink = get_elink(fid);
     272             : 
     273           9 :             memcpy(buf + currentSize, &header, sizeof(header));
     274           9 :             memcpy(buf + currentSize + sizeof(header), msgs[i], sizes[i]);
     275             : 
     276           9 :             currentSize += sizeof(header) + sizes[i];
     277             :         }
     278             : 
     279           3 :         clog_debug("Sending (buffered) for 0x%x", fid);
     280           3 :         clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
     281           3 :         {
     282           3 :             status = netio_buffered_send(buffered_socket, (void*)buf, totalSize);
     283           3 :             clog_debug("Status %d", status);
     284           3 :             free(buf);
     285           3 :             netio_buffered_flush(buffered_socket);
     286             :         }
     287             :     } else {
     288             :         // Unbuffered
     289           4 :         std::string addr = contextHandler->getAddress(fid);
     290           4 :         netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
     291           4 :         if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
     292           4 :         ToFlxHeader header;
     293           8 :         size_t totalSize = accumulate(sizes.begin(), sizes.end(), 0) + sizeof(header) * msgs.size();
     294           4 :         size_t currentSize = 0;
     295             : 
     296          16 :         for(unsigned i = 0; i < msgs.size(); i++){
     297             : 
     298          12 :             header.length = sizes[i];
     299          12 :             header.reserved = 0;
     300          12 :             header.elink = get_elink(fid);
     301             : 
     302          12 :             memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize, &header, sizeof(header));
     303          12 :             memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize + sizeof(header), msgs[i], sizes[i]);
     304             : 
     305          12 :             currentSize += sizeof(header) + sizes[i];
     306             :         }
     307             : 
     308           4 :         clog_debug("Sending (unbuffered) for 0x%x", fid);
     309           4 :         clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
     310             : 
     311           4 :         {
     312           4 :             status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, totalSize, (uint64_t)&unbuf_mem_regions[addr]);
     313             :         }
     314             : 
     315           4 :     }
     316           7 :     prom_send_done.set_value(status);
     317           7 :     return status;
     318             : 
     319             : }
     320             : 
     321             : 
     322             : 
     323             : 
     324             : 
     325             : 
     326             : 
     327         329 : void FelixClient::establish_send_connection(uint64_t fid, std::promise<uint8_t> prom_sub_done){
     328         329 :     if (contextHandler->getType(fid) == BUFFERED) {
     329         173 :         netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
     330         173 :         clog_debug("Buffered socket = 0x%p with state: %d", buffered_socket, contextHandler->getSocketState(fid));
     331         173 :         if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
     332         173 :             prom_sub_done.set_value(0);
     333         173 :             return;
     334             :         } else {
     335           0 :             clog_error("Buffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x state is %d", std::this_thread::get_id(), fid, contextHandler->getSocketState(fid));
     336           0 :             std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     337           0 :             prom_sub_done.set_exception(e);
     338           0 :             return;
     339           0 :         }
     340             :     } else {
     341         156 :         netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
     342         156 :         clog_debug("Unbuffered socket = 0x%x with state: %d", unbuffered_socket, contextHandler->getSocketState(fid));
     343         156 :         if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
     344         156 :             prom_sub_done.set_value(0);
     345         156 :             return;
     346             :         } else {
     347           0 :             clog_error("Unbuffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
     348           0 :             std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
     349           0 :             prom_sub_done.set_exception(e);
     350           0 :             return;
     351           0 :         }
     352             :     }
     353             : 
     354             : }
     355             : 
     356             : 
     357             : 
     358       14211 : void FelixClient::send_data(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
     359       14211 :     if(terminating_felix_client.load()){
     360           0 :         clog_debug("Already terminating, should not send data");
     361           0 :         return;
     362             :     }
     363             : 
     364       14211 :     try{
     365       14211 :         contextHandler->createOrUpdateInfo(fid, &bus, false);
     366           1 :     } catch (std::exception& e) {
     367           1 :         clog_error("Problem while reading bus: %s", e.what());
     368           1 :         throw FelixClientSendConnectionException();
     369           1 :     }
     370             : 
     371       14210 :     clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
     372             : 
     373       14210 :     if (!contextHandler->exists(fid)) {
     374           0 :         clog_error("Invalid Bus entry");
     375           0 :         throw FelixClientSendConnectionException();
     376             :     }
     377             : 
     378       14210 :     if(evloop_thread_id == std::this_thread::get_id()){
     379           0 :         send_data_intern(fid, data, size, flush);
     380           0 :         return;
     381             :     }
     382             : 
     383             :     //If there is no connection, we have to establish a new one through the evloop
     384       14210 :     if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
     385         276 :         std::promise<uint8_t> prom_sub_done;
     386         276 :         std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
     387         276 :         contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
     388         276 :         netio_signal_fire(&send_signal);
     389         276 :         try{
     390         276 :             std::chrono::milliseconds timeout (5000);
     391         276 :             if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
     392           0 :                 clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
     393           0 :                 throw FelixClientResourceNotAvailableException();
     394             :             }
     395         276 :             future_sub_done.get();
     396           0 :         } catch(std::exception& e){
     397           0 :             clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     398           0 :             throw;
     399           0 :         }
     400         276 :     }
     401             : 
     402       14210 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     403         270 :         if(!contextHandler->waitConnected(fid, 2000)){
     404          24 :             clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
     405          24 :             throw FelixClientSendConnectionException();
     406             :         }
     407             :     }
     408             : 
     409       14186 :     ToFlxHeader header;
     410       14186 :     header.length = size;
     411       14186 :     header.reserved = 0;
     412       14186 :     header.elink = get_elink(fid);
     413       14186 :     int status = NETIO_STATUS_OK;
     414             : 
     415             :     //send
     416       14186 :     std::promise<uint8_t> prom_send_done;
     417       14186 :     std::future<uint8_t> future_send_done = prom_send_done.get_future();
     418             : 
     419       18077 :     contextHandler->pushSendTask(SendDataEvent{fid, header, data, size, flush, std::move(prom_send_done)});
     420       14186 :     netio_signal_fire(&send_signal);
     421             : 
     422             :     // wait for promise
     423             : 
     424       14186 :     try{
     425       14186 :         std::chrono::milliseconds timeout (5000);
     426       14186 :         if (future_send_done.wait_for(timeout)==std::future_status::timeout){
     427           0 :             clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
     428           0 :             throw FelixClientException("Send timeout expired");
     429             :         }
     430       14186 :         status = future_send_done.get();
     431           0 :     } catch(std::exception& e){
     432           0 :         clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     433           0 :         throw;
     434           0 :     }
     435             : 
     436       14186 :     switch(status) {
     437       10295 :         case NETIO_STATUS_OK: return;
     438           0 :         case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     439        3891 :         case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     440           0 :         case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     441           0 :         default: throw FelixClientException("Unknown status code: " + status);
     442             :     }
     443       14186 : }
     444             : 
     445           4 : void FelixClient::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
     446           4 :     if(terminating_felix_client.load()){
     447           0 :         clog_debug("Already terminating, should not send data");
     448           0 :         return;
     449             :     }
     450             : 
     451           4 :     try{
     452           4 :         contextHandler->createOrUpdateInfo(fid, &bus, false);
     453           0 :     } catch (std::exception& e) {
     454           0 :         clog_error("Problem while reading bus: %s", e.what());
     455           0 :         throw FelixClientSendConnectionException();
     456           0 :     }
     457             : 
     458           4 :     if (!contextHandler->exists(fid)) {
     459           0 :         clog_error("Invalid Bus entry ");
     460           0 :         throw FelixClientSendConnectionException();
     461             :     }
     462             : 
     463           4 :     clog_info("Address %s", contextHandler->getAddress(fid).c_str());
     464           4 :     if (msgs.size() != sizes.size()){
     465           0 :         FelixClientException("Error! Number of messages differs from number of sizes");
     466             :     }
     467             : 
     468           4 :     if(evloop_thread_id == std::this_thread::get_id()){
     469           0 :         send_data_intern(fid, msgs, sizes);
     470           0 :         return;
     471             :     }
     472             : 
     473           4 :     if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTIONG?
     474           4 :         std::promise<uint8_t> prom_sub_done;
     475           4 :         std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
     476           4 :         contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
     477           4 :         netio_signal_fire(&send_signal);
     478           4 :         try{
     479           4 :             std::chrono::milliseconds timeout (5000);
     480           4 :             if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
     481           0 :                 clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
     482           0 :                 throw FelixClientResourceNotAvailableException();
     483             :             }
     484           4 :             future_sub_done.get();
     485           0 :         } catch(std::exception& e){
     486           0 :             clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     487           0 :             throw;
     488           0 :         }
     489           4 :     }
     490             : 
     491           4 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     492           4 :         if(!contextHandler->waitConnected(fid, 2000)){
     493           0 :             clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
     494           0 :             throw FelixClientSendConnectionException();
     495             :         }
     496             :     }
     497             : 
     498           4 :     std::promise<uint8_t> prom_send_done;
     499           4 :     std::future<uint8_t> future_send_done = prom_send_done.get_future();
     500           4 :     contextHandler->pushSendTask(SendDataVectorEvent{fid, msgs, sizes, std::move(prom_send_done)});
     501           4 :     netio_signal_fire(&send_signal);
     502             : 
     503           4 :     int status = NETIO_STATUS_OK;
     504             :     // wait for promise
     505             : 
     506           4 :     try{
     507           4 :         std::chrono::milliseconds timeout (5000);
     508           4 :         if (future_send_done.wait_for(timeout)==std::future_status::timeout){
     509           0 :             clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
     510           0 :             throw FelixClientException("Send timeout expired");
     511             :         }
     512           4 :         status = future_send_done.get();
     513           0 :     } catch(std::exception& e){
     514           0 :         clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     515           0 :         throw;
     516           0 :     }
     517             : 
     518           4 :     switch(status) {
     519           4 :     case NETIO_STATUS_OK: return;
     520           0 :     case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     521           0 :     case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     522           0 :     case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     523           0 :     default: throw FelixClientException("Unknown status code: " + status);
     524             :     }
     525           4 : }
     526             : 
     527             : 
     528          21 : void FelixClient::send_data_nb(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
     529          21 :     if(terminating_felix_client.load()){
     530           0 :         clog_debug("Already terminating, should not send data");
     531           0 :         return;
     532             :     }
     533             : 
     534          21 :     if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
     535           0 :         clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
     536           0 :         throw FelixClientSendConnectionException();
     537             :     }
     538             : 
     539          21 :     std::vector<uint8_t> data_copy(data, data+size);
     540             : 
     541          21 :     ToFlxHeader header;
     542          21 :     header.length = size;
     543          21 :     header.reserved = 0;
     544          21 :     header.elink = get_elink(fid);
     545             : 
     546             :     //send
     547          21 :     std::promise<uint8_t> prom_send_done;
     548             : 
     549          21 :     contextHandler->pushSendTask(SendDataNbEvent{fid, header, std::move(data_copy), size, flush, std::move(prom_send_done)});
     550          21 :     netio_signal_fire(&send_signal);
     551          21 : }
     552             : 
     553           3 : void FelixClient::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
     554           3 :     if(terminating_felix_client.load()){
     555           0 :         clog_debug("Already terminating, should not send data");
     556           0 :         return;
     557             :     }
     558             : 
     559           3 :     if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
     560           0 :         clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
     561           0 :         throw FelixClientSendConnectionException();
     562             :     }
     563             : 
     564           3 :     if (msgs.size() != sizes.size()){
     565           0 :         FelixClientException("Error! Number of messages differs from number of sizes");
     566             :     }
     567             : 
     568           3 :     std::vector<std::vector<uint8_t>> data;
     569           3 :     std::vector<const uint8_t*> msgs_copy;
     570          12 :     for (size_t i = 0; i < msgs.size(); i++){
     571           9 :         data.emplace_back(msgs[i], msgs[i]+sizes[i]);
     572           9 :         msgs_copy.push_back(&data[i][0]);
     573             :     }
     574             : 
     575           3 :     std::vector<uint64_t> size_copy(sizes);
     576             : 
     577           3 :     std::promise<uint8_t> prom_send_done;
     578           3 :     contextHandler->pushSendTask(SendDataVectorNbEvent{fid, std::move(data), std::move(msgs_copy), std::move(size_copy), std::move(prom_send_done)});
     579           3 :     netio_signal_fire(&send_signal);
     580           3 : }
     581             : 
     582             : 
     583             : //We need non-blocking version of send data for tests that only run FelixClient instead of FelixClientThread
     584           0 : void FelixClient::send_data_intern(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
     585             : 
     586           0 :     ToFlxHeader header;
     587           0 :     header.length = size;
     588           0 :     header.reserved = 0;
     589           0 :     header.elink = get_elink(fid);
     590           0 :     int status = NETIO_STATUS_OK;
     591           0 :     std::promise<uint8_t> dummy_prom_sub;
     592           0 :     std::promise<uint8_t> dummy_prom_send;
     593             : 
     594           0 :     if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
     595           0 :         try{
     596           0 :             establish_send_connection(fid, std::move(dummy_prom_sub));
     597           0 :         } catch(std::exception& e){
     598           0 :             clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     599           0 :             throw;
     600           0 :         }
     601             :     }
     602             : 
     603           0 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     604           0 :         contextHandler->pushSendTask(SendDataInternEvent{fid, header, data, size, flush, std::move(dummy_prom_send),1});
     605           0 :         netio_signal_fire(&send_signal);
     606             :         return;
     607             :     }
     608             : 
     609           0 :     try{
     610           0 :         status = send_evloop(fid, header, data, size, flush, std::move(dummy_prom_send));
     611           0 :     } catch(std::exception& e){
     612           0 :         clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     613           0 :         throw;
     614           0 :     }
     615             : 
     616           0 :     switch(status) {
     617             :         case NETIO_STATUS_OK: return;
     618           0 :         case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     619           0 :         case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     620           0 :         case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     621           0 :         default: throw FelixClientException("Unknown status code: " + status);
     622             :     }
     623           0 : }
     624             : 
     625             : 
     626             : //We need non-blocking version of send data for tests that only run FelixClient instead of FelixClientThread
     627           0 : void FelixClient::send_data_intern(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
     628             : 
     629           0 :     int status = NETIO_STATUS_OK;
     630           0 :     std::promise<uint8_t> dummy_prom_sub;
     631           0 :     std::promise<uint8_t> dummy_prom_send;
     632             : 
     633           0 :     if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
     634           0 :         try{
     635           0 :             establish_send_connection(fid, std::move(dummy_prom_sub));
     636           0 :         } catch(std::exception& e){
     637           0 :             clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     638           0 :             throw;
     639           0 :         }
     640             :     }
     641             : 
     642           0 :     if(contextHandler->getSocketState(fid) == CONNECTING){
     643           0 :         contextHandler->pushSendTask(SendDataInternVectorEvent{fid, msgs, sizes, std::move(dummy_prom_send), 1});
     644           0 :         netio_signal_fire(&send_signal);
     645             :         return;
     646             :     }
     647             : 
     648           0 :     try{
     649           0 :         status = send_evloop(fid, msgs, sizes, std::move(dummy_prom_send));
     650           0 :     } catch(std::exception& e){
     651           0 :         clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
     652           0 :         throw;
     653           0 :     }
     654             : 
     655           0 :     switch(status) {
     656             :         case NETIO_STATUS_OK: return;
     657           0 :         case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
     658           0 :         case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
     659           0 :         case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
     660           0 :         default: throw FelixClientException("Unknown status code: " + status);
     661             :     }
     662           0 : }
     663             : 
     664             : 
     665       14519 : void FelixClient::exec_send(){
     666       14519 :     send_var_t task;
     667       14519 :     contextHandler->pullSendTask(task);
     668             : 
     669       14519 :     std::visit(overloaded {
     670           0 :         [](std::monostate const&) {
     671           0 :             clog_info("Nothing to send. Send task queue is empty.");
     672             :         },
     673       14186 :         [this](SendDataEvent& sendData) {
     674       14186 :             this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
     675       14186 :         },
     676          21 :         [this](SendDataNbEvent& sendNbData) {
     677          21 :             this->send_evloop(sendNbData.fid, sendNbData.header, &sendNbData.data[0], sendNbData.size, sendNbData.flush, std::move(sendNbData.prom_send_done));
     678          21 :         },
     679           4 :         [this](SendDataVectorEvent& sendDataVector) {
     680           4 :             this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
     681           4 :         },
     682           3 :         [this](SendDataVectorNbEvent& sendDataNbVector) {
     683           3 :             this->send_evloop(sendDataNbVector.fid, sendDataNbVector.msgs, sendDataNbVector.sizes, std::move(sendDataNbVector.prom_send_done));
     684           3 :         },
     685         305 :         [this](SendConnectEvent& sendConnect) {
     686         305 :             this->establish_send_connection(sendConnect.fid, std::move(sendConnect.prom_sub_done));
     687         305 :         },
     688           0 :         [this](SendDataInternEvent& sendData) {
     689           0 :             if(contextHandler->getSocketState(sendData.fid) == CONNECTED){
     690           0 :                 this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
     691             :             }else{
     692           0 :                 if(sendData.count< maxSendAttempts){
     693           0 :                     contextHandler->pushSendTask(SendDataInternEvent{sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done), ++sendData.count});
     694             :                 }
     695             :             }
     696           0 :         },
     697           0 :         [this](SendDataInternVectorEvent& sendDataVector) {
     698           0 :             if(contextHandler->getSocketState(sendDataVector.fid) == CONNECTED){
     699           0 :                 this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
     700             :             }else{
     701           0 :                 if(sendDataVector.count < maxSendAttempts){
     702           0 :                     contextHandler->pushSendTask(SendDataInternVectorEvent{sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done), ++sendDataVector.count});
     703             :                 }
     704             :             }
     705           0 :         },
     706             :     }, task);
     707       14519 : }
     708             : 
     709             : 
     710        4085 : void FelixClient::exec_sub(){
     711        4085 :     sub_var_t task;
     712        4085 :     contextHandler->pullSubTask(task);
     713             : 
     714        4085 :     std::visit(overloaded {
     715           0 :         [](std::monostate const&) {
     716           0 :             clog_info("Nothing to send. Send task queue is empty.");
     717             :         },
     718        1978 :         [this](UnsubEvent& unsub) {
     719        1978 :             this->send_unsubscribe_msg(unsub.fid, std::move(unsub.prom_unsub_done));
     720        1978 :         },
     721        2107 :         [this](SubscribeEvent const& sub) {
     722        2107 :             this->subscribe_evloop(sub.fid);
     723             :         },
     724             :     }, task);
     725        4085 : }
     726             : 
     727             : 
     728          25 : void FelixClient::init_send_data(netio_tag_t fid) {
     729          25 :     clog_info("init_send_data for FID 0x%x", fid);
     730          25 :     if(terminating_felix_client.load()){
     731           0 :         clog_debug("Already terminating, should not send data");
     732           0 :         return;
     733             :     }
     734             : 
     735          25 :     try{
     736          25 :         contextHandler->createOrUpdateInfo(fid, &bus, false);
     737           0 :     } catch (std::exception& e) {
     738           0 :         clog_error("Problem while reading bus: %s", e.what());
     739           0 :         throw FelixClientSendConnectionException();
     740           0 :     }
     741             : 
     742          25 :     clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
     743             : 
     744          25 :     if (!contextHandler->exists(fid)) {
     745           0 :         clog_error("Invalid Bus entry");
     746           0 :         throw FelixClientSendConnectionException();
     747             :     }
     748             : 
     749             :     //If there is no connection, we have to establish a new one through the evloop
     750          25 :     if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
     751          25 :         std::promise<uint8_t> prom_sub_done;
     752          25 :         contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
     753          25 :         netio_signal_fire(&send_signal);
     754          25 :     }
     755             : 
     756             :     return;
     757             : }
     758             : 
     759         391 : void FelixClient::init_subscribe(netio_tag_t fid) {
     760         391 :     clog_info("init_subscribe not implemented, subscribe will do the job");
     761         391 :     return;
     762             : }
     763             : 
     764             : 
     765       14519 : void FelixClient::on_send_signal(void* ptr){
     766       14519 :     clog_trace("On send signal");
     767       14519 :     static_cast<FelixClient*>(ptr)->exec_send();
     768       14519 : }
     769             : 
     770        4085 : void FelixClient::on_sub_signal(void* ptr){
     771        4085 :     clog_debug("On sub signal");
     772        4085 :     static_cast<FelixClient*>(ptr)->exec_sub();
     773        4085 : }
     774             : 
     775             : 
     776           0 : void FelixClient::send_data(netio_tag_t fid, struct iovec* iov, unsigned n, bool flush) {
     777           0 :     clog_info("send_msg(netio_tag_t fid, struct iovec* iov, unsigned n): not implemented");
     778           0 :     throw FelixClientSendConnectionException();
     779             : }
     780             : 
     781         586 : int FelixClient::subscribe(const std::vector<netio_tag_t>& fids, uint timeoutms, bool for_register) {
     782             : 
     783         586 :     std::vector<netio_tag_t> all_fids;
     784         586 :     clog_debug("Subscribing to %d FIDs", fids.size());
     785         586 :     uint8_t alreadySubscribedCounter = 0;
     786        2725 :     for(auto& fid : fids){
     787             :         // Already subscribed
     788        2139 :         if (!contextHandler->canSubscribe(fid)) {
     789          24 :             clog_info("Already subscribed to 0x%x", fid);
     790          24 :             alreadySubscribedCounter++;
     791          24 :             continue;
     792             :         }
     793        2115 :         if (fid & 0x8000){
     794           0 :             clog_info("Can't subscribe to toflx fid: 0x%x", fid);
     795           0 :             continue;
     796             :         }
     797             : 
     798        2115 :         try{
     799        2115 :             contextHandler->createOrUpdateInfo(fid, &bus, for_register);
     800        2107 :             struct timespec t0;
     801        2107 :             clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
     802        4214 :             contextHandler->setSubscriptionTimes(fid, "subscribe", t0);
     803        2107 :             contextHandler->setSubscriptionState(fid, SUBING);
     804           8 :         } catch(...){
     805           8 :             continue;
     806           8 :         }
     807        2107 :         all_fids.push_back(fid);
     808        2107 :         contextHandler->addToFidsToResubscribe(fid);
     809             :     }
     810         586 :     if (fids.size() == alreadySubscribedCounter){
     811             :         return FELIX_CLIENT_STATUS_ALREADY_DONE;
     812             :     }
     813             : 
     814         582 :     uint64_t actual_delay = 1000;
     815         582 :     netio_timer_start_ms(&subscribe_timer, actual_delay);
     816         582 :     if(evloop_thread_id == std::this_thread::get_id()){
     817           0 :         for (const auto& fid : all_fids){
     818           0 :             if (!subscribe_evloop(fid)){
     819           0 :                 if (timeoutms == 0) {
     820         586 :                     return FELIX_CLIENT_STATUS_OK;
     821             :                 }
     822           0 :                 return FELIX_CLIENT_STATUS_TIMEOUT;
     823             :             }
     824             :         }
     825             :     } else {
     826        2689 :         for (auto& fid : all_fids){
     827        2107 :             contextHandler->pushSubTask(SubscribeEvent{fid});
     828        2107 :             netio_signal_fire(&sub_signal);
     829             :         }
     830             :     }
     831             : 
     832         582 :     if (timeoutms == 0) {
     833             :        return FELIX_CLIENT_STATUS_OK;
     834             :     }
     835         420 :     std::vector<netio_tag_t> not_subscribed_fids;
     836             : 
     837         420 :     if(!contextHandler->isSubscribed(all_fids, timeoutms)){
     838         483 :         for (auto& fid : all_fids){
     839         478 :             if(!contextHandler->isSubscribed(fid)){
     840           0 :                 not_subscribed_fids.push_back(fid);
     841           0 :                 contextHandler->setSubscriptionState(fid, TIMEOUT);
     842             :             }
     843             :         }
     844             :     }
     845             : 
     846         420 :     if (not_subscribed_fids.empty()) {
     847         420 :         if(fids.size() > all_fids.size() + alreadySubscribedCounter){
     848             :             return FELIX_CLIENT_STATUS_TIMEOUT;
     849             :         }
     850         412 :         return FELIX_CLIENT_STATUS_OK;
     851             :     }
     852             :     return FELIX_CLIENT_STATUS_TIMEOUT;
     853             : 
     854         586 : }
     855             : 
     856         544 : int FelixClient::subscribe(netio_tag_t fid, uint timeoutms, bool for_register) {
     857         544 :     if(terminating_felix_client.load()){
     858           0 :         clog_debug("Already terminating, should not subscribe anymore");
     859           0 :         return FELIX_CLIENT_STATUS_OK;
     860             :     }
     861         544 :     std::vector<uint64_t> fids{fid};
     862         544 :     return subscribe(fids, timeoutms, for_register);
     863         544 : }
     864             : 
     865        2156 : bool FelixClient::subscribe_evloop(netio_tag_t fid) {
     866        2156 :     try{
     867        2156 :         contextHandler->createOrUpdateInfo(fid, &bus);
     868        2156 :         struct timespec t1;
     869        2156 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
     870        4312 :         contextHandler->setSubscriptionTimes(fid, "subscribe_evloop", t1);
     871           0 :     } catch(const std::exception& e){
     872           0 :         return false;
     873           0 :     }
     874        2156 :     int ret;
     875        2156 :     clog_trace("type (0=buffered, 1=unbuffered) %d", contextHandler->getType(fid));
     876             : 
     877        2156 :     if (contextHandler->getType(fid) == BUFFERED) {
     878        1093 :         netio_subscribe_socket* buffered_socket = get_buffered_subscribe_socket(fid);
     879        1093 :         clog_debug("(Re-)subscribing (buffered) for fid 0x%x and socket 0x%p", fid, buffered_socket);
     880        1093 :         ret = netio_subscribe(buffered_socket, fid);
     881        1093 :         clog_debug("(Re-)subscribing (buffered) for 0x%x with send socket has been done", fid);
     882             : 
     883             :     } else {
     884        1063 :         netio_unbuffered_subscribe_socket* unbuffered_socket = get_unbuffered_subscribe_socket(fid);
     885        1063 :         clog_debug("(Re-)subscribing (unbuffered) for 0x%x and socket 0x%p", fid, unbuffered_socket);
     886        1063 :         clog_debug("Subscribed to unbuffered subscribe socket. Netio_page_size: %d netio pages: %d", unbuffered_socket->recv_socket.attr.buffer_size, unbuffered_socket->recv_socket.attr.num_buffers);
     887        1063 :         ret = netio_unbuffered_subscribe(unbuffered_socket, fid);
     888             :     }
     889        2156 :     if(ret == NETIO_STATUS_AGAIN){
     890           0 :         contextHandler->updateFidsWhenUnsub(fid);
     891           0 :         contextHandler->pushSubTask(SubscribeEvent{fid});
     892           0 :         netio_signal_fire(&sub_signal);
     893           0 :         return false;
     894        2156 :     } else if(ret == NETIO_STATUS_ERROR){
     895             :         return false;
     896             :     }
     897        2156 :     if(contextHandler->getSocketState(fid) != CONNECTED && contextHandler->getSocketState(fid) != CONNECTING){
     898           0 :         contextHandler->setSocketState(fid, CONNECTING);
     899             :     }
     900        2156 :     if(contextHandler->getSocketState(fid) == CONNECTED){
     901        1107 :         contextHandler->setSubscriptionState(fid, SUB);
     902        1107 :         contextHandler->removeFromFidsToResubscribe(fid);
     903        1107 :         struct timespec t2;
     904        1107 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
     905        1107 :         std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
     906             : 
     907        2214 :         double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
     908        2214 :                                     + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
     909        1107 :         clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
     910             : 
     911        2214 :         double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
     912        2214 :                                             + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
     913        1107 :         clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
     914        1107 :         if (cb_on_connect) {
     915        1107 :             (cb_on_connect)(fid);
     916             :         }
     917        1107 :     }
     918             :     return true;
     919             : }
     920             : 
     921        2013 : void FelixClient::send_unsubscribe_msg(uint64_t fid, std::promise<uint8_t> prom_unsub_done){
     922        2013 :     void* socket;
     923        2013 :     int ret = 0;
     924        2013 :     if (contextHandler->getType(fid) == BUFFERED) {
     925        1022 :         struct netio_subscribe_socket* buffered_socket;
     926        1022 :         buffered_socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
     927        1022 :         socket = static_cast<void*>(buffered_socket);
     928        1022 :         if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
     929        1022 :         clog_info("Unsubscribing (buffered) for 0x%x from %s and socket 0x%p", fid, get_address(buffered_socket).c_str(), buffered_socket);
     930        1022 :         ret = netio_unsubscribe(buffered_socket, fid);
     931             :     } else {
     932         991 :         struct netio_unbuffered_subscribe_socket* unbuffered_socket;
     933         991 :         unbuffered_socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
     934         991 :         socket = static_cast<void*>(unbuffered_socket);
     935         991 :         if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
     936         991 :         clog_info("Unsubscribing (unbuffered) for 0x%x from %s", fid, get_address(unbuffered_socket).c_str());
     937         991 :         ret = netio_unbuffered_unsubscribe(unbuffered_socket, fid);
     938             :     }
     939        2013 :     if(ret == NETIO_STATUS_AGAIN){
     940           0 :         contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
     941           0 :         netio_signal_fire(&sub_signal);
     942           0 :         return;
     943        2013 :     } else if(ret == NETIO_STATUS_ERROR){
     944           0 :         prom_unsub_done.set_value(2);
     945             :     }
     946        2013 :     contextHandler->updateFidsWhenUnsub(fid);
     947        2013 :     contextHandler->setSubscriptionState(fid, UNSUB);
     948        2013 :     if(contextHandler->areAllFidsUnsubscribed(socket)){
     949        1824 :         contextHandler->setSocketToAllUnsubscribed(socket);
     950             :     }
     951        2013 :     prom_unsub_done.set_value(0);
     952             : }
     953             : 
     954             : 
     955        2178 : int FelixClient::unsubscribe(netio_tag_t fid) {
     956        2178 :     clog_info("Unsubscribing from 0x%x", fid);
     957        2178 :     if (fid & 0x8000){
     958             :         return FELIX_CLIENT_STATUS_OK;
     959             :     }
     960             : 
     961        1978 :     if(evloop_thread_id == std::this_thread::get_id()){
     962           0 :         send_unsubscribe_msg(fid);
     963           0 :         return FELIX_CLIENT_STATUS_OK;
     964             :     }
     965        1978 :     if(!contextHandler->exists(fid)){
     966             :         return FELIX_CLIENT_STATUS_OK;
     967             :     }
     968             : 
     969        1978 :     std::promise<uint8_t> prom_unsub_done;
     970        1978 :     std::future<uint8_t> future_unsub_done = prom_unsub_done.get_future();
     971        1978 :     uint8_t status = FELIX_CLIENT_STATUS_OK;
     972        1978 :     if (contextHandler->getSubscriptionState(fid) == SUB || contextHandler->getSubscriptionState(fid) == SUBING){
     973        1978 :         contextHandler->setSubscriptionState(fid, UNSUBING);
     974        1978 :         contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
     975        1978 :         netio_signal_fire(&sub_signal);
     976             : 
     977        1978 :         try{
     978        1978 :             std::chrono::milliseconds timeout (5000);
     979        1978 :             if (future_unsub_done.wait_for(timeout)==std::future_status::timeout){
     980           0 :                 clog_error("Unsubscribe timeout expired, stop blocking to prevent deadlock.");
     981           0 :                 return FELIX_CLIENT_STATUS_TIMEOUT;
     982             :             }
     983        1978 :             status = future_unsub_done.get();
     984           0 :         } catch(std::exception& e){
     985           0 :             clog_info("Exception while unsubscribing: %s FID: 0x%x", e.what(), fid);
     986           0 :             throw;
     987           0 :         }
     988             :     }
     989        1978 :     return status; //0 = OK, 1 = Already done
     990        1978 : }
     991             : 
     992             : 
     993         195 : void FelixClient::exec(const UserFunction &user_function ) {
     994         195 :     struct netio_signal* signal = (struct netio_signal*)malloc(sizeof(struct netio_signal)); // c style
     995         195 :     struct SignalData* data = new SignalData(); // C++ style (struct SignalData*)malloc(sizeof(struct SignalData)); Does this need to be cleaned up?
     996         195 :     data->signal = signal;
     997         195 :     data->user_function = user_function;
     998         195 :     data->evloop = &ctx.evloop;
     999             : 
    1000         195 :     netio_signal_init(&ctx.evloop, signal);
    1001         195 :     signal->data = data;
    1002         195 :     signal->cb = on_signal;
    1003             : 
    1004         195 :     netio_signal_fire(signal);
    1005         195 : }
    1006             : 
    1007             : 
    1008          52 : netio_tag_t FelixClient::get_ctrl_fid(netio_tag_t fid) {
    1009          52 :     uint8_t sid = 0;
    1010          52 :     uint8_t to_flx = 1;
    1011          52 :     uint8_t virt = 1;
    1012         104 :     netio_tag_t ctrl_fid = get_fid_from_ids(get_did(fid),
    1013             :                                             get_cid(fid),
    1014             :                                             COMMAND_REPLY_LINK,
    1015             :                                             sid,
    1016          52 :                                             get_vid(fid),
    1017             :                                             to_flx,
    1018             :                                             virt);
    1019          52 :     return ctrl_fid;
    1020             : }
    1021             : 
    1022          33 : netio_tag_t FelixClient::get_subscribe_fid(netio_tag_t fid) {
    1023          33 :     uint8_t sid = 0;
    1024          33 :     uint8_t to_flx = 0;
    1025          33 :     uint8_t virt = 1;
    1026          66 :     netio_tag_t subscribe_fid = get_fid_from_ids(get_did(fid),
    1027             :                                                  get_cid(fid),
    1028             :                                                  COMMAND_REPLY_LINK,
    1029             :                                                  sid,
    1030          33 :                                                  get_vid(fid),
    1031             :                                                  to_flx,
    1032             :                                                  virt);
    1033          33 :     return subscribe_fid;
    1034             : }
    1035             : 
    1036             : 
    1037          20 : FelixClientThread::Status FelixClient::send_cmd(const std::vector<uint64_t>& fids,
    1038             :                                                          FelixClientThread::Cmd cmd,
    1039             :                                                          const std::vector<std::string>& cmd_args,
    1040             :                                                          std::vector<FelixClientThread::Reply>& replies) {
    1041          20 :     clog_debug("send_cmd");
    1042          20 :     if(terminating_felix_client.load()){
    1043           0 :         clog_debug("Already terminating not sending command");
    1044           0 :         return FelixClientThread::Status::OK;
    1045             :     }
    1046             : 
    1047             :     // No commands to be send, all OK
    1048          20 :     if(fids.size() == 0) {
    1049             :         return FelixClientThread::Status::OK;
    1050             :     }
    1051             : 
    1052          20 :     std::string message;
    1053          20 :     FelixClientThread::Status status = FelixClientThread::Status::OK;
    1054             : 
    1055             :     // check the command and command arguments
    1056          20 :     switch(cmd) {
    1057           0 :         case FelixClientThread::NOOP:
    1058           0 :             if (cmd_args.size() != 0) {
    1059           0 :                 message = std::string("Too many arguments for NOOP, needs 0 while %d given.", cmd_args.size());
    1060           0 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1061           0 :                 break;
    1062             :             }
    1063             :             break;
    1064          14 :         case FelixClientThread::GET:
    1065          14 :             if (cmd_args.size() != 1) {
    1066           1 :                 message = std::string("Not enough arguments for GET, needs 1 while %d given.", cmd_args.size());
    1067           1 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1068           1 :                 break;
    1069             :             }
    1070             :             break;
    1071           2 :         case FelixClientThread::SET:
    1072           2 :             if (cmd_args.size() != 2) {
    1073           0 :                 message = std::string("Not enough arguments for SET, needs 2 while %d given.", cmd_args.size());
    1074           0 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1075           0 :                 break;
    1076             :             }
    1077             :             break;
    1078           3 :         case FelixClientThread::ECR_RESET:
    1079           3 :             if (cmd_args.size() != 1) {
    1080           0 :                 message = std::string("Not enough arguments for ECR_RESET, needs 1 while", cmd_args.size());
    1081           0 :                 status = FelixClientThread::Status::ERROR_INVALID_ARGS;
    1082           0 :                 break;
    1083             :             }
    1084             :             break;
    1085           1 :         default:
    1086           1 :             message = "Cmd not available: " + FelixClientThread::to_string(cmd);
    1087           1 :             status = FelixClientThread::Status::ERROR_INVALID_CMD;
    1088           1 :             break;
    1089             :     }
    1090             : 
    1091             : 
    1092             :     // deduce set of ctrl_fids from list of normal fids
    1093          40 :     std::unordered_map<netio_tag_t, netio_tag_t> ctrl_by_subscribe_fid;
    1094          40 :     std::set<uint64_t> sub_fids_set;
    1095          20 :     clog_info("Looking up %d subscribe FIDs", fids.size());
    1096          53 :     for(auto const& fid : fids) {
    1097          33 :         netio_tag_t ctrl_fid = get_ctrl_fid(fid);
    1098          33 :         netio_tag_t sub_fid = get_subscribe_fid(ctrl_fid);
    1099          33 :         sub_fids_set.insert(sub_fid);
    1100          33 :         std::error_code ec;
    1101          33 :         felixbus::FelixBusInfo bus_info = bus.get_info(sub_fid, ec);
    1102             : 
    1103          33 :         if( bus_info.ip != "" ){
    1104          31 :             ctrl_by_subscribe_fid[sub_fid] = ctrl_fid;
    1105             :         }
    1106             :         else{
    1107           2 :             clog_error("Discarding ctrl fid 0x%x because reply fid 0x%x does not appear in the bus", ctrl_fid, sub_fid);
    1108           2 :             FelixClientThread::Reply reply;
    1109           2 :             reply.ctrl_fid = ctrl_fid;
    1110           2 :             reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
    1111           2 :             reply.message = "Invalid fid, not found in bus";
    1112           2 :             reply.value = 0;
    1113           2 :             replies.push_back(reply);
    1114           2 :         }
    1115          33 :     }
    1116             : 
    1117          40 :     std::vector<uint64_t> sub_fids;
    1118          20 :     sub_fids.assign(sub_fids_set.begin(), sub_fids_set.end());
    1119             : 
    1120          20 :     clog_info("Subscribing to %d FIDs", ctrl_by_subscribe_fid.size());
    1121          20 :     uint subscribe_timeout = 5000;
    1122          20 :     bool for_register = true;
    1123          20 :     int ret = subscribe(sub_fids, subscribe_timeout, for_register);
    1124          20 :     if(ret == FELIX_CLIENT_STATUS_TIMEOUT){
    1125           2 :         auto it = ctrl_by_subscribe_fid.begin();
    1126           3 :         while (it != ctrl_by_subscribe_fid.end()){
    1127           1 :             if(!contextHandler->isSubscribed(it->first)){
    1128           0 :                 FelixClientThread::Reply reply;
    1129           0 :                 reply.ctrl_fid = it->second;
    1130           0 :                 reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
    1131           0 :                 reply.message = "Could not subscribe: timeout";
    1132           0 :                 reply.value = 0;
    1133           0 :                 replies.push_back(reply);
    1134           0 :                 it = ctrl_by_subscribe_fid.erase(it);
    1135           0 :             } else {
    1136           1 :                 it++;
    1137             :             }
    1138             :         }
    1139             :     }
    1140             : 
    1141          20 :     clog_info("Sending command");
    1142             : 
    1143             :     // continue with the connected ctrl_fids and send them the command
    1144          40 :     std::unordered_map<std::string, netio_tag_t> ctrl_fid_by_uuid;
    1145          41 :     for(auto const& sub_ctrl_fids : ctrl_by_subscribe_fid) {
    1146          21 :         uuid_t uuid_dec;
    1147          21 :         char uuid[36 + 1];
    1148          21 :         uuid_generate(uuid_dec);
    1149          21 :         uuid_unparse_upper(uuid_dec, uuid);
    1150             : 
    1151          21 :         clog_info("Preparing cmd...");
    1152          21 :         char json[256];
    1153          21 :         struct jWriteControl jwc;
    1154          21 :         jwOpen(&jwc, json, 256 - 1, JW_ARRAY, JW_COMPACT);
    1155          21 :         jwArr_object(&jwc);
    1156          21 :         jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::UUID).c_str(), uuid);
    1157          42 :         jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::CMD).c_str(), FelixClientThread::to_string(cmd).c_str());
    1158          21 :         jwObj_array(&jwc, FelixClientThread::to_string(FelixClientThread::CMD_ARGS).c_str());
    1159          45 :         for(auto const& cmd_arg : cmd_args) {
    1160          24 :             jwArr_string(&jwc, cmd_arg.c_str());
    1161             :         }
    1162          21 :         jwEnd(&jwc);
    1163          21 :         jwEnd(&jwc);
    1164          21 :         jwClose(&jwc);
    1165          21 :         int json_length = strlen(json);
    1166          21 :         clog_debug("len=%d; json=%d", json_length, json);
    1167             : 
    1168          21 :         try {
    1169          21 :             clog_info("Sending cmd to fid 0x%x", sub_ctrl_fids.second);
    1170          21 :             send_data(sub_ctrl_fids.second, (const uint8_t*)json, json_length, true);
    1171          20 :             uuids.emplace(uuid);
    1172          21 :             ctrl_fid_by_uuid.emplace(uuid, sub_ctrl_fids.second);
    1173           1 :         } catch (FelixClientException& error) {
    1174           1 :             clog_info("Could not send cmd for FID 0x%x because: %s", sub_ctrl_fids.second, error.what());
    1175             :             // sending timed out or other problem
    1176           1 :             FelixClientThread::Reply reply;
    1177           1 :             reply.ctrl_fid = sub_ctrl_fids.second;
    1178           1 :             reply.status = FelixClientThread::Status::ERROR_NO_CONNECTION;
    1179           1 :             reply.message = "Could not send cmd";
    1180           1 :             reply.value = 0;
    1181           1 :             replies.push_back(reply);
    1182           1 :         }
    1183             : 
    1184             :     }
    1185             : 
    1186             :     // wait for replies (or timeout)
    1187          20 :     clog_info("Waiting for %d replies...", ctrl_fid_by_uuid.size());
    1188          40 :     for(auto const &it : ctrl_fid_by_uuid) {
    1189          20 :         clog_debug("%s 0x%x", it.first.c_str(), it.second);
    1190             :     }
    1191             : 
    1192          20 :     uint timeoutms = 5000;
    1193          20 :     uint timeout = 0;
    1194          20 :     uint span = std::min(timeoutms, 250U);
    1195          20 :     uint recv_replies = 0;
    1196         116 :     while ((ctrl_fid_by_uuid.size() != recv_replies) && (timeout <= timeoutms)) {
    1197          96 :         recv_replies = 0;
    1198         196 :         for (auto& uuid_it : ctrl_fid_by_uuid){
    1199         100 :             if (reply_by_uuid.find(uuid_it.first) != reply_by_uuid.end()) {
    1200          20 :                 recv_replies++;
    1201             :             }
    1202             :         }
    1203          96 :         std::this_thread::sleep_for(std::chrono::milliseconds(span));
    1204          96 :         timeout += span;
    1205             :     }
    1206          20 :     clog_info("Finished waiting");
    1207             : 
    1208          40 :     for (auto& ctrl_fid_uuid : ctrl_fid_by_uuid){
    1209             :         // take it out of the list of uuids
    1210          20 :         std::string uuid = ctrl_fid_uuid.first;
    1211          20 :         uuids.erase(uuid);
    1212             : 
    1213          20 :         auto reply_it = reply_by_uuid.find(uuid);
    1214          20 :         if(reply_it != reply_by_uuid.end()){
    1215          18 :             clog_info("uuid: %s", ctrl_fid_uuid.first.c_str());
    1216          18 :             FelixClientThread::Reply reply = (*reply_it).second;
    1217          18 :             replies.push_back(reply);
    1218             : 
    1219          18 :             clog_info("Found reply for %s", ctrl_fid_uuid.first.c_str());
    1220          18 :             clog_info(" %s", FelixClientThread::to_string(reply).c_str());
    1221             : 
    1222          18 :             reply_by_uuid.erase(reply_it);
    1223          18 :         } else {
    1224           2 :             netio_tag_t ctrl_fid = ctrl_fid_uuid.second;
    1225           2 :             clog_debug("UUID %s timed out for ctrl_fid 0x%x", uuid.c_str(), ctrl_fid);
    1226             : 
    1227           2 :             FelixClientThread::Reply reply;
    1228           2 :             reply.ctrl_fid = ctrl_fid;
    1229           2 :             reply.status = FelixClientThread::Status::ERROR_NO_REPLY;
    1230           2 :             reply.message = "Did not receive a reply";
    1231           2 :             reply.value = 0;
    1232           2 :             replies.push_back(reply);
    1233           2 :         } //TODO: What happens to replies that arrive aftert the timeout and staty in the map
    1234          20 :         clog_debug("send_cmd done");
    1235          20 :     }
    1236             : 
    1237             :     // calculate summary_status, no replies is an ERROR
    1238          20 :     FelixClientThread::Status summary_status = replies.size() > 0 ? FelixClientThread::Status::OK : FelixClientThread::Status::ERROR;
    1239          20 :     summary_status = std::max(summary_status, status);
    1240          43 :     for(auto const& reply : replies) {
    1241          30 :         summary_status = std::max(summary_status, reply.status);
    1242             :     }
    1243          20 :     clog_info("Calculated summary status %d:%s from %d replies", summary_status, FelixClientThread::to_string(summary_status).c_str(), replies.size());
    1244             : 
    1245          20 :     return summary_status;
    1246          20 : }
    1247             : 
    1248         419 : void FelixClient::callback_on_init( OnInitCallback on_init ) {
    1249         419 :     clog_info("Registering on_init");
    1250         419 :     cb_on_init = on_init;
    1251         419 : }
    1252             : 
    1253         184 : void FelixClient::callback_on_data( OnDataCallback on_data ) {
    1254         184 :     clog_info("Registering on_data");
    1255         184 :     cb_on_data = on_data;
    1256         184 : }
    1257             : 
    1258           2 : void FelixClient::callback_on_buffer( OnBufferCallback on_buffer ) {
    1259           2 :     clog_info("Registering on_buffer");
    1260           2 :     cb_on_buffer = on_buffer;
    1261           2 : }
    1262             : 
    1263         413 : void FelixClient::callback_on_connect( OnConnectCallback on_connect ) {
    1264         413 :     clog_info("Registering on_connect");
    1265         413 :     cb_on_connect = on_connect;
    1266         413 : }
    1267             : 
    1268         409 : void FelixClient::callback_on_disconnect( OnDisconnectCallback on_disconnect ) {
    1269         409 :     clog_info("Registering on_disconnect");
    1270         409 :     cb_on_disconnect = on_disconnect;
    1271         409 : }
    1272             : 
    1273          25 : void FelixClient::callback_on_user_timer(  OnUserTimerCallback on_user_timer_cb ) {
    1274          25 :     clog_info("Registering on_user_timer");
    1275          25 :     cb_on_user_timer = on_user_timer_cb;
    1276          25 : }
    1277             : 
    1278         419 : void FelixClient::on_init_eventloop(void* ptr) {
    1279         419 :     clog_info("On init");
    1280         419 :     FelixClient* client = static_cast<FelixClient*>(ptr);
    1281         419 :     client->ready = true;
    1282         419 :     std::function<void()> callback = client->cb_on_init;
    1283         419 :     if (callback) {
    1284         419 :         (callback)();
    1285             :     }
    1286         419 : }
    1287             : 
    1288         459 : void FelixClient::on_connection_established(void* socket) {
    1289         459 :     clog_debug("on_connection_established");
    1290         459 :     if (contextHandler->exists(socket)){
    1291         459 :         contextHandler->setSocketState(socket, CONNECTED);
    1292        1739 :         for(auto fid : contextHandler->getFidsBySocket(socket)){
    1293        1280 :             if (contextHandler->exists(fid)){
    1294        1280 :                 if(contextHandler->getConnectionType(fid) == PUBSUB){
    1295        1004 :                     if(contextHandler->getSubscriptionState(fid) == TIMEOUT){ //FID has timed out before and we do not want to notify the user nor need this subscription
    1296           0 :                         contextHandler->setSubscriptionState(fid, UNSUBING);
    1297           0 :                         send_unsubscribe_msg(fid);
    1298             :                         //contextHandler->removeFromFidsToResubscribe(fid);
    1299           0 :                         return;
    1300             :                     }
    1301             :                 }
    1302        1280 :                 contextHandler->removeFromFidsToResubscribe(fid);
    1303        1280 :                 contextHandler->setSubscriptionState(fid, SUB);
    1304        1280 :                 struct timespec t2;
    1305        1280 :                 clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
    1306        1280 :                 std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
    1307        2560 :                 double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
    1308        2560 :                                         + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
    1309        1280 :                 clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
    1310        2560 :                 double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
    1311        2560 :                                                 + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
    1312        1280 :                 clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
    1313        1280 :                 if (cb_on_connect) {
    1314        1276 :                     (cb_on_connect)(fid);
    1315             :                 }
    1316        1280 :                 clog_debug("Tag: 0x%x out of %d FIDs has status: CONNECTED", fid, contextHandler->getFidsBySocket(socket).size());
    1317        1280 :             }
    1318         459 :         }
    1319             :     }
    1320             : }
    1321             : 
    1322         124 : void FelixClient::on_connection_established(struct netio_send_socket* socket) {
    1323         124 :     clog_info("Unbuffered send connection established");
    1324         124 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1325         124 : }
    1326             : 
    1327         142 : void FelixClient::on_connection_established(struct netio_buffered_send_socket* socket) {
    1328         142 :     clog_info("Buffered send connection established");
    1329         142 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1330         142 : }
    1331             : 
    1332         108 : void FelixClient::on_connection_established(struct netio_subscribe_socket* socket) {
    1333         108 :     clog_info("Buffered connection established to %s", get_address(socket).c_str());
    1334         108 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1335         108 : }
    1336             : 
    1337          85 : void FelixClient::on_connection_established(struct netio_unbuffered_subscribe_socket* socket) {
    1338          85 :     clog_info("Unbuffered connection established to %s", get_address(socket).c_str());
    1339          85 :     static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
    1340          85 : }
    1341             : 
    1342         368 : void FelixClient::on_connection_closed(void* socket) {
    1343         368 :     clog_debug("FelixClient::on_connection_closed");
    1344         368 :     contextHandler->setSocketState(socket, DISCONNECTED);
    1345         368 :     auto fids = contextHandler->getFidsBySocket(socket);
    1346         368 :     if(contextHandler->getConnectionType(socket) == SENDRECV){
    1347         266 :         decrement_num_send_sockets();
    1348             :     }
    1349         368 :     if(!terminating_felix_client){
    1350          48 :         contextHandler->removeSocket(socket);
    1351             :     }
    1352             : 
    1353         654 :     for(auto fid : fids ){
    1354         286 :         if (cb_on_disconnect) {
    1355         282 :             (cb_on_disconnect)(fid);
    1356             :         }
    1357             :     }
    1358             : 
    1359         368 : }
    1360             : 
    1361         124 : void FelixClient::on_connection_closed(struct netio_send_socket* socket) {
    1362         124 :     clog_info("Unbuffered send connection closed");
    1363         124 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1364             :     //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
    1365         124 : }
    1366             : 
    1367         142 : void FelixClient::on_connection_closed(struct netio_buffered_send_socket* socket) {
    1368         142 :     clog_info("Buffered send connection closed");
    1369         142 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1370             :     //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
    1371         142 : }
    1372             : 
    1373          61 : void FelixClient::on_connection_closed(struct netio_subscribe_socket* socket) {
    1374          61 :     clog_info("Buffered connection closed to %s", get_address(socket).c_str());
    1375          61 :     netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::BSUB);
    1376          61 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1377          61 : }
    1378             : 
    1379          41 : void FelixClient::on_connection_closed(struct netio_unbuffered_subscribe_socket* socket) {
    1380          41 :     clog_info("Unbuffered connection closed to %s", get_address(socket).c_str());
    1381          41 :     netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::USUB);
    1382          41 :     static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
    1383          41 : }
    1384             : 
    1385          91 : void FelixClient::on_error_connection_refused(void* socket) {
    1386          91 :     clog_debug("on_error_connection_refused");
    1387          91 :     contextHandler->setSocketState(socket, DISCONNECTED);
    1388         182 :     exec([this, socket]{this->contextHandler->removeSocket(socket, true);});
    1389          91 :     clog_debug("Socket deleted from socket map.");
    1390          91 : }
    1391             : 
    1392          23 : void FelixClient::on_error_connection_refused(struct netio_send_socket* socket) {
    1393          23 :     clog_info("Unbuffered send connection refused");
    1394          23 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1395          23 : }
    1396             : 
    1397          23 : void FelixClient::on_error_connection_refused(struct netio_buffered_send_socket* socket) {
    1398          23 :     clog_info("Buffered send connection refused");
    1399          23 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1400          23 : }
    1401             : 
    1402          22 : void FelixClient::on_error_connection_refused(struct netio_subscribe_socket* socket) {
    1403          22 :     clog_info("Buffered connection refused to %s", get_address(socket).c_str());
    1404          22 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1405             : 
    1406          22 : }
    1407             : 
    1408          23 : void FelixClient::on_error_connection_refused(struct netio_unbuffered_subscribe_socket* socket) {
    1409          23 :     clog_info("Unbuffered connection refused to %s", get_address(socket).c_str());
    1410          23 :     static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
    1411             : 
    1412          23 : }
    1413             : 
    1414         152 : void FelixClient::on_send_completed(struct netio_send_socket* socket, uint64_t key) {
    1415         152 :     clog_debug("Unbuffered send completed for 0x%x", key);
    1416         152 : }
    1417             : 
    1418          19 : void FelixClient::on_register_msg_received(void* socket, netio_tag_t tag, void* data_ptr, size_t size) {
    1419          19 :     if (size > 0) {
    1420          19 :         uint8_t* data = (uint8_t*)(data_ptr);
    1421          19 :         data++;
    1422          19 :         size--;
    1423             : 
    1424             :         // handle subscriptions for felix-register
    1425          19 :         clog_info("Answer on register command");
    1426             : 
    1427          19 :         netio_tag_t ctrl_fid = get_ctrl_fid(tag);
    1428          19 :         clog_info(" for ctrl_fid 0x%x", ctrl_fid);
    1429             : 
    1430          19 :         simdjson::dom::element ops = parser.parse(simdjson::padded_string((const char*)data, size));
    1431             :         // FIXME check that this works for more than one reply
    1432          38 :         for (simdjson::dom::object op : ops) {
    1433          19 :             FelixClientThread::Reply reply;
    1434          38 :             std::string uuid = std::string(op[FelixClientThread::to_string(FelixClientThread::UUID)]);
    1435          19 :             clog_info("   uuid %s", uuid.c_str());
    1436             : 
    1437             :             // only add if we are still looking for the uuid (timeout...)
    1438          19 :             if (uuids.find(uuid) != uuids.end()) {
    1439          18 :                 reply.ctrl_fid = ctrl_fid;
    1440          36 :                 int64_t s = op[FelixClientThread::to_string(FelixClientThread::STATUS)];
    1441          18 :                 reply.status = static_cast<FelixClientThread::Status>(s);
    1442          18 :                 reply.message = std::string(op[FelixClientThread::to_string(FelixClientThread::MESSAGE)]);
    1443          36 :                 reply.value = uint64_t(op[FelixClientThread::to_string(FelixClientThread::VALUE)]);
    1444             : 
    1445          37 :                 reply_by_uuid[uuid] = reply;
    1446             :             } else {
    1447           1 :                 clog_info("   uuid not found, answer ignored", uuid.c_str());
    1448             :             }
    1449          19 :         }
    1450             :     }
    1451          19 : }
    1452             : 
    1453          19 : void FelixClient::on_register_msg_received(struct netio_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
    1454          19 :     static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
    1455          19 : }
    1456             : 
    1457           0 : void FelixClient::on_register_msg_received(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
    1458           0 :     static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
    1459           0 : }
    1460             : 
    1461        3197 : void FelixClient::on_timer(void* ptr) {
    1462             : 
    1463        3197 :     clog_debug("On timer called. Attempting to resubscribe");
    1464        3197 :     uint count = 0;
    1465        3197 :     FelixClient* client = static_cast<FelixClient*>(ptr);
    1466        3197 :     if(client->terminating_felix_client.load()){
    1467           0 :         clog_debug("Already terminating no reason to resubscribe");
    1468           0 :         return;
    1469             :     }
    1470        3197 :     std::vector<uint64_t> fids_to_resubscribe = client->contextHandler->getFidsToResubscribe();
    1471        3197 :     clog_debug("Resubscribing for %d", fids_to_resubscribe.size());
    1472        3284 :     for(const auto& fid : fids_to_resubscribe) {
    1473          87 :         if(client->contextHandler->getSocketState(fid) == DISCONNECTED){
    1474          73 :             if(fid & 0x8000){
    1475          24 :                 std::promise<uint8_t> prom_sub;
    1476          24 :                 std::future<uint8_t> future_sub_done = prom_sub.get_future();
    1477          24 :                 client->establish_send_connection(fid, std::move(prom_sub));
    1478          24 :                 if(!future_sub_done.get()) {
    1479          24 :                     count++;
    1480             :                 }
    1481          24 :             } else {
    1482          49 :                 if (!client->contextHandler->canSubscribe(fid)) {
    1483           0 :                     clog_debug("Already attempting to subscribe to 0x%x", fid);
    1484           0 :                     continue;
    1485             :                 }
    1486          49 :                 if (client->subscribe_evloop(fid)) {
    1487          49 :                     count++;
    1488             :                 }
    1489             :             }
    1490             :         }
    1491          87 :         clog_trace("Socket has state: %d", client->contextHandler->getSocketState(fid));
    1492          87 :         clog_trace("FID 0x%x is in state: %d", fid, client->contextHandler->getSubscriptionState(fid));
    1493             :     }
    1494             : 
    1495        3197 :     if (count > 0) {
    1496          49 :         clog_debug("Subscription timer: resubscribing to %d tags", count);
    1497             :     }
    1498        3197 : }
    1499             : 
    1500             : 
    1501         195 : void FelixClient::on_signal(void* ptr) { // Signal callback: runs the user function carried by the SignalData at ptr, then closes the signal and releases both allocations.
    1502         195 :     clog_debug("On signal");
    1503         195 :     struct SignalData* data = (struct SignalData*)ptr; // ptr is treated as a SignalData*; it is deleted below, so it is invalid after this call returns
    1504         195 :     UserFunction user_function = data->user_function;
    1505         195 :     (user_function)(); // user code runs before any cleanup; NOTE(review): if it throws, the close/free/delete below are skipped
    1506             : 
    1507         195 :     netio_signal_close(data->evloop, data->signal);
    1508         195 :     free(data->signal); // released with free(): presumably malloc-allocated where the signal was created -- TODO confirm
    1509         390 :     delete data; // released with delete: presumably created with new -- TODO confirm
    1510         195 : }
    1511             : 
    1512        1343 : std::string FelixClient::get_address(netio_subscribe_socket *socket) { // Returns the buffered subscribe socket's remote endpoint formatted as "<remote_hostname>:<remote_port>".
    1513        2686 :     return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port); // assumes socket and socket->remote_hostname are non-null -- TODO confirm
    1514             : }
    1515             : 
    1516        1248 : std::string FelixClient::get_address(netio_unbuffered_subscribe_socket *socket) { // Returns the unbuffered subscribe socket's remote endpoint formatted as "<remote_hostname>:<remote_port>".
    1517        2496 :     return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port); // assumes socket and socket->remote_hostname are non-null -- TODO confirm
    1518             : }
    1519             : 
    1520             : 
    1521        1093 : struct netio_subscribe_socket* FelixClient::get_buffered_subscribe_socket(uint64_t fid) { // Returns the buffered subscribe socket that contextHandler holds for fid, initializing it first when addOrCreateSocket says so.
    1522        1093 :     bool needInititialization = contextHandler->addOrCreateSocket(fid); // the return value is used as the "needs initialization" flag
    1523        1093 :     struct netio_subscribe_socket* socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
    1524        1093 :     clog_debug("get_buffered_subscribe_socket for fid 0x%x and socket 0x%p", fid, socket); // NOTE(review): fid is a uint64_t but is formatted with %x -- if clog_debug is printf-style this needs a 64-bit conversion; confirm
    1525        1093 :     if(needInititialization){
    1526         130 :         initialize_buffered_subscribe_socket(socket, fid);
    1527             :     }
    1528        1093 :     return socket;
    1529             : }
    1530             : 
    1531         130 : void FelixClient::initialize_buffered_subscribe_socket(struct netio_subscribe_socket* socket, uint64_t fid){ // Configures a buffered subscribe socket for fid: buffer attributes, TCP or libfabric init, callbacks, and the back-pointer to this client.
    1532         130 :     struct netio_buffered_socket_attr attr; // NOTE(review): not zero-initialized; only num_pages, pagesize and watermark are set here (get_buffered_send_socket also sets timeout_ms) -- confirm no other field is read by the init calls
    1533         130 :     attr.num_pages = 2 * contextHandler->getPages(fid); //Hard-coded double the number of pages for FLX-2041, todo: make a parameter later
    1534         130 :     attr.pagesize = contextHandler->getPagesize(fid);
    1535         130 :     attr.watermark = contextHandler->getWatermark(fid);
    1536         130 :     if (contextHandler->isTcp(fid)) { // the transport-specific init function is chosen per fid; both receive the same arguments
    1537          31 :         clog_debug("Setup tcp socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1538          62 :         netio_subscribe_tcp_socket_init(socket, &ctx, &attr,
    1539          62 :             netio_hostname(local_ip_address.c_str()), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1540             :     } else {
    1541          99 :         clog_debug("Setup libfabric socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1542         198 :         netio_subscribe_socket_init(socket, &ctx, &attr,
    1543         198 :             netio_hostname(local_ip_address.c_str()), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1544             :     }
    1545             : 
    1546         130 :     socket->cb_connection_closed = on_connection_closed; // callbacks are assigned after the init call above
    1547         130 :     socket->cb_connection_established = on_connection_established;
    1548         130 :     socket->cb_error_connection_refused = on_error_connection_refused;
    1549         130 :     if (contextHandler->isForRegister(fid)) { // fids flagged "for register" get a dedicated message handler
    1550          17 :         socket->cb_msg_received = on_register_msg_received;
    1551             :     } else {
    1552         113 :         socket->cb_msg_received = on_msg_received;
    1553             :     }
    1554             : 
    1555         130 :     if(buffer_callback){ // cb_buf_received is installed only when buffer_callback is set; otherwise it is left as the init call left it
    1556           2 :         socket->cb_buf_received = on_buf_received;
    1557             :     }
    1558             : 
    1559         130 :     clog_debug("Setup socket for %s", get_address(socket).c_str());
    1560             :     // store object pointer (this) in socket, for use in c-style callbacks
    1561         130 :     socket->usr = this;
    1562         130 : }
    1563             : 
    1564        1063 : struct netio_unbuffered_subscribe_socket* FelixClient::get_unbuffered_subscribe_socket(uint64_t fid) { // Returns the unbuffered subscribe socket that contextHandler holds for fid, initializing it first when addOrCreateSocket says so.
    1565             :     // Create socket if it does not exist, otherwise lookup
    1566        1063 :     bool needInititialization = contextHandler->addOrCreateSocket(fid); // the return value is used as the "needs initialization" flag
    1567             : 
    1568        1063 :     struct netio_unbuffered_subscribe_socket* socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
    1569        1063 :     clog_debug("get_unbuffered_subscribe_socket");
    1570        1063 :     if(needInititialization){
    1571         108 :         initialize_unbuffered_subscribe_socket(socket, fid);
    1572             :     }
    1573             : 
    1574             : 
    1575        1063 :     return socket;
    1576             : }
    1577         108 : void FelixClient::initialize_unbuffered_subscribe_socket(netio_unbuffered_subscribe_socket* socket, uint64_t fid){ // Configures an unbuffered subscribe socket for fid: init with bus-provided or user-provided buffer sizes, callbacks, and the back-pointer to this client.
    1578         108 :     clog_debug("Setup socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1579             : 
    1580             :     // FLX-1222 use params
    1581             :     // FLX-1222 allocate somewhere globally, per socket
    1582             :     //buffer size not specified, taken by the bus
    1583         108 :     if (unbuffered_netio_pages  == 0){ // 0 pages is treated as "no override given"; NOTE(review): unbuffered_netio_pagesize is not checked, so a pages-only override uses whatever pagesize is stored
    1584          13 :         clog_trace("Creating unbuffered subscribe socket. Buffer not specified, using bus parameters. Pages: %d pagesize: %d", contextHandler->getPages(fid), contextHandler->getPagesize(fid));
    1585          26 :         netio_unbuffered_subscribe_socket_init(socket, &ctx,
    1586          26 :         local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid), // NOTE(review): the buffered variant wraps local_ip_address in netio_hostname(); here it is passed as-is -- confirm intended
    1587          13 :         contextHandler->getPagesize(fid), contextHandler->getPages(fid)); // argument order is pagesize first, then page count
    1588             :     } else {
    1589             :         //buffer size passed as parameter
    1590          95 :         clog_trace("Creating unbuffered subscribe socket. Buffer parameter specified. Pages: %d pagesize: %d", unbuffered_netio_pages, unbuffered_netio_pagesize);
    1591          95 :         netio_unbuffered_subscribe_socket_init(socket, &ctx,
    1592         190 :         local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
    1593          95 :         unbuffered_netio_pagesize, unbuffered_netio_pages); //correct even if unbuffered_netio_pagesize/-pages are needed?
    1594             :     }
    1595             : 
    1596         108 :     socket->cb_connection_closed = on_connection_closed; // callbacks are assigned after the init call above
    1597         108 :     socket->cb_connection_established = on_connection_established;
    1598         108 :     socket->cb_error_connection_refused = on_error_connection_refused;
    1599         108 :     if (contextHandler->isForRegister(fid)) { // fids flagged "for register" get a dedicated message handler
    1600           0 :         socket->cb_msg_received = on_register_msg_received;
    1601             :     } else {
    1602         108 :         socket->cb_msg_received = on_msg_received;
    1603             :     }
    1604             : 
    1605         108 :     clog_debug("Setup socket for %s", get_address(socket).c_str());
    1606             : 
    1607         108 :     socket->usr = this; // back-pointer to this client, stored on the socket for the callbacks assigned above
    1608         108 : }
    1609             : 
    1610             : 
    1611       14235 : struct netio_buffered_send_socket* FelixClient::get_buffered_send_socket(uint64_t fid) { // Returns the buffered send socket for fid; when a new one has to be created it is configured and netio_buffered_connect is called on it.
    1612             :     // Create socket if it does not exist, otherwise lookup
    1613       14235 :     bool needInititialization = false;
    1614       14235 :     if (contextHandler->getSocket<struct netio_buffered_send_socket*>(fid) == NULL){ // addOrCreateSocket is only consulted when no socket is stored yet
    1615         172 :         needInititialization = contextHandler->addOrCreateSocket(fid); // NOTE(review): if this returns false the socket is returned below without being initialized here -- confirm callers cope
    1616             :     }
    1617             : 
    1618       14235 :     struct netio_buffered_send_socket* socket = contextHandler->getSocket<struct netio_buffered_send_socket*>(fid); // looked up again because the call above may have created it
    1619             : 
    1620       14235 :     if (needInititialization)
    1621             :     {
    1622         165 :         struct netio_buffered_socket_attr attr; // not zero-initialized; the four fields below are the only ones set
    1623         165 :         attr.num_pages = contextHandler->getPages(fid);
    1624         165 :         attr.pagesize = contextHandler->getPagesize(fid);
    1625         165 :         attr.watermark = contextHandler->getWatermark(fid);
    1626         165 :         attr.timeout_ms = 5; // hard-coded value (milliseconds, going by the field name)
    1627         165 :         if (contextHandler->isTcp(fid)) { // transport-specific init, same arguments either way
    1628           5 :             netio_buffered_send_tcp_socket_init(socket, &ctx, &attr);
    1629             :         } else {
    1630         160 :             netio_buffered_send_socket_init(socket, &ctx, &attr);
    1631             :         }
    1632         165 :         socket->cb_connection_closed = on_connection_closed;
    1633         165 :         socket->cb_connection_established = on_connection_established;
    1634         165 :         socket->cb_error_connection_refused = on_error_connection_refused;
    1635             : 
    1636             :         // store object pointer (this) in socket, for use in c-style callbacks
    1637         165 :         socket->usr = this;
    1638         330 :         netio_buffered_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid)); // connect is issued only after callbacks and usr are in place
    1639             :     }
    1640       14235 :     return socket;
    1641             : }
    1642             : 
    1643             : 
    1644         308 : struct netio_send_socket* FelixClient::get_unbuffered_send_socket(uint64_t fid) { // Returns the unbuffered send socket for fid; when a new one has to be created it is initialized, given a zeroed send buffer, connected, and the buffer is registered.
    1645             :     // Create socket if it does not exist, otherwise lookup
    1646         308 :     bool needInititialization = false;
    1647         308 :     if (contextHandler->getSocket<struct netio_send_socket*>(fid) == NULL){ // addOrCreateSocket is only consulted when no socket is stored yet
    1648         156 :         needInititialization = contextHandler->addOrCreateSocket(fid); // NOTE(review): if this returns false the socket is returned below without being initialized here -- confirm callers cope
    1649             :     }
    1650             : 
    1651         308 :     struct netio_send_socket* socket = contextHandler->getSocket<struct netio_send_socket*>(fid); // looked up again because the call above may have created it
    1652             : 
    1653         308 :     if (needInititialization)
    1654             :     {
    1655         147 :         if (contextHandler->isTcp(fid)) {
    1656             :             // FIXME is this equivalent to the non-tcp below ?
    1657          10 :             netio_init_send_tcp_socket(socket, &ctx);
    1658             :         } else {
    1659         137 :             netio_unbuffered_send_socket_init(socket, &ctx);
    1660             :         }
    1661             : 
    1662         147 :         struct netio_buffer buf;
    1663         147 :         buf.size = UNBUFFERED_MR_SIZE;
    1664         147 :         buf.data = malloc(UNBUFFERED_MR_SIZE); // NOTE(review): the result is not checked before the memset on the next line
    1665         147 :         memset(buf.data, 0, UNBUFFERED_MR_SIZE);
    1666         147 :         buf.mr = nullptr;
    1667         294 :         unbuf_mem_regions[contextHandler->getAddress(fid)] = buf; // stored per address, not per fid; NOTE(review): an existing entry for the same address would be overwritten without freeing its data -- confirm this cannot happen
    1668             : 
    1669         147 :         socket->cb_connection_closed = on_connection_closed;
    1670         147 :         socket->cb_connection_established = on_connection_established;
    1671         147 :         socket->cb_error_connection_refused = on_error_connection_refused;
    1672         147 :         socket->cb_send_completed = on_send_completed;
    1673             :         // store object pointer (this) in socket, for use in c-style callbacks
    1674         147 :         socket->usr = this;
    1675         147 :         netio_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
    1676         294 :         clog_debug("Connected. Now registering memory region: 0x%p", &unbuf_mem_regions[contextHandler->getAddress(fid)]); // netio_connect has returned here; whether the connection is already established is not visible in this function
    1677         441 :         netio_register_send_buffer(socket, &unbuf_mem_regions[contextHandler->getAddress(fid)], 0); // registers the copy stored in the map, not the local buf
    1678             :     }
    1679         308 :     return socket;
    1680             : }
    1681             : 
    1682             : 
    1683             : 
    1684         444 : void FelixClient::user_timer_init(){ // Initializes user_timer on this client's event loop and points its callback at on_user_timer.
    1685         444 :     netio_timer_init(&ctx.evloop, &user_timer);
    1686         444 :     user_timer.cb = on_user_timer;
    1687         444 :     user_timer.data = this; // handed back to on_user_timer as its void* argument
    1688         444 : }
    1689             : 
    1690          25 : void FelixClient::user_timer_start(unsigned long interval){ // Starts user_timer with the given interval (milliseconds, going by the name of the netio call).
    1691          25 :     netio_timer_start_ms(&user_timer, interval);
    1692          25 : }
    1693             : 
    1694           0 : void FelixClient::user_timer_stop(){ // Stops user_timer by forwarding to netio_timer_stop.
    1695           0 :     netio_timer_stop(&user_timer);
    1696           0 : }
    1697             : 
    1698          37 : void FelixClient::on_user_timer(void* ptr) { // Timer callback installed by user_timer_init: forwards the expiry to the client's cb_on_user_timer.
    1699          37 :     FelixClient* client = static_cast<FelixClient*>(ptr); // ptr is the FelixClient stored in user_timer.data
    1700          37 :     client->cb_on_user_timer(); // invoked unconditionally; assumes cb_on_user_timer is callable at this point -- TODO confirm
    1701          37 : }
    1702             : 
    1703             : 
    1704         266 : void FelixClient::decrement_num_send_sockets(){ // Decrements total_number_send_sockets unless it is already 0.
    1705         266 :     if(total_number_send_sockets.load() > 0){ // NOTE(review): the check and the decrement are two separate operations; concurrent callers could both pass the check -- confirm single-threaded use or switch to a compare-exchange loop
    1706         264 :         total_number_send_sockets--;
    1707             :     }
    1708         266 : }

Generated by: LCOV version 1.0