Line data Source code
1 : #include <iostream>
2 : #include <unistd.h>
3 : #include <cstring>
4 : #include <list>
5 :
6 : #include "felix/felix_client_util.hpp"
7 :
8 : #include "felix/felix_client_properties.h"
9 : #include "felix/felix_client_exception.hpp"
10 : #include "felix/felix_client_thread_impl.hpp"
11 :
12 :
13 384 : FelixClientThreadImpl::FelixClientThreadImpl(Config& config) {
14 384 : if (config.property[FELIX_CLIENT_READ_ENV] == "True") {
15 0 : std::list<std::string> propertiesList = {
16 : FELIX_CLIENT_LOCAL_IP_OR_INTERFACE,
17 : FELIX_CLIENT_LOG_LEVEL,
18 : FELIX_CLIENT_BUS_DIR,
19 : FELIX_CLIENT_BUS_GROUP_NAME,
20 : FELIX_CLIENT_VERBOSE_BUS,
21 : FELIX_CLIENT_TIMEOUT,
22 : FELIX_CLIENT_NETIO_PAGES,
23 : FELIX_CLIENT_NETIO_PAGESIZE,
24 : FELIX_CLIENT_THREAD_AFFINITY,
25 0 : };
26 :
27 0 : std::string prefix = "FELIX_CLIENT_";
28 :
29 0 : for (const auto & property : propertiesList) {
30 0 : char* value = std::getenv((prefix + property).c_str());
31 0 : if (value != NULL) {
32 0 : config.property[property] = value;
33 : }
34 : }
35 0 : }
36 :
37 768 : std::string local_hostname = config.property[FELIX_CLIENT_LOCAL_IP_OR_INTERFACE];
38 768 : std::string log_lvl = config.property[FELIX_CLIENT_LOG_LEVEL];
39 384 : unsigned level = get_log_level(log_lvl);
40 768 : std::string timeout = config.property[FELIX_CLIENT_TIMEOUT];
41 384 : if (timeout == "") {
42 231 : timeout = "0";
43 : }
44 384 : try {
45 384 : timeoutms = std::stoul(timeout, 0, 0);
46 0 : } catch (std::invalid_argument const& error) {
47 0 : std::cout << "WARNING Timeout ignored: <" << timeout << ">\n";
48 0 : timeoutms = 0;
49 0 : }
50 :
51 768 : std::string felix_netio_pages = config.property[FELIX_CLIENT_NETIO_PAGES];
52 537 : unsigned netio_pages = felix_netio_pages == "" ? 0 : std::stoul(felix_netio_pages, 0, 0);
53 :
54 768 : std::string felix_netio_pagesize = config.property[FELIX_CLIENT_NETIO_PAGESIZE];
55 537 : unsigned netio_pagesize = felix_netio_pagesize == "" ? 0 : std::stoul(felix_netio_pagesize, 0, 0);
56 :
57 768 : std::string bus_dir = config.property[FELIX_CLIENT_BUS_DIR];
58 384 : bus_dir = bus_dir == "" ? "./bus" : bus_dir;
59 :
60 768 : std::string bus_group_name = config.property[FELIX_CLIENT_BUS_GROUP_NAME];
61 384 : bus_group_name = bus_group_name == "" ? "FELIX" : bus_group_name;
62 :
63 384 : client = new FelixClient(local_hostname,
64 : bus_dir,
65 : bus_group_name,
66 : level,
67 768 : config.property[FELIX_CLIENT_VERBOSE_BUS] == "True",
68 : netio_pages,
69 1152 : netio_pagesize);
70 :
71 384 : if (config.on_init_callback) {
72 768 : client->callback_on_init(config.on_init_callback);
73 : }
74 :
75 384 : if (config.on_data_callback) {
76 302 : client->callback_on_data(config.on_data_callback);
77 : }
78 :
79 384 : if (config.on_connect_callback) {
80 756 : client->callback_on_connect(config.on_connect_callback);
81 : }
82 :
83 384 : if (config.on_disconnect_callback) {
84 748 : client->callback_on_disconnect(config.on_disconnect_callback);
85 : }
86 :
87 384 : if (config.property[FELIX_CLIENT_THREAD_AFFINITY] != "") {
88 0 : client->set_thread_affinity(config.property[FELIX_CLIENT_THREAD_AFFINITY]);
89 : }
90 :
91 384 : thread = std::thread(&FelixClient::run, client);
92 :
93 768 : while(!client->is_ready()) {
94 384 : usleep(100000);
95 : }
96 384 : }
97 :
98 616 : FelixClientThreadImpl::~FelixClientThreadImpl() {
99 308 : client->stop();
100 308 : if (thread.joinable()) {
101 308 : thread.join();
102 : }
103 308 : delete client;
104 616 : }
105 :
106 :
107 14190 : void FelixClientThreadImpl::send_data(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
108 14190 : client->send_data(fid, data, size, flush);
109 10275 : }
110 :
111 :
112 4 : void FelixClientThreadImpl::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
113 4 : client->send_data(fid, msgs, sizes);
114 4 : }
115 :
116 :
117 21 : void FelixClientThreadImpl::send_data_nb(uint64_t fid, const uint8_t* data, size_t size, bool flush) {
118 21 : client->send_data_nb(fid, data, size, flush);
119 21 : }
120 :
121 :
122 3 : void FelixClientThreadImpl::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
123 3 : client->send_data_nb(fid, msgs, sizes);
124 3 : }
125 :
126 509 : void FelixClientThreadImpl::subscribe(uint64_t fid) {
127 509 : int status = client->subscribe(fid, timeoutms);
128 509 : if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
129 2 : throw FelixClientResourceNotAvailableException();
130 : }
131 507 : }
132 :
133 22 : void FelixClientThreadImpl::subscribe(const std::vector<uint64_t>& fids) {
134 22 : int status = client->subscribe(fids, timeoutms);
135 22 : if (status == FELIX_CLIENT_STATUS_TIMEOUT) {
136 4 : throw FelixClientResourceNotAvailableException();
137 : }
138 18 : }
139 :
140 :
141 313 : void FelixClientThreadImpl::unsubscribe(uint64_t fid) {
142 313 : while(client->unsubscribe(fid) == FELIX_CLIENT_STATUS_CONNECTION_BUSY)
143 : {
144 0 : usleep(100);
145 : }
146 313 : }
147 :
148 94 : void FelixClientThreadImpl::exec(const UserFunction &user_function) {
149 94 : client->exec(user_function);
150 94 : }
151 :
152 25 : void FelixClientThreadImpl::init_send_data(uint64_t fid) {
153 25 : client->init_send_data(fid);
154 25 : }
155 :
156 391 : void FelixClientThreadImpl::init_subscribe(uint64_t fid) {
157 391 : client->init_subscribe(fid);
158 391 : }
159 :
160 20 : FelixClientThreadImpl::Status FelixClientThreadImpl::send_cmd(const std::vector<uint64_t>& fids, Cmd cmd, const std::vector<std::string>& cmd_args, std::vector<Reply>& replies) {
161 20 : return client->send_cmd(fids, cmd, cmd_args, replies);
162 : }
163 :
164 0 : void FelixClientThreadImpl::user_timer_start(unsigned long interval) {
165 0 : client->user_timer_start(interval);
166 0 : }
167 :
168 0 : void FelixClientThreadImpl::user_timer_stop() {
169 0 : client->user_timer_stop();
170 0 : }
171 :
172 0 : void FelixClientThreadImpl::callback_on_user_timer(OnUserTimerCallback on_user_timer_cb) {
173 0 : client->callback_on_user_timer(on_user_timer_cb);
174 0 : }
175 :
176 : extern "C" {
177 376 : FelixClientThreadInterface * create_felix_client(FelixClientThreadInterface::Config& config) {
178 376 : return new FelixClientThreadImpl(config);
179 : }
180 : }
|