LCOV - code coverage report
Current view: top level - felix-client/src - felix_client_thread_impl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 78 101 77.2 %
Date: 2025-08-12 04:15:35 Functions: 15 18 83.3 %

          Line data    Source code
       1             : #include <iostream>
       2             : #include <unistd.h>
       3             : #include <cstring>
       4             : #include <list>
       5             : 
       6             : #include "felix/felix_client_util.hpp"
       7             : 
       8             : #include "felix/felix_client_properties.h"
       9             : #include "felix/felix_client_exception.hpp"
      10             : #include "felix/felix_client_thread_impl.hpp"
      11             : 
      12             : 
      13         384 : FelixClientThreadImpl::FelixClientThreadImpl(Config& config) {
      14         384 :     if (config.property[FELIX_CLIENT_READ_ENV] == "True") {
      15           0 :         std::list<std::string> propertiesList = {
      16             :             FELIX_CLIENT_LOCAL_IP_OR_INTERFACE,
      17             :             FELIX_CLIENT_LOG_LEVEL,
      18             :             FELIX_CLIENT_BUS_DIR,
      19             :             FELIX_CLIENT_BUS_GROUP_NAME,
      20             :             FELIX_CLIENT_VERBOSE_BUS,
      21             :             FELIX_CLIENT_TIMEOUT,
      22             :             FELIX_CLIENT_NETIO_PAGES,
      23             :             FELIX_CLIENT_NETIO_PAGESIZE,
      24             :             FELIX_CLIENT_THREAD_AFFINITY,
      25           0 :         };
      26             : 
      27           0 :         std::string prefix = "FELIX_CLIENT_";
      28             : 
      29           0 :         for (const auto & property : propertiesList) {
      30           0 :             char* value = std::getenv((prefix + property).c_str());
      31           0 :             if (value != NULL) {
      32           0 :                 config.property[property] = value;
      33             :             }
      34             :         }
      35           0 :     }
      36             : 
      37         768 :     std::string local_hostname = config.property[FELIX_CLIENT_LOCAL_IP_OR_INTERFACE];
      38         768 :     std::string log_lvl = config.property[FELIX_CLIENT_LOG_LEVEL];
      39         384 :     unsigned level = get_log_level(log_lvl);
      40         768 :     std::string timeout = config.property[FELIX_CLIENT_TIMEOUT];
      41         384 :     if (timeout == "") {
      42         231 :         timeout = "0";
      43             :     }
      44         384 :     try {
      45         384 :         timeoutms = std::stoul(timeout, 0, 0);
      46           0 :     } catch (std::invalid_argument const& error) {
      47           0 :         std::cout << "WARNING Timeout ignored: <" << timeout << ">\n";
      48           0 :         timeoutms = 0;
      49           0 :     }
      50             : 
      51         768 :     std::string felix_netio_pages = config.property[FELIX_CLIENT_NETIO_PAGES];
      52         537 :     unsigned netio_pages = felix_netio_pages == "" ? 0 : std::stoul(felix_netio_pages, 0, 0);
      53             : 
      54         768 :     std::string felix_netio_pagesize = config.property[FELIX_CLIENT_NETIO_PAGESIZE];
      55         537 :     unsigned netio_pagesize = felix_netio_pagesize == "" ? 0 : std::stoul(felix_netio_pagesize, 0, 0);
      56             : 
      57         768 :     std::string bus_dir = config.property[FELIX_CLIENT_BUS_DIR];
      58         384 :     bus_dir = bus_dir == "" ? "./bus" : bus_dir;
      59             : 
      60         768 :     std::string bus_group_name = config.property[FELIX_CLIENT_BUS_GROUP_NAME];
      61         384 :     bus_group_name = bus_group_name == "" ? "FELIX" : bus_group_name;
      62             : 
      63         384 :     client = new FelixClient(local_hostname,
      64             :                              bus_dir,
      65             :                              bus_group_name,
      66             :                              level,
      67         768 :                              config.property[FELIX_CLIENT_VERBOSE_BUS] == "True",
      68             :                              netio_pages,
      69        1152 :                              netio_pagesize);
      70             : 
      71         384 :     if (config.on_init_callback) {
      72         768 :         client->callback_on_init(config.on_init_callback);
      73             :     }
      74             : 
      75         384 :     if (config.on_data_callback) {
      76         302 :         client->callback_on_data(config.on_data_callback);
      77             :     }
      78             : 
      79         384 :     if (config.on_connect_callback) {
      80         756 :         client->callback_on_connect(config.on_connect_callback);
      81             :     }
      82             : 
      83         384 :     if (config.on_disconnect_callback) {
      84         748 :         client->callback_on_disconnect(config.on_disconnect_callback);
      85             :     }
      86             : 
      87         384 :     if (config.property[FELIX_CLIENT_THREAD_AFFINITY] != "") {
      88           0 :         client->set_thread_affinity(config.property[FELIX_CLIENT_THREAD_AFFINITY]);
      89             :     }
      90             : 
      91         384 :     thread = std::thread(&FelixClient::run, client);
      92             : 
      93         768 :     while(!client->is_ready()) {
      94         384 :         usleep(100000);
      95             :     }
      96         384 : }
      97             : 
      98         616 : FelixClientThreadImpl::~FelixClientThreadImpl() {
      99         308 :     client->stop();
     100         308 :     if (thread.joinable()) {
     101         308 :         thread.join();
     102             :     }
     103         308 :     delete client;
     104         616 : }
     105             : 
     106             : 
     107       14190 : void FelixClientThreadImpl::send_data(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
     108       14190 :     client->send_data(fid, data,  size, flush);
     109       10275 : }
     110             : 
     111             : 
     112           4 : void FelixClientThreadImpl::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
     113           4 :     client->send_data(fid, msgs, sizes);
     114           4 : }
     115             : 
     116             : 
     117          21 : void FelixClientThreadImpl::send_data_nb(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
     118          21 :     client->send_data_nb(fid, data,  size, flush);
     119          21 : }
     120             : 
     121             : 
     122           3 : void FelixClientThreadImpl::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
     123           3 :     client->send_data_nb(fid, msgs, sizes);
     124           3 : }
     125             : 
     126         509 : void FelixClientThreadImpl::subscribe(uint64_t fid) {
     127         509 :     int status = client->subscribe(fid, timeoutms);
     128         509 :     if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
     129           2 :         throw FelixClientResourceNotAvailableException();
     130             :     }
     131         507 : }
     132             : 
     133          22 : void FelixClientThreadImpl::subscribe(const std::vector<uint64_t>& fids) {
     134          22 :     int status = client->subscribe(fids, timeoutms);
     135          22 :     if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
     136           4 :         throw FelixClientResourceNotAvailableException();
     137             :     }
     138          18 : }
     139             : 
     140             : 
     141         313 : void FelixClientThreadImpl::unsubscribe(uint64_t fid) {
     142         313 :     while(client->unsubscribe(fid) == FELIX_CLIENT_STATUS_CONNECTION_BUSY)
     143             :     {
     144           0 :         usleep(100);
     145             :     }
     146         313 : }
     147             : 
     148          94 : void FelixClientThreadImpl::exec(const UserFunction &user_function) {
     149          94 :     client->exec(user_function);
     150          94 : }
     151             : 
     152          25 : void FelixClientThreadImpl::init_send_data(uint64_t fid) {
     153          25 :     client->init_send_data(fid);
     154          25 : }
     155             : 
     156         391 : void FelixClientThreadImpl::init_subscribe(uint64_t fid) {
     157         391 :     client->init_subscribe(fid);
     158         391 : }
     159             : 
     160          20 : FelixClientThreadImpl::Status FelixClientThreadImpl::send_cmd(const std::vector<uint64_t>& fids, Cmd cmd, const std::vector<std::string>& cmd_args, std::vector<Reply>& replies) {
     161          20 :     return client->send_cmd(fids, cmd, cmd_args, replies);
     162             : }
     163             : 
     164           0 : void FelixClientThreadImpl::user_timer_start(unsigned long interval) {
     165           0 :     client->user_timer_start(interval);
     166           0 : }
     167             : 
     168           0 : void FelixClientThreadImpl::user_timer_stop() {
     169           0 :     client->user_timer_stop();
     170           0 : }
     171             : 
     172           0 : void FelixClientThreadImpl::callback_on_user_timer(OnUserTimerCallback on_user_timer_cb) {
     173           0 :     client->callback_on_user_timer(on_user_timer_cb);
     174           0 : }
     175             : 
     176             : extern "C" {
     177         376 :     FelixClientThreadInterface * create_felix_client(FelixClientThreadInterface::Config& config) {
     178         376 :         return new FelixClientThreadImpl(config);
     179             :     }
     180             : }

Generated by: LCOV version 1.0