Line data Source code
1 : #include <iostream>
2 : #include <unistd.h>
3 : #include <cstring>
4 : #include <list>
5 :
6 : #include "felix/felix_client_util.hpp"
7 :
8 : #include "felix/felix_client_properties.h"
9 : #include "felix/felix_client_exception.hpp"
10 : #include "felix/felix_client_thread_impl.hpp"
11 :
12 : // #include "clog.h"
13 :
14 625 : FelixClientThreadImpl::FelixClientThreadImpl(Config& config) {
15 625 : if (config.property[FELIX_CLIENT_READ_ENV] == "True") {
16 0 : std::list<std::string> propertiesList = {
17 : FELIX_CLIENT_LOCAL_IP_OR_INTERFACE,
18 : FELIX_CLIENT_LOG_LEVEL,
19 : FELIX_CLIENT_BUS_DIR,
20 : FELIX_CLIENT_BUS_GROUP_NAME,
21 : FELIX_CLIENT_VERBOSE_BUS,
22 : FELIX_CLIENT_TIMEOUT,
23 : FELIX_CLIENT_NETIO_PAGES,
24 : FELIX_CLIENT_NETIO_PAGESIZE,
25 : FELIX_CLIENT_THREAD_AFFINITY,
26 0 : };
27 :
28 0 : std::string prefix = "FELIX_CLIENT_";
29 :
30 0 : for (const auto & property : propertiesList) {
31 0 : char* value = std::getenv((prefix + property).c_str());
32 0 : if (value != NULL) {
33 0 : config.property[property] = value;
34 : }
35 : }
36 0 : }
37 :
38 1250 : std::string local_hostname = config.property[FELIX_CLIENT_LOCAL_IP_OR_INTERFACE];
39 1250 : std::string log_lvl = config.property[FELIX_CLIENT_LOG_LEVEL];
40 625 : unsigned level = get_log_level(log_lvl);
41 1250 : std::string timeout = config.property[FELIX_CLIENT_TIMEOUT];
42 625 : if (timeout == "") {
43 431 : timeout = "0";
44 : }
45 625 : try {
46 625 : timeoutms = std::stoul(timeout, 0, 0);
47 0 : } catch (std::invalid_argument const& error) {
48 0 : std::cout << "WARNING Timeout ignored: <" << timeout << ">\n";
49 0 : timeoutms = 0;
50 0 : }
51 :
52 1250 : std::string felix_netio_pages = config.property[FELIX_CLIENT_NETIO_PAGES];
53 819 : unsigned netio_pages = felix_netio_pages == "" ? 0 : std::stoul(felix_netio_pages, 0, 0);
54 :
55 1250 : std::string felix_netio_pagesize = config.property[FELIX_CLIENT_NETIO_PAGESIZE];
56 819 : unsigned netio_pagesize = felix_netio_pagesize == "" ? 0 : std::stoul(felix_netio_pagesize, 0, 0);
57 :
58 1250 : std::string bus_dir = config.property[FELIX_CLIENT_BUS_DIR];
59 1246 : bus_dir = bus_dir == "" ? "./bus" : bus_dir;
60 :
61 1250 : std::string bus_group_name = config.property[FELIX_CLIENT_BUS_GROUP_NAME];
62 1250 : bus_group_name = bus_group_name == "" ? "FELIX" : bus_group_name;
63 :
64 625 : client = new FelixClient(local_hostname,
65 : bus_dir,
66 : bus_group_name,
67 : level,
68 1250 : config.property[FELIX_CLIENT_VERBOSE_BUS] == "True",
69 : netio_pages,
70 3125 : netio_pagesize);
71 :
72 625 : if (config.on_init_callback) {
73 1250 : client->callback_on_init(config.on_init_callback);
74 : }
75 :
76 625 : if (config.on_data_callback) {
77 380 : client->callback_on_data(config.on_data_callback);
78 : }
79 :
80 625 : if (config.on_connect_callback) {
81 1234 : client->callback_on_connect(config.on_connect_callback);
82 : }
83 :
84 625 : if (config.on_disconnect_callback) {
85 1218 : client->callback_on_disconnect(config.on_disconnect_callback);
86 : }
87 :
88 625 : if (config.property[FELIX_CLIENT_THREAD_AFFINITY] != "") {
89 0 : client->set_thread_affinity(config.property[FELIX_CLIENT_THREAD_AFFINITY]);
90 : }
91 :
92 625 : thread = std::thread(&FelixClient::run, client);
93 :
94 1249 : while(!client->is_ready()) {
95 624 : usleep(100000);
96 : }
97 625 : }
98 :
99 1094 : FelixClientThreadImpl::~FelixClientThreadImpl() {
100 547 : client->stop();
101 547 : if (thread.joinable()) {
102 547 : thread.join();
103 : }
104 547 : delete client;
105 1094 : }
106 :
107 :
108 569 : void FelixClientThreadImpl::send_data(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
109 569 : client->send_data(fid, data, size, flush);
110 521 : }
111 :
112 :
113 297 : void FelixClientThreadImpl::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
114 297 : client->send_data(fid, msgs, sizes);
115 297 : }
116 :
117 :
118 32 : void FelixClientThreadImpl::send_data_nb(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
119 32 : client->send_data_nb(fid, data, size, flush);
120 32 : }
121 :
122 :
123 6 : void FelixClientThreadImpl::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
124 6 : client->send_data_nb(fid, msgs, sizes);
125 6 : }
126 :
127 540 : void FelixClientThreadImpl::subscribe(uint64_t fid) {
128 540 : int status = client->subscribe(fid, timeoutms);
129 540 : if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
130 2 : throw FelixClientResourceNotAvailableException();
131 : }
132 538 : }
133 :
134 22 : void FelixClientThreadImpl::subscribe(const std::vector<uint64_t>& fids) {
135 22 : int status = client->subscribe(fids, timeoutms);
136 22 : if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
137 4 : throw FelixClientResourceNotAvailableException();
138 : }
139 18 : }
140 :
141 :
142 536 : void FelixClientThreadImpl::unsubscribe(uint64_t fid) {
143 536 : while(client->unsubscribe(fid) == FELIX_CLIENT_STATUS_CONNECTION_BUSY)
144 : {
145 0 : usleep(100);
146 : }
147 536 : }
148 :
149 94 : void FelixClientThreadImpl::exec(const UserFunction &user_function) {
150 94 : client->exec(user_function);
151 94 : }
152 :
153 40 : void FelixClientThreadImpl::init_send_data(uint64_t fid) {
154 40 : client->init_send_data(fid);
155 40 : }
156 :
157 392 : void FelixClientThreadImpl::init_subscribe(uint64_t fid) {
158 392 : client->init_subscribe(fid);
159 392 : }
160 :
161 40 : FelixClientThreadImpl::Status FelixClientThreadImpl::send_cmd(const std::vector<uint64_t>& fids, Cmd cmd, const std::vector<std::string>& cmd_args, std::vector<Reply>& replies) {
162 40 : return client->send_cmd(fids, cmd, cmd_args, replies);
163 : }
164 :
165 0 : void FelixClientThreadImpl::user_timer_start(unsigned long interval) {
166 0 : client->user_timer_start(interval);
167 0 : }
168 :
169 0 : void FelixClientThreadImpl::user_timer_stop() {
170 0 : client->user_timer_stop();
171 0 : }
172 :
173 0 : void FelixClientThreadImpl::callback_on_user_timer(OnUserTimerCallback on_user_timer_cb) {
174 0 : client->callback_on_user_timer(on_user_timer_cb);
175 0 : }
176 :
177 : extern "C" {
178 611 : FelixClientThreadInterface * create_felix_client(FelixClientThreadInterface::Config& config) {
179 611 : return new FelixClientThreadImpl(config);
180 : }
181 : }
|