LCOV - code coverage report
Current view: top level - felix-client/src - felix_client_thread_impl.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 78 101 77.2 %
Date: 2025-06-10 03:23:28 Functions: 15 18 83.3 %

          Line data    Source code
       1             : #include <iostream>
       2             : #include <unistd.h>
       3             : #include <cstring>
       4             : #include <list>
       5             : 
       6             : #include "felix/felix_client_util.hpp"
       7             : 
       8             : #include "felix/felix_client_properties.h"
       9             : #include "felix/felix_client_exception.hpp"
      10             : #include "felix/felix_client_thread_impl.hpp"
      11             : 
      12             : // #include "clog.h"
      13             : 
      14         625 : FelixClientThreadImpl::FelixClientThreadImpl(Config& config) {
      15         625 :     if (config.property[FELIX_CLIENT_READ_ENV] == "True") {
      16           0 :         std::list<std::string> propertiesList = {
      17             :             FELIX_CLIENT_LOCAL_IP_OR_INTERFACE,
      18             :             FELIX_CLIENT_LOG_LEVEL,
      19             :             FELIX_CLIENT_BUS_DIR,
      20             :             FELIX_CLIENT_BUS_GROUP_NAME,
      21             :             FELIX_CLIENT_VERBOSE_BUS,
      22             :             FELIX_CLIENT_TIMEOUT,
      23             :             FELIX_CLIENT_NETIO_PAGES,
      24             :             FELIX_CLIENT_NETIO_PAGESIZE,
      25             :             FELIX_CLIENT_THREAD_AFFINITY,
      26           0 :         };
      27             : 
      28           0 :         std::string prefix = "FELIX_CLIENT_";
      29             : 
      30           0 :         for (const auto & property : propertiesList) {
      31           0 :             char* value = std::getenv((prefix + property).c_str());
      32           0 :             if (value != NULL) {
      33           0 :                 config.property[property] = value;
      34             :             }
      35             :         }
      36           0 :     }
      37             : 
      38        1250 :     std::string local_hostname = config.property[FELIX_CLIENT_LOCAL_IP_OR_INTERFACE];
      39        1250 :     std::string log_lvl = config.property[FELIX_CLIENT_LOG_LEVEL];
      40         625 :     unsigned level = get_log_level(log_lvl);
      41        1250 :     std::string timeout = config.property[FELIX_CLIENT_TIMEOUT];
      42         625 :     if (timeout == "") {
      43         431 :         timeout = "0";
      44             :     }
      45         625 :     try {
      46         625 :         timeoutms = std::stoul(timeout, 0, 0);
      47           0 :     } catch (std::invalid_argument const& error) {
      48           0 :         std::cout << "WARNING Timeout ignored: <" << timeout << ">\n";
      49           0 :         timeoutms = 0;
      50           0 :     }
      51             : 
      52        1250 :     std::string felix_netio_pages = config.property[FELIX_CLIENT_NETIO_PAGES];
      53         819 :     unsigned netio_pages = felix_netio_pages == "" ? 0 : std::stoul(felix_netio_pages, 0, 0);
      54             : 
      55        1250 :     std::string felix_netio_pagesize = config.property[FELIX_CLIENT_NETIO_PAGESIZE];
      56         819 :     unsigned netio_pagesize = felix_netio_pagesize == "" ? 0 : std::stoul(felix_netio_pagesize, 0, 0);
      57             : 
      58        1250 :     std::string bus_dir = config.property[FELIX_CLIENT_BUS_DIR];
      59        1246 :     bus_dir = bus_dir == "" ? "./bus" : bus_dir;
      60             : 
      61        1250 :     std::string bus_group_name = config.property[FELIX_CLIENT_BUS_GROUP_NAME];
      62        1250 :     bus_group_name = bus_group_name == "" ? "FELIX" : bus_group_name;
      63             : 
      64         625 :     client = new FelixClient(local_hostname,
      65             :                              bus_dir,
      66             :                              bus_group_name,
      67             :                              level,
      68        1250 :                              config.property[FELIX_CLIENT_VERBOSE_BUS] == "True",
      69             :                              netio_pages,
      70        3125 :                              netio_pagesize);
      71             : 
      72         625 :     if (config.on_init_callback) {
      73        1250 :         client->callback_on_init(config.on_init_callback);
      74             :     }
      75             : 
      76         625 :     if (config.on_data_callback) {
      77         380 :         client->callback_on_data(config.on_data_callback);
      78             :     }
      79             : 
      80         625 :     if (config.on_connect_callback) {
      81        1234 :         client->callback_on_connect(config.on_connect_callback);
      82             :     }
      83             : 
      84         625 :     if (config.on_disconnect_callback) {
      85        1218 :         client->callback_on_disconnect(config.on_disconnect_callback);
      86             :     }
      87             : 
      88         625 :     if (config.property[FELIX_CLIENT_THREAD_AFFINITY] != "") {
      89           0 :         client->set_thread_affinity(config.property[FELIX_CLIENT_THREAD_AFFINITY]);
      90             :     }
      91             : 
      92         625 :     thread = std::thread(&FelixClient::run, client);
      93             : 
      94        1249 :     while(!client->is_ready()) {
      95         624 :         usleep(100000);
      96             :     }
      97         625 : }
      98             : 
      99        1094 : FelixClientThreadImpl::~FelixClientThreadImpl() {
     100         547 :     client->stop();
     101         547 :     if (thread.joinable()) {
     102         547 :         thread.join();
     103             :     }
     104         547 :     delete client;
     105        1094 : }
     106             : 
     107             : 
     108         569 : void FelixClientThreadImpl::send_data(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
     109         569 :     client->send_data(fid, data,  size, flush);
     110         521 : }
     111             : 
     112             : 
     113         297 : void FelixClientThreadImpl::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
     114         297 :     client->send_data(fid, msgs, sizes);
     115         297 : }
     116             : 
     117             : 
     118          32 : void FelixClientThreadImpl::send_data_nb(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
     119          32 :     client->send_data_nb(fid, data,  size, flush);
     120          32 : }
     121             : 
     122             : 
     123           6 : void FelixClientThreadImpl::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
     124           6 :     client->send_data_nb(fid, msgs, sizes);
     125           6 : }
     126             : 
     127         540 : void FelixClientThreadImpl::subscribe(uint64_t fid) {
     128         540 :     int status = client->subscribe(fid, timeoutms);
     129         540 :     if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
     130           2 :         throw FelixClientResourceNotAvailableException();
     131             :     }
     132         538 : }
     133             : 
     134          22 : void FelixClientThreadImpl::subscribe(const std::vector<uint64_t>& fids) {
     135          22 :     int status = client->subscribe(fids, timeoutms);
     136          22 :     if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
     137           4 :         throw FelixClientResourceNotAvailableException();
     138             :     }
     139          18 : }
     140             : 
     141             : 
     142         536 : void FelixClientThreadImpl::unsubscribe(uint64_t fid) {
     143         536 :     while(client->unsubscribe(fid) == FELIX_CLIENT_STATUS_CONNECTION_BUSY)
     144             :     {
     145           0 :         usleep(100);
     146             :     }
     147         536 : }
     148             : 
     149          94 : void FelixClientThreadImpl::exec(const UserFunction &user_function) {
     150          94 :     client->exec(user_function);
     151          94 : }
     152             : 
     153          40 : void FelixClientThreadImpl::init_send_data(uint64_t fid) {
     154          40 :     client->init_send_data(fid);
     155          40 : }
     156             : 
     157         392 : void FelixClientThreadImpl::init_subscribe(uint64_t fid) {
     158         392 :     client->init_subscribe(fid);
     159         392 : }
     160             : 
     161          40 : FelixClientThreadImpl::Status FelixClientThreadImpl::send_cmd(const std::vector<uint64_t>& fids, Cmd cmd, const std::vector<std::string>& cmd_args, std::vector<Reply>& replies) {
     162          40 :     return client->send_cmd(fids, cmd, cmd_args, replies);
     163             : }
     164             : 
     165           0 : void FelixClientThreadImpl::user_timer_start(unsigned long interval) {
     166           0 :     client->user_timer_start(interval);
     167           0 : }
     168             : 
     169           0 : void FelixClientThreadImpl::user_timer_stop() {
     170           0 :     client->user_timer_stop();
     171           0 : }
     172             : 
     173           0 : void FelixClientThreadImpl::callback_on_user_timer(OnUserTimerCallback on_user_timer_cb) {
     174           0 :     client->callback_on_user_timer(on_user_timer_cb);
     175           0 : }
     176             : 
     177             : extern "C" {
     178         611 :     FelixClientThreadInterface * create_felix_client(FelixClientThreadInterface::Config& config) {
     179         611 :         return new FelixClientThreadImpl(config);
     180             :     }
     181             : }

Generated by: LCOV version 1.0