Line data Source code
#include "felix/felix_client.hpp"
#include "felix/felix_client_exception.hpp"
#include "felix/felix_client_util.hpp"
#include "felix/felix_client_thread.hpp"

#include "felix/felix_fid.h"

#include "clog.h"

#include "netio/netio.h"
#include "netio/netio_tcp.h"

extern "C" {
#include "jWrite.h"
}

#include <uuid/uuid.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <numeric>
#include <regex>
#include <set>
#include <string>
#include <unordered_set>
#include <vector>

24 :
25 419 : FelixClient::FelixClient(const std::string& local_ip_or_interface,
26 : std::string bus_dir,
27 : std::string bus_group_name,
28 : unsigned log_level,
29 : bool verbose_bus,
30 : unsigned netio_pages,
31 419 : unsigned netio_pagesize) :
32 419 : ready(false),
33 419 : timer_delay_millis(0),
34 419 : ctx({0}),
35 419 : user_timer({0}),
36 419 : unbuffered_netio_pages(netio_pages),
37 419 : unbuffered_netio_pagesize(netio_pagesize),
38 838 : local_ip_address(get_ip_from_interface(local_ip_or_interface)) {
39 :
40 : // // handle here to make sure trace logging looks ok
41 : // local_ip_address = get_ip_from_interface(local_ip_or_interface);
42 :
43 419 : if (local_ip_address == "") {
44 0 : clog_fatal("Could not deduce the local ip address from %s", local_ip_or_interface.c_str());
45 0 : throw FelixClientException("felix-client: Could not deduce the local ip address from local_ip_or_interface");
46 : }
47 :
48 419 : clog_info("Local IP: %s", local_ip_address.c_str());
49 :
50 419 : clog_info("Initializing netio-next");
51 419 : netio_init(&ctx);
52 :
53 419 : clog_set_level(log_level);
54 419 : netio_set_debug_level(log_level);
55 :
56 : // init callback
57 419 : ctx.evloop.cb_init = on_init_eventloop;
58 419 : ctx.evloop.data = this;
59 :
60 419 : contextHandler = std::make_unique<FelixClientContextHandler>();
61 :
62 :
63 419 : send_signal = netio_signal();//(struct netio_signal*)malloc(sizeof(struct netio_signal)); // c style
64 :
65 419 : netio_signal_init(&ctx.evloop, &send_signal);
66 419 : send_signal.data = this;
67 419 : send_signal.cb = on_send_signal;
68 :
69 :
70 419 : sub_signal = netio_signal();
71 419 : netio_signal_init(&ctx.evloop, &sub_signal);
72 419 : sub_signal.data = this;
73 419 : sub_signal.cb = on_sub_signal;
74 :
75 : // setup timer for reconnection/resubscription
76 419 : netio_timer_init(&ctx.evloop, &subscribe_timer);
77 419 : subscribe_timer.cb = on_timer;
78 419 : subscribe_timer.data = this;
79 419 : terminating_felix_client = false;
80 419 : contextHandler->terminatingFelixClient = false;
81 :
82 419 : user_timer_init();
83 :
84 419 : clog_info("Initializing felix-bus");
85 838 : bus.set_path(bus_dir);
86 :
87 419 : bus.set_groupname(bus_group_name);
88 419 : bus.set_verbose(verbose_bus);
89 419 : }
90 :
91 0 : std::vector<int> FelixClient::parseCpuRange(const std::string &range) {
92 0 : static const std::regex re(R"(((\d+)-(\d+))|(\d+))");
93 0 : std::smatch sm;
94 0 : std::vector<int> data;
95 0 : for (std::string s(range); std::regex_search(s, sm, re); s = sm.suffix()) {
96 0 : if (sm[1].str().empty()) {
97 0 : data.push_back(std::stoi(sm[0]));
98 : } else {
99 0 : for (int i = std::stoi(sm[2]); i <= std::stoi(sm[3]); ++i) {
100 0 : data.push_back(i);
101 : }
102 : }
103 0 : }
104 0 : return data;
105 0 : }
106 :
107 0 : void FelixClient::set_thread_affinity(const std::string& affinity) {
108 0 : this->affinity = affinity;
109 0 : }
110 :
111 308 : FelixClient::~FelixClient(){
112 308 : clog_info("Cleaning up FelixClient.");
113 431 : for(auto& it : unbuf_mem_regions){
114 123 : clog_debug("Removing mem reg: 0x%x. There are %d mem regions remaining.", it.second.mr, unbuf_mem_regions.size());
115 123 : it.second.size = 0;
116 123 : free(it.second.data);
117 : }
118 308 : }
119 :
120 : // event loop control
121 419 : void FelixClient::run() {
122 419 : if (affinity != "") {
123 0 : std::vector<int> cpus = parseCpuRange(affinity);
124 0 : if (!cpus.empty()) {
125 0 : cpu_set_t pmask;
126 0 : CPU_ZERO(&pmask);
127 0 : for (int cpu : cpus) {
128 0 : CPU_SET(cpu, &pmask);
129 : }
130 0 : int s = sched_setaffinity(0, sizeof(pmask), &pmask);
131 0 : if (s != 0) {
132 0 : clog_error("Setting affinity to CPU rage [%s] failed with error = %d", affinity.c_str(), s);
133 : }
134 : }
135 0 : }
136 419 : pthread_setname_np(pthread_self(), "felix-client-thread");
137 :
138 419 : evloop_thread_id = std::this_thread::get_id();
139 419 : clog_info("In run: 0x%x", evloop_thread_id);
140 419 : clog_info("Running netio_run");
141 419 : netio_run(&ctx.evloop);
142 318 : clog_info("Finished netio_run");
143 318 : }
144 :
145 423 : void FelixClient::stop() {
146 423 : clog_info("Stopping netio");
147 423 : terminating_felix_client = true;
148 423 : contextHandler->terminatingFelixClient = true;
149 423 : netio_timer_close(&ctx.evloop, &subscribe_timer);
150 423 : netio_timer_close(&ctx.evloop, &user_timer);
151 :
152 : //unsubscribing all elinks
153 423 : std::vector<uint64_t> fids_to_unsubscribe = contextHandler->getFidsToUnsubscribe();
154 423 : clog_debug("Closing %d subscriptions", fids_to_unsubscribe.size());
155 2323 : for(auto fid : fids_to_unsubscribe){
156 1900 : if(evloop_thread_id == std::this_thread::get_id()){
157 35 : send_unsubscribe_msg(fid);
158 : } else {
159 1865 : unsubscribe(fid);
160 : }
161 : }
162 :
163 : uint timeout = 1000;
164 : uint t_expired = 0;
165 : uint span = 25U;
166 :
167 :
168 6170 : while (!contextHandler->areAllFidsUnsubscribed() && (t_expired <= timeout)){
169 5747 : std::this_thread::sleep_for(std::chrono::milliseconds(span));
170 5747 : t_expired += span;
171 :
172 : }
173 423 : if (timeout <= t_expired){
174 139 : clog_error("Timeout during stopping netio, not all subscriptions have been closed. %d subscriptions remain open.", contextHandler->getFidsToUnsubscribe().size());
175 : }
176 :
177 :
178 423 : std::vector<netio_send_socket*> send_sockets = contextHandler->getSendSockets();
179 423 : total_number_send_sockets.store(send_sockets.size());
180 423 : clog_debug("Closing netio %d send sockets.", total_number_send_sockets.load());
181 687 : for(auto& socket : send_sockets){
182 264 : netio_disconnect(socket);
183 : }
184 :
185 : t_expired = 0;
186 685 : while ((total_number_send_sockets.load(std::memory_order_relaxed) != 0) && (t_expired <= timeout)){
187 262 : std::this_thread::sleep_for(std::chrono::milliseconds(span));
188 262 : t_expired += span;
189 : }
190 423 : if (timeout <= t_expired){
191 0 : clog_error("Timeout during stopping netio, not all send sockets have been disconnected, %d remain open.", total_number_send_sockets.load());
192 : }
193 423 : netio_signal_close(&ctx.evloop, &send_signal);
194 423 : netio_signal_close(&ctx.evloop, &sub_signal);
195 423 : netio_terminate_signal(&ctx.evloop);
196 423 : }
197 :
198 14207 : int FelixClient::send_evloop(netio_tag_t fid, ToFlxHeader header, const uint8_t* data, size_t size, bool flush, std::promise<uint8_t> prom_send_done){
199 14207 : if(contextHandler->getSocketState(fid) == DISCONNECTED){
200 0 : clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
201 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
202 0 : prom_send_done.set_exception(e);
203 0 : return 1;
204 0 : }
205 :
206 14207 : int status = NETIO_STATUS_OK;
207 :
208 14207 : if (contextHandler->getType(fid) == BUFFERED) {
209 14059 : netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
210 14059 : if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
211 14059 : size_t len = sizeof(header) + size;
212 14059 : uint8_t* buf = (uint8_t*)malloc(len);
213 14059 : memcpy(buf, &header, sizeof(header));
214 14059 : memcpy(buf + sizeof(header), data, size);
215 14059 : clog_debug("Sending (buffered) for 0x%x, i.e. tag %d, socket 0x%p", fid, header.elink, buffered_socket);
216 14059 : status = netio_buffered_send(buffered_socket, (void*)buf, len);
217 :
218 14059 : clog_debug("Status %d", status);
219 14059 : free(buf);
220 :
221 14059 : if (flush) {
222 14059 : netio_buffered_flush(buffered_socket);
223 : }
224 :
225 : } else {
226 :
227 148 : netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
228 148 : std::string addr = contextHandler->getAddress(fid);
229 148 : clog_debug("Begin sending process. Buffer ptr: 0x%p", unbuf_mem_regions[addr].data);
230 148 : memcpy(unbuf_mem_regions[addr].data, &header, sizeof(header));
231 148 : memcpy((uint8_t*)unbuf_mem_regions[addr].data + sizeof(header), data, size);
232 148 : clog_debug("Memcopy done. Total size: %d (header: %d + size: %d)", sizeof(header) + size, sizeof(header), size);
233 148 : if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
234 148 : status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, sizeof(header) + size, (uint64_t)&unbuf_mem_regions[addr]);
235 148 : }
236 14207 : prom_send_done.set_value(status);
237 14207 : return status;
238 : }
239 :
240 :
241 :
242 :
243 :
244 :
245 :
246 7 : int FelixClient::send_evloop(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes, std::promise<uint8_t> prom_send_done){
247 :
248 7 : if(contextHandler->getSocketState(fid) == DISCONNECTED){
249 0 : clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
250 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
251 0 : prom_send_done.set_exception(e);
252 0 : return 1;
253 0 : }
254 :
255 7 : int status = NETIO_STATUS_OK;
256 :
257 7 : if (contextHandler->getType(fid) == BUFFERED) {
258 3 : netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
259 3 : clog_debug("Socket = 0x%x with state: %d", buffered_socket, contextHandler->getSocketState(fid));
260 3 : if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
261 : //Adding up messages
262 3 : ToFlxHeader header;
263 6 : size_t totalSize = accumulate(sizes.begin(), sizes.end(), 0) + sizeof(header) * msgs.size();
264 3 : uint8_t* buf = (uint8_t*)malloc(totalSize);
265 3 : size_t currentSize = 0;
266 :
267 12 : for(unsigned i = 0; i < msgs.size(); i++){
268 :
269 9 : header.length = sizes[i];
270 9 : header.reserved = 0;
271 9 : header.elink = get_elink(fid);
272 :
273 9 : memcpy(buf + currentSize, &header, sizeof(header));
274 9 : memcpy(buf + currentSize + sizeof(header), msgs[i], sizes[i]);
275 :
276 9 : currentSize += sizeof(header) + sizes[i];
277 : }
278 :
279 3 : clog_debug("Sending (buffered) for 0x%x", fid);
280 3 : clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
281 3 : {
282 3 : status = netio_buffered_send(buffered_socket, (void*)buf, totalSize);
283 3 : clog_debug("Status %d", status);
284 3 : free(buf);
285 3 : netio_buffered_flush(buffered_socket);
286 : }
287 : } else {
288 : // Unbuffered
289 4 : std::string addr = contextHandler->getAddress(fid);
290 4 : netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
291 4 : if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
292 4 : ToFlxHeader header;
293 8 : size_t totalSize = accumulate(sizes.begin(), sizes.end(), 0) + sizeof(header) * msgs.size();
294 4 : size_t currentSize = 0;
295 :
296 16 : for(unsigned i = 0; i < msgs.size(); i++){
297 :
298 12 : header.length = sizes[i];
299 12 : header.reserved = 0;
300 12 : header.elink = get_elink(fid);
301 :
302 12 : memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize, &header, sizeof(header));
303 12 : memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize + sizeof(header), msgs[i], sizes[i]);
304 :
305 12 : currentSize += sizeof(header) + sizes[i];
306 : }
307 :
308 4 : clog_debug("Sending (unbuffered) for 0x%x", fid);
309 4 : clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
310 :
311 4 : {
312 4 : status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, totalSize, (uint64_t)&unbuf_mem_regions[addr]);
313 : }
314 :
315 4 : }
316 7 : prom_send_done.set_value(status);
317 7 : return status;
318 :
319 : }
320 :
321 :
322 :
323 :
324 :
325 :
326 :
327 329 : void FelixClient::establish_send_connection(uint64_t fid, std::promise<uint8_t> prom_sub_done){
328 329 : if (contextHandler->getType(fid) == BUFFERED) {
329 173 : netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
330 173 : clog_debug("Buffered socket = 0x%p with state: %d", buffered_socket, contextHandler->getSocketState(fid));
331 173 : if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
332 173 : prom_sub_done.set_value(0);
333 173 : return;
334 : } else {
335 0 : clog_error("Buffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x state is %d", std::this_thread::get_id(), fid, contextHandler->getSocketState(fid));
336 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
337 0 : prom_sub_done.set_exception(e);
338 0 : return;
339 0 : }
340 : } else {
341 156 : netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
342 156 : clog_debug("Unbuffered socket = 0x%x with state: %d", unbuffered_socket, contextHandler->getSocketState(fid));
343 156 : if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
344 156 : prom_sub_done.set_value(0);
345 156 : return;
346 : } else {
347 0 : clog_error("Unbuffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
348 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
349 0 : prom_sub_done.set_exception(e);
350 0 : return;
351 0 : }
352 : }
353 :
354 : }
355 :
356 :
357 :
358 14211 : void FelixClient::send_data(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
359 14211 : if(terminating_felix_client.load()){
360 0 : clog_debug("Already terminating, should not send data");
361 0 : return;
362 : }
363 :
364 14211 : try{
365 14211 : contextHandler->createOrUpdateInfo(fid, &bus, false);
366 1 : } catch (std::exception& e) {
367 1 : clog_error("Problem while reading bus: %s", e.what());
368 1 : throw FelixClientSendConnectionException();
369 1 : }
370 :
371 14210 : clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
372 :
373 14210 : if (!contextHandler->exists(fid)) {
374 0 : clog_error("Invalid Bus entry");
375 0 : throw FelixClientSendConnectionException();
376 : }
377 :
378 14210 : if(evloop_thread_id == std::this_thread::get_id()){
379 0 : send_data_intern(fid, data, size, flush);
380 0 : return;
381 : }
382 :
383 : //If there is no connection, we have to establish a new one through the evloop
384 14210 : if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
385 276 : std::promise<uint8_t> prom_sub_done;
386 276 : std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
387 276 : contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
388 276 : netio_signal_fire(&send_signal);
389 276 : try{
390 276 : std::chrono::milliseconds timeout (5000);
391 276 : if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
392 0 : clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
393 0 : throw FelixClientResourceNotAvailableException();
394 : }
395 276 : future_sub_done.get();
396 0 : } catch(std::exception& e){
397 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
398 0 : throw;
399 0 : }
400 276 : }
401 :
402 14210 : if(contextHandler->getSocketState(fid) == CONNECTING){
403 270 : if(!contextHandler->waitConnected(fid, 2000)){
404 24 : clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
405 24 : throw FelixClientSendConnectionException();
406 : }
407 : }
408 :
409 14186 : ToFlxHeader header;
410 14186 : header.length = size;
411 14186 : header.reserved = 0;
412 14186 : header.elink = get_elink(fid);
413 14186 : int status = NETIO_STATUS_OK;
414 :
415 : //send
416 14186 : std::promise<uint8_t> prom_send_done;
417 14186 : std::future<uint8_t> future_send_done = prom_send_done.get_future();
418 :
419 18077 : contextHandler->pushSendTask(SendDataEvent{fid, header, data, size, flush, std::move(prom_send_done)});
420 14186 : netio_signal_fire(&send_signal);
421 :
422 : // wait for promise
423 :
424 14186 : try{
425 14186 : std::chrono::milliseconds timeout (5000);
426 14186 : if (future_send_done.wait_for(timeout)==std::future_status::timeout){
427 0 : clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
428 0 : throw FelixClientException("Send timeout expired");
429 : }
430 14186 : status = future_send_done.get();
431 0 : } catch(std::exception& e){
432 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
433 0 : throw;
434 0 : }
435 :
436 14186 : switch(status) {
437 10295 : case NETIO_STATUS_OK: return;
438 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
439 3891 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
440 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
441 0 : default: throw FelixClientException("Unknown status code: " + status);
442 : }
443 14186 : }
444 :
445 4 : void FelixClient::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
446 4 : if(terminating_felix_client.load()){
447 0 : clog_debug("Already terminating, should not send data");
448 0 : return;
449 : }
450 :
451 4 : try{
452 4 : contextHandler->createOrUpdateInfo(fid, &bus, false);
453 0 : } catch (std::exception& e) {
454 0 : clog_error("Problem while reading bus: %s", e.what());
455 0 : throw FelixClientSendConnectionException();
456 0 : }
457 :
458 4 : if (!contextHandler->exists(fid)) {
459 0 : clog_error("Invalid Bus entry ");
460 0 : throw FelixClientSendConnectionException();
461 : }
462 :
463 4 : clog_info("Address %s", contextHandler->getAddress(fid).c_str());
464 4 : if (msgs.size() != sizes.size()){
465 0 : FelixClientException("Error! Number of messages differs from number of sizes");
466 : }
467 :
468 4 : if(evloop_thread_id == std::this_thread::get_id()){
469 0 : send_data_intern(fid, msgs, sizes);
470 0 : return;
471 : }
472 :
473 4 : if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTIONG?
474 4 : std::promise<uint8_t> prom_sub_done;
475 4 : std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
476 4 : contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
477 4 : netio_signal_fire(&send_signal);
478 4 : try{
479 4 : std::chrono::milliseconds timeout (5000);
480 4 : if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
481 0 : clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
482 0 : throw FelixClientResourceNotAvailableException();
483 : }
484 4 : future_sub_done.get();
485 0 : } catch(std::exception& e){
486 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
487 0 : throw;
488 0 : }
489 4 : }
490 :
491 4 : if(contextHandler->getSocketState(fid) == CONNECTING){
492 4 : if(!contextHandler->waitConnected(fid, 2000)){
493 0 : clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
494 0 : throw FelixClientSendConnectionException();
495 : }
496 : }
497 :
498 4 : std::promise<uint8_t> prom_send_done;
499 4 : std::future<uint8_t> future_send_done = prom_send_done.get_future();
500 4 : contextHandler->pushSendTask(SendDataVectorEvent{fid, msgs, sizes, std::move(prom_send_done)});
501 4 : netio_signal_fire(&send_signal);
502 :
503 4 : int status = NETIO_STATUS_OK;
504 : // wait for promise
505 :
506 4 : try{
507 4 : std::chrono::milliseconds timeout (5000);
508 4 : if (future_send_done.wait_for(timeout)==std::future_status::timeout){
509 0 : clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
510 0 : throw FelixClientException("Send timeout expired");
511 : }
512 4 : status = future_send_done.get();
513 0 : } catch(std::exception& e){
514 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
515 0 : throw;
516 0 : }
517 :
518 4 : switch(status) {
519 4 : case NETIO_STATUS_OK: return;
520 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
521 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
522 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
523 0 : default: throw FelixClientException("Unknown status code: " + status);
524 : }
525 4 : }
526 :
527 :
528 21 : void FelixClient::send_data_nb(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
529 21 : if(terminating_felix_client.load()){
530 0 : clog_debug("Already terminating, should not send data");
531 0 : return;
532 : }
533 :
534 21 : if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
535 0 : clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
536 0 : throw FelixClientSendConnectionException();
537 : }
538 :
539 21 : std::vector<uint8_t> data_copy(data, data+size);
540 :
541 21 : ToFlxHeader header;
542 21 : header.length = size;
543 21 : header.reserved = 0;
544 21 : header.elink = get_elink(fid);
545 :
546 : //send
547 21 : std::promise<uint8_t> prom_send_done;
548 :
549 21 : contextHandler->pushSendTask(SendDataNbEvent{fid, header, std::move(data_copy), size, flush, std::move(prom_send_done)});
550 21 : netio_signal_fire(&send_signal);
551 21 : }
552 :
553 3 : void FelixClient::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
554 3 : if(terminating_felix_client.load()){
555 0 : clog_debug("Already terminating, should not send data");
556 0 : return;
557 : }
558 :
559 3 : if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
560 0 : clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
561 0 : throw FelixClientSendConnectionException();
562 : }
563 :
564 3 : if (msgs.size() != sizes.size()){
565 0 : FelixClientException("Error! Number of messages differs from number of sizes");
566 : }
567 :
568 3 : std::vector<std::vector<uint8_t>> data;
569 3 : std::vector<const uint8_t*> msgs_copy;
570 12 : for (size_t i = 0; i < msgs.size(); i++){
571 9 : data.emplace_back(msgs[i], msgs[i]+sizes[i]);
572 9 : msgs_copy.push_back(&data[i][0]);
573 : }
574 :
575 3 : std::vector<uint64_t> size_copy(sizes);
576 :
577 3 : std::promise<uint8_t> prom_send_done;
578 3 : contextHandler->pushSendTask(SendDataVectorNbEvent{fid, std::move(data), std::move(msgs_copy), std::move(size_copy), std::move(prom_send_done)});
579 3 : netio_signal_fire(&send_signal);
580 3 : }
581 :
582 :
583 : //We need non-blocking version of send data for tests that only run FelixClient instead of FelixClientThread
584 0 : void FelixClient::send_data_intern(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
585 :
586 0 : ToFlxHeader header;
587 0 : header.length = size;
588 0 : header.reserved = 0;
589 0 : header.elink = get_elink(fid);
590 0 : int status = NETIO_STATUS_OK;
591 0 : std::promise<uint8_t> dummy_prom_sub;
592 0 : std::promise<uint8_t> dummy_prom_send;
593 :
594 0 : if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
595 0 : try{
596 0 : establish_send_connection(fid, std::move(dummy_prom_sub));
597 0 : } catch(std::exception& e){
598 0 : clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
599 0 : throw;
600 0 : }
601 : }
602 :
603 0 : if(contextHandler->getSocketState(fid) == CONNECTING){
604 0 : contextHandler->pushSendTask(SendDataInternEvent{fid, header, data, size, flush, std::move(dummy_prom_send),1});
605 0 : netio_signal_fire(&send_signal);
606 : return;
607 : }
608 :
609 0 : try{
610 0 : status = send_evloop(fid, header, data, size, flush, std::move(dummy_prom_send));
611 0 : } catch(std::exception& e){
612 0 : clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
613 0 : throw;
614 0 : }
615 :
616 0 : switch(status) {
617 : case NETIO_STATUS_OK: return;
618 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
619 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
620 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
621 0 : default: throw FelixClientException("Unknown status code: " + status);
622 : }
623 0 : }
624 :
625 :
626 : //We need non-blocking version of send data for tests that only run FelixClient instead of FelixClientThread
627 0 : void FelixClient::send_data_intern(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
628 :
629 0 : int status = NETIO_STATUS_OK;
630 0 : std::promise<uint8_t> dummy_prom_sub;
631 0 : std::promise<uint8_t> dummy_prom_send;
632 :
633 0 : if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
634 0 : try{
635 0 : establish_send_connection(fid, std::move(dummy_prom_sub));
636 0 : } catch(std::exception& e){
637 0 : clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
638 0 : throw;
639 0 : }
640 : }
641 :
642 0 : if(contextHandler->getSocketState(fid) == CONNECTING){
643 0 : contextHandler->pushSendTask(SendDataInternVectorEvent{fid, msgs, sizes, std::move(dummy_prom_send), 1});
644 0 : netio_signal_fire(&send_signal);
645 : return;
646 : }
647 :
648 0 : try{
649 0 : status = send_evloop(fid, msgs, sizes, std::move(dummy_prom_send));
650 0 : } catch(std::exception& e){
651 0 : clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
652 0 : throw;
653 0 : }
654 :
655 0 : switch(status) {
656 : case NETIO_STATUS_OK: return;
657 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
658 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
659 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
660 0 : default: throw FelixClientException("Unknown status code: " + status);
661 : }
662 0 : }
663 :
664 :
665 14519 : void FelixClient::exec_send(){
666 14519 : send_var_t task;
667 14519 : contextHandler->pullSendTask(task);
668 :
669 14519 : std::visit(overloaded {
670 0 : [](std::monostate const&) {
671 0 : clog_info("Nothing to send. Send task queue is empty.");
672 : },
673 14186 : [this](SendDataEvent& sendData) {
674 14186 : this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
675 14186 : },
676 21 : [this](SendDataNbEvent& sendNbData) {
677 21 : this->send_evloop(sendNbData.fid, sendNbData.header, &sendNbData.data[0], sendNbData.size, sendNbData.flush, std::move(sendNbData.prom_send_done));
678 21 : },
679 4 : [this](SendDataVectorEvent& sendDataVector) {
680 4 : this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
681 4 : },
682 3 : [this](SendDataVectorNbEvent& sendDataNbVector) {
683 3 : this->send_evloop(sendDataNbVector.fid, sendDataNbVector.msgs, sendDataNbVector.sizes, std::move(sendDataNbVector.prom_send_done));
684 3 : },
685 305 : [this](SendConnectEvent& sendConnect) {
686 305 : this->establish_send_connection(sendConnect.fid, std::move(sendConnect.prom_sub_done));
687 305 : },
688 0 : [this](SendDataInternEvent& sendData) {
689 0 : if(contextHandler->getSocketState(sendData.fid) == CONNECTED){
690 0 : this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
691 : }else{
692 0 : if(sendData.count< maxSendAttempts){
693 0 : contextHandler->pushSendTask(SendDataInternEvent{sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done), ++sendData.count});
694 : }
695 : }
696 0 : },
697 0 : [this](SendDataInternVectorEvent& sendDataVector) {
698 0 : if(contextHandler->getSocketState(sendDataVector.fid) == CONNECTED){
699 0 : this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
700 : }else{
701 0 : if(sendDataVector.count < maxSendAttempts){
702 0 : contextHandler->pushSendTask(SendDataInternVectorEvent{sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done), ++sendDataVector.count});
703 : }
704 : }
705 0 : },
706 : }, task);
707 14519 : }
708 :
709 :
710 4085 : void FelixClient::exec_sub(){
711 4085 : sub_var_t task;
712 4085 : contextHandler->pullSubTask(task);
713 :
714 4085 : std::visit(overloaded {
715 0 : [](std::monostate const&) {
716 0 : clog_info("Nothing to send. Send task queue is empty.");
717 : },
718 1978 : [this](UnsubEvent& unsub) {
719 1978 : this->send_unsubscribe_msg(unsub.fid, std::move(unsub.prom_unsub_done));
720 1978 : },
721 2107 : [this](SubscribeEvent const& sub) {
722 2107 : this->subscribe_evloop(sub.fid);
723 : },
724 : }, task);
725 4085 : }
726 :
727 :
728 25 : void FelixClient::init_send_data(netio_tag_t fid) {
729 25 : clog_info("init_send_data for FID 0x%x", fid);
730 25 : if(terminating_felix_client.load()){
731 0 : clog_debug("Already terminating, should not send data");
732 0 : return;
733 : }
734 :
735 25 : try{
736 25 : contextHandler->createOrUpdateInfo(fid, &bus, false);
737 0 : } catch (std::exception& e) {
738 0 : clog_error("Problem while reading bus: %s", e.what());
739 0 : throw FelixClientSendConnectionException();
740 0 : }
741 :
742 25 : clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
743 :
744 25 : if (!contextHandler->exists(fid)) {
745 0 : clog_error("Invalid Bus entry");
746 0 : throw FelixClientSendConnectionException();
747 : }
748 :
749 : //If there is no connection, we have to establish a new one through the evloop
750 25 : if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
751 25 : std::promise<uint8_t> prom_sub_done;
752 25 : contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
753 25 : netio_signal_fire(&send_signal);
754 25 : }
755 :
756 : return;
757 : }
758 :
759 391 : void FelixClient::init_subscribe(netio_tag_t fid) {
760 391 : clog_info("init_subscribe not implemented, subscribe will do the job");
761 391 : return;
762 : }
763 :
764 :
765 14519 : void FelixClient::on_send_signal(void* ptr){
766 14519 : clog_trace("On send signal");
767 14519 : static_cast<FelixClient*>(ptr)->exec_send();
768 14519 : }
769 :
770 4085 : void FelixClient::on_sub_signal(void* ptr){
771 4085 : clog_debug("On sub signal");
772 4085 : static_cast<FelixClient*>(ptr)->exec_sub();
773 4085 : }
774 :
775 :
776 0 : void FelixClient::send_data(netio_tag_t fid, struct iovec* iov, unsigned n, bool flush) {
777 0 : clog_info("send_msg(netio_tag_t fid, struct iovec* iov, unsigned n): not implemented");
778 0 : throw FelixClientSendConnectionException();
779 : }
780 :
781 586 : int FelixClient::subscribe(const std::vector<netio_tag_t>& fids, uint timeoutms, bool for_register) {
782 :
783 586 : std::vector<netio_tag_t> all_fids;
784 586 : clog_debug("Subscribing to %d FIDs", fids.size());
785 586 : uint8_t alreadySubscribedCounter = 0;
786 2725 : for(auto& fid : fids){
787 : // Already subscribed
788 2139 : if (!contextHandler->canSubscribe(fid)) {
789 24 : clog_info("Already subscribed to 0x%x", fid);
790 24 : alreadySubscribedCounter++;
791 24 : continue;
792 : }
793 2115 : if (fid & 0x8000){
794 0 : clog_info("Can't subscribe to toflx fid: 0x%x", fid);
795 0 : continue;
796 : }
797 :
798 2115 : try{
799 2115 : contextHandler->createOrUpdateInfo(fid, &bus, for_register);
800 2107 : struct timespec t0;
801 2107 : clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
802 4214 : contextHandler->setSubscriptionTimes(fid, "subscribe", t0);
803 2107 : contextHandler->setSubscriptionState(fid, SUBING);
804 8 : } catch(...){
805 8 : continue;
806 8 : }
807 2107 : all_fids.push_back(fid);
808 2107 : contextHandler->addToFidsToResubscribe(fid);
809 : }
810 586 : if (fids.size() == alreadySubscribedCounter){
811 : return FELIX_CLIENT_STATUS_ALREADY_DONE;
812 : }
813 :
814 582 : uint64_t actual_delay = 1000;
815 582 : netio_timer_start_ms(&subscribe_timer, actual_delay);
816 582 : if(evloop_thread_id == std::this_thread::get_id()){
817 0 : for (const auto& fid : all_fids){
818 0 : if (!subscribe_evloop(fid)){
819 0 : if (timeoutms == 0) {
820 586 : return FELIX_CLIENT_STATUS_OK;
821 : }
822 0 : return FELIX_CLIENT_STATUS_TIMEOUT;
823 : }
824 : }
825 : } else {
826 2689 : for (auto& fid : all_fids){
827 2107 : contextHandler->pushSubTask(SubscribeEvent{fid});
828 2107 : netio_signal_fire(&sub_signal);
829 : }
830 : }
831 :
832 582 : if (timeoutms == 0) {
833 : return FELIX_CLIENT_STATUS_OK;
834 : }
835 420 : std::vector<netio_tag_t> not_subscribed_fids;
836 :
837 420 : if(!contextHandler->isSubscribed(all_fids, timeoutms)){
838 483 : for (auto& fid : all_fids){
839 478 : if(!contextHandler->isSubscribed(fid)){
840 0 : not_subscribed_fids.push_back(fid);
841 0 : contextHandler->setSubscriptionState(fid, TIMEOUT);
842 : }
843 : }
844 : }
845 :
846 420 : if (not_subscribed_fids.empty()) {
847 420 : if(fids.size() > all_fids.size() + alreadySubscribedCounter){
848 : return FELIX_CLIENT_STATUS_TIMEOUT;
849 : }
850 412 : return FELIX_CLIENT_STATUS_OK;
851 : }
852 : return FELIX_CLIENT_STATUS_TIMEOUT;
853 :
854 586 : }
855 :
856 544 : int FelixClient::subscribe(netio_tag_t fid, uint timeoutms, bool for_register) {
857 544 : if(terminating_felix_client.load()){
858 0 : clog_debug("Already terminating, should not subscribe anymore");
859 0 : return FELIX_CLIENT_STATUS_OK;
860 : }
861 544 : std::vector<uint64_t> fids{fid};
862 544 : return subscribe(fids, timeoutms, for_register);
863 544 : }
864 :
865 2156 : bool FelixClient::subscribe_evloop(netio_tag_t fid) {
866 2156 : try{
867 2156 : contextHandler->createOrUpdateInfo(fid, &bus);
868 2156 : struct timespec t1;
869 2156 : clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
870 4312 : contextHandler->setSubscriptionTimes(fid, "subscribe_evloop", t1);
871 0 : } catch(const std::exception& e){
872 0 : return false;
873 0 : }
874 2156 : int ret;
875 2156 : clog_trace("type (0=buffered, 1=unbuffered) %d", contextHandler->getType(fid));
876 :
877 2156 : if (contextHandler->getType(fid) == BUFFERED) {
878 1093 : netio_subscribe_socket* buffered_socket = get_buffered_subscribe_socket(fid);
879 1093 : clog_debug("(Re-)subscribing (buffered) for fid 0x%x and socket 0x%p", fid, buffered_socket);
880 1093 : ret = netio_subscribe(buffered_socket, fid);
881 1093 : clog_debug("(Re-)subscribing (buffered) for 0x%x with send socket has been done", fid);
882 :
883 : } else {
884 1063 : netio_unbuffered_subscribe_socket* unbuffered_socket = get_unbuffered_subscribe_socket(fid);
885 1063 : clog_debug("(Re-)subscribing (unbuffered) for 0x%x and socket 0x%p", fid, unbuffered_socket);
886 1063 : clog_debug("Subscribed to unbuffered subscribe socket. Netio_page_size: %d netio pages: %d", unbuffered_socket->recv_socket.attr.buffer_size, unbuffered_socket->recv_socket.attr.num_buffers);
887 1063 : ret = netio_unbuffered_subscribe(unbuffered_socket, fid);
888 : }
889 2156 : if(ret == NETIO_STATUS_AGAIN){
890 0 : contextHandler->updateFidsWhenUnsub(fid);
891 0 : contextHandler->pushSubTask(SubscribeEvent{fid});
892 0 : netio_signal_fire(&sub_signal);
893 0 : return false;
894 2156 : } else if(ret == NETIO_STATUS_ERROR){
895 : return false;
896 : }
897 2156 : if(contextHandler->getSocketState(fid) != CONNECTED && contextHandler->getSocketState(fid) != CONNECTING){
898 0 : contextHandler->setSocketState(fid, CONNECTING);
899 : }
900 2156 : if(contextHandler->getSocketState(fid) == CONNECTED){
901 1107 : contextHandler->setSubscriptionState(fid, SUB);
902 1107 : contextHandler->removeFromFidsToResubscribe(fid);
903 1107 : struct timespec t2;
904 1107 : clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
905 1107 : std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
906 :
907 2214 : double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
908 2214 : + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
909 1107 : clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
910 :
911 2214 : double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
912 2214 : + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
913 1107 : clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
914 1107 : if (cb_on_connect) {
915 1107 : (cb_on_connect)(fid);
916 : }
917 1107 : }
918 : return true;
919 : }
920 :
921 2013 : void FelixClient::send_unsubscribe_msg(uint64_t fid, std::promise<uint8_t> prom_unsub_done){
922 2013 : void* socket;
923 2013 : int ret = 0;
924 2013 : if (contextHandler->getType(fid) == BUFFERED) {
925 1022 : struct netio_subscribe_socket* buffered_socket;
926 1022 : buffered_socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
927 1022 : socket = static_cast<void*>(buffered_socket);
928 1022 : if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
929 1022 : clog_info("Unsubscribing (buffered) for 0x%x from %s and socket 0x%p", fid, get_address(buffered_socket).c_str(), buffered_socket);
930 1022 : ret = netio_unsubscribe(buffered_socket, fid);
931 : } else {
932 991 : struct netio_unbuffered_subscribe_socket* unbuffered_socket;
933 991 : unbuffered_socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
934 991 : socket = static_cast<void*>(unbuffered_socket);
935 991 : if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
936 991 : clog_info("Unsubscribing (unbuffered) for 0x%x from %s", fid, get_address(unbuffered_socket).c_str());
937 991 : ret = netio_unbuffered_unsubscribe(unbuffered_socket, fid);
938 : }
939 2013 : if(ret == NETIO_STATUS_AGAIN){
940 0 : contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
941 0 : netio_signal_fire(&sub_signal);
942 0 : return;
943 2013 : } else if(ret == NETIO_STATUS_ERROR){
944 0 : prom_unsub_done.set_value(2);
945 : }
946 2013 : contextHandler->updateFidsWhenUnsub(fid);
947 2013 : contextHandler->setSubscriptionState(fid, UNSUB);
948 2013 : if(contextHandler->areAllFidsUnsubscribed(socket)){
949 1824 : contextHandler->setSocketToAllUnsubscribed(socket);
950 : }
951 2013 : prom_unsub_done.set_value(0);
952 : }
953 :
954 :
955 2178 : int FelixClient::unsubscribe(netio_tag_t fid) {
956 2178 : clog_info("Unsubscribing from 0x%x", fid);
957 2178 : if (fid & 0x8000){
958 : return FELIX_CLIENT_STATUS_OK;
959 : }
960 :
961 1978 : if(evloop_thread_id == std::this_thread::get_id()){
962 0 : send_unsubscribe_msg(fid);
963 0 : return FELIX_CLIENT_STATUS_OK;
964 : }
965 1978 : if(!contextHandler->exists(fid)){
966 : return FELIX_CLIENT_STATUS_OK;
967 : }
968 :
969 1978 : std::promise<uint8_t> prom_unsub_done;
970 1978 : std::future<uint8_t> future_unsub_done = prom_unsub_done.get_future();
971 1978 : uint8_t status = FELIX_CLIENT_STATUS_OK;
972 1978 : if (contextHandler->getSubscriptionState(fid) == SUB || contextHandler->getSubscriptionState(fid) == SUBING){
973 1978 : contextHandler->setSubscriptionState(fid, UNSUBING);
974 1978 : contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
975 1978 : netio_signal_fire(&sub_signal);
976 :
977 1978 : try{
978 1978 : std::chrono::milliseconds timeout (5000);
979 1978 : if (future_unsub_done.wait_for(timeout)==std::future_status::timeout){
980 0 : clog_error("Unsubscribe timeout expired, stop blocking to prevent deadlock.");
981 0 : return FELIX_CLIENT_STATUS_TIMEOUT;
982 : }
983 1978 : status = future_unsub_done.get();
984 0 : } catch(std::exception& e){
985 0 : clog_info("Exception while unsubscribing: %s FID: 0x%x", e.what(), fid);
986 0 : throw;
987 0 : }
988 : }
989 1978 : return status; //0 = OK, 1 = Already done
990 1978 : }
991 :
992 :
993 195 : void FelixClient::exec(const UserFunction &user_function ) {
994 195 : struct netio_signal* signal = (struct netio_signal*)malloc(sizeof(struct netio_signal)); // c style
995 195 : struct SignalData* data = new SignalData(); // C++ style (struct SignalData*)malloc(sizeof(struct SignalData)); Does this need to be cleaned up?
996 195 : data->signal = signal;
997 195 : data->user_function = user_function;
998 195 : data->evloop = &ctx.evloop;
999 :
1000 195 : netio_signal_init(&ctx.evloop, signal);
1001 195 : signal->data = data;
1002 195 : signal->cb = on_signal;
1003 :
1004 195 : netio_signal_fire(signal);
1005 195 : }
1006 :
1007 :
1008 52 : netio_tag_t FelixClient::get_ctrl_fid(netio_tag_t fid) {
1009 52 : uint8_t sid = 0;
1010 52 : uint8_t to_flx = 1;
1011 52 : uint8_t virt = 1;
1012 104 : netio_tag_t ctrl_fid = get_fid_from_ids(get_did(fid),
1013 : get_cid(fid),
1014 : COMMAND_REPLY_LINK,
1015 : sid,
1016 52 : get_vid(fid),
1017 : to_flx,
1018 : virt);
1019 52 : return ctrl_fid;
1020 : }
1021 :
1022 33 : netio_tag_t FelixClient::get_subscribe_fid(netio_tag_t fid) {
1023 33 : uint8_t sid = 0;
1024 33 : uint8_t to_flx = 0;
1025 33 : uint8_t virt = 1;
1026 66 : netio_tag_t subscribe_fid = get_fid_from_ids(get_did(fid),
1027 : get_cid(fid),
1028 : COMMAND_REPLY_LINK,
1029 : sid,
1030 33 : get_vid(fid),
1031 : to_flx,
1032 : virt);
1033 33 : return subscribe_fid;
1034 : }
1035 :
1036 :
1037 20 : FelixClientThread::Status FelixClient::send_cmd(const std::vector<uint64_t>& fids,
1038 : FelixClientThread::Cmd cmd,
1039 : const std::vector<std::string>& cmd_args,
1040 : std::vector<FelixClientThread::Reply>& replies) {
1041 20 : clog_debug("send_cmd");
1042 20 : if(terminating_felix_client.load()){
1043 0 : clog_debug("Already terminating not sending command");
1044 0 : return FelixClientThread::Status::OK;
1045 : }
1046 :
1047 : // No commands to be send, all OK
1048 20 : if(fids.size() == 0) {
1049 : return FelixClientThread::Status::OK;
1050 : }
1051 :
1052 20 : std::string message;
1053 20 : FelixClientThread::Status status = FelixClientThread::Status::OK;
1054 :
1055 : // check the command and command arguments
1056 20 : switch(cmd) {
1057 0 : case FelixClientThread::NOOP:
1058 0 : if (cmd_args.size() != 0) {
1059 0 : message = std::string("Too many arguments for NOOP, needs 0 while %d given.", cmd_args.size());
1060 0 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1061 0 : break;
1062 : }
1063 : break;
1064 14 : case FelixClientThread::GET:
1065 14 : if (cmd_args.size() != 1) {
1066 1 : message = std::string("Not enough arguments for GET, needs 1 while %d given.", cmd_args.size());
1067 1 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1068 1 : break;
1069 : }
1070 : break;
1071 2 : case FelixClientThread::SET:
1072 2 : if (cmd_args.size() != 2) {
1073 0 : message = std::string("Not enough arguments for SET, needs 2 while %d given.", cmd_args.size());
1074 0 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1075 0 : break;
1076 : }
1077 : break;
1078 3 : case FelixClientThread::ECR_RESET:
1079 3 : if (cmd_args.size() != 1) {
1080 0 : message = std::string("Not enough arguments for ECR_RESET, needs 1 while", cmd_args.size());
1081 0 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1082 0 : break;
1083 : }
1084 : break;
1085 1 : default:
1086 1 : message = "Cmd not available: " + FelixClientThread::to_string(cmd);
1087 1 : status = FelixClientThread::Status::ERROR_INVALID_CMD;
1088 1 : break;
1089 : }
1090 :
1091 :
1092 : // deduce set of ctrl_fids from list of normal fids
1093 40 : std::unordered_map<netio_tag_t, netio_tag_t> ctrl_by_subscribe_fid;
1094 40 : std::set<uint64_t> sub_fids_set;
1095 20 : clog_info("Looking up %d subscribe FIDs", fids.size());
1096 53 : for(auto const& fid : fids) {
1097 33 : netio_tag_t ctrl_fid = get_ctrl_fid(fid);
1098 33 : netio_tag_t sub_fid = get_subscribe_fid(ctrl_fid);
1099 33 : sub_fids_set.insert(sub_fid);
1100 33 : std::error_code ec;
1101 33 : felixbus::FelixBusInfo bus_info = bus.get_info(sub_fid, ec);
1102 :
1103 33 : if( bus_info.ip != "" ){
1104 31 : ctrl_by_subscribe_fid[sub_fid] = ctrl_fid;
1105 : }
1106 : else{
1107 2 : clog_error("Discarding ctrl fid 0x%x because reply fid 0x%x does not appear in the bus", ctrl_fid, sub_fid);
1108 2 : FelixClientThread::Reply reply;
1109 2 : reply.ctrl_fid = ctrl_fid;
1110 2 : reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
1111 2 : reply.message = "Invalid fid, not found in bus";
1112 2 : reply.value = 0;
1113 2 : replies.push_back(reply);
1114 2 : }
1115 33 : }
1116 :
1117 40 : std::vector<uint64_t> sub_fids;
1118 20 : sub_fids.assign(sub_fids_set.begin(), sub_fids_set.end());
1119 :
1120 20 : clog_info("Subscribing to %d FIDs", ctrl_by_subscribe_fid.size());
1121 20 : uint subscribe_timeout = 5000;
1122 20 : bool for_register = true;
1123 20 : int ret = subscribe(sub_fids, subscribe_timeout, for_register);
1124 20 : if(ret == FELIX_CLIENT_STATUS_TIMEOUT){
1125 2 : auto it = ctrl_by_subscribe_fid.begin();
1126 3 : while (it != ctrl_by_subscribe_fid.end()){
1127 1 : if(!contextHandler->isSubscribed(it->first)){
1128 0 : FelixClientThread::Reply reply;
1129 0 : reply.ctrl_fid = it->second;
1130 0 : reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
1131 0 : reply.message = "Could not subscribe: timeout";
1132 0 : reply.value = 0;
1133 0 : replies.push_back(reply);
1134 0 : it = ctrl_by_subscribe_fid.erase(it);
1135 0 : } else {
1136 1 : it++;
1137 : }
1138 : }
1139 : }
1140 :
1141 20 : clog_info("Sending command");
1142 :
1143 : // continue with the connected ctrl_fids and send them the command
1144 40 : std::unordered_map<std::string, netio_tag_t> ctrl_fid_by_uuid;
1145 41 : for(auto const& sub_ctrl_fids : ctrl_by_subscribe_fid) {
1146 21 : uuid_t uuid_dec;
1147 21 : char uuid[36 + 1];
1148 21 : uuid_generate(uuid_dec);
1149 21 : uuid_unparse_upper(uuid_dec, uuid);
1150 :
1151 21 : clog_info("Preparing cmd...");
1152 21 : char json[256];
1153 21 : struct jWriteControl jwc;
1154 21 : jwOpen(&jwc, json, 256 - 1, JW_ARRAY, JW_COMPACT);
1155 21 : jwArr_object(&jwc);
1156 21 : jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::UUID).c_str(), uuid);
1157 42 : jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::CMD).c_str(), FelixClientThread::to_string(cmd).c_str());
1158 21 : jwObj_array(&jwc, FelixClientThread::to_string(FelixClientThread::CMD_ARGS).c_str());
1159 45 : for(auto const& cmd_arg : cmd_args) {
1160 24 : jwArr_string(&jwc, cmd_arg.c_str());
1161 : }
1162 21 : jwEnd(&jwc);
1163 21 : jwEnd(&jwc);
1164 21 : jwClose(&jwc);
1165 21 : int json_length = strlen(json);
1166 21 : clog_debug("len=%d; json=%d", json_length, json);
1167 :
1168 21 : try {
1169 21 : clog_info("Sending cmd to fid 0x%x", sub_ctrl_fids.second);
1170 21 : send_data(sub_ctrl_fids.second, (const uint8_t*)json, json_length, true);
1171 20 : uuids.emplace(uuid);
1172 21 : ctrl_fid_by_uuid.emplace(uuid, sub_ctrl_fids.second);
1173 1 : } catch (FelixClientException& error) {
1174 1 : clog_info("Could not send cmd for FID 0x%x because: %s", sub_ctrl_fids.second, error.what());
1175 : // sending timed out or other problem
1176 1 : FelixClientThread::Reply reply;
1177 1 : reply.ctrl_fid = sub_ctrl_fids.second;
1178 1 : reply.status = FelixClientThread::Status::ERROR_NO_CONNECTION;
1179 1 : reply.message = "Could not send cmd";
1180 1 : reply.value = 0;
1181 1 : replies.push_back(reply);
1182 1 : }
1183 :
1184 : }
1185 :
1186 : // wait for replies (or timeout)
1187 20 : clog_info("Waiting for %d replies...", ctrl_fid_by_uuid.size());
1188 40 : for(auto const &it : ctrl_fid_by_uuid) {
1189 20 : clog_debug("%s 0x%x", it.first.c_str(), it.second);
1190 : }
1191 :
1192 20 : uint timeoutms = 5000;
1193 20 : uint timeout = 0;
1194 20 : uint span = std::min(timeoutms, 250U);
1195 20 : uint recv_replies = 0;
1196 116 : while ((ctrl_fid_by_uuid.size() != recv_replies) && (timeout <= timeoutms)) {
1197 96 : recv_replies = 0;
1198 196 : for (auto& uuid_it : ctrl_fid_by_uuid){
1199 100 : if (reply_by_uuid.find(uuid_it.first) != reply_by_uuid.end()) {
1200 20 : recv_replies++;
1201 : }
1202 : }
1203 96 : std::this_thread::sleep_for(std::chrono::milliseconds(span));
1204 96 : timeout += span;
1205 : }
1206 20 : clog_info("Finished waiting");
1207 :
1208 40 : for (auto& ctrl_fid_uuid : ctrl_fid_by_uuid){
1209 : // take it out of the list of uuids
1210 20 : std::string uuid = ctrl_fid_uuid.first;
1211 20 : uuids.erase(uuid);
1212 :
1213 20 : auto reply_it = reply_by_uuid.find(uuid);
1214 20 : if(reply_it != reply_by_uuid.end()){
1215 18 : clog_info("uuid: %s", ctrl_fid_uuid.first.c_str());
1216 18 : FelixClientThread::Reply reply = (*reply_it).second;
1217 18 : replies.push_back(reply);
1218 :
1219 18 : clog_info("Found reply for %s", ctrl_fid_uuid.first.c_str());
1220 18 : clog_info(" %s", FelixClientThread::to_string(reply).c_str());
1221 :
1222 18 : reply_by_uuid.erase(reply_it);
1223 18 : } else {
1224 2 : netio_tag_t ctrl_fid = ctrl_fid_uuid.second;
1225 2 : clog_debug("UUID %s timed out for ctrl_fid 0x%x", uuid.c_str(), ctrl_fid);
1226 :
1227 2 : FelixClientThread::Reply reply;
1228 2 : reply.ctrl_fid = ctrl_fid;
1229 2 : reply.status = FelixClientThread::Status::ERROR_NO_REPLY;
1230 2 : reply.message = "Did not receive a reply";
1231 2 : reply.value = 0;
1232 2 : replies.push_back(reply);
1233 2 : } //TODO: What happens to replies that arrive aftert the timeout and staty in the map
1234 20 : clog_debug("send_cmd done");
1235 20 : }
1236 :
1237 : // calculate summary_status, no replies is an ERROR
1238 20 : FelixClientThread::Status summary_status = replies.size() > 0 ? FelixClientThread::Status::OK : FelixClientThread::Status::ERROR;
1239 20 : summary_status = std::max(summary_status, status);
1240 43 : for(auto const& reply : replies) {
1241 30 : summary_status = std::max(summary_status, reply.status);
1242 : }
1243 20 : clog_info("Calculated summary status %d:%s from %d replies", summary_status, FelixClientThread::to_string(summary_status).c_str(), replies.size());
1244 :
1245 20 : return summary_status;
1246 20 : }
1247 :
1248 419 : void FelixClient::callback_on_init( OnInitCallback on_init ) {
1249 419 : clog_info("Registering on_init");
1250 419 : cb_on_init = on_init;
1251 419 : }
1252 :
1253 184 : void FelixClient::callback_on_data( OnDataCallback on_data ) {
1254 184 : clog_info("Registering on_data");
1255 184 : cb_on_data = on_data;
1256 184 : }
1257 :
1258 2 : void FelixClient::callback_on_buffer( OnBufferCallback on_buffer ) {
1259 2 : clog_info("Registering on_buffer");
1260 2 : cb_on_buffer = on_buffer;
1261 2 : }
1262 :
1263 413 : void FelixClient::callback_on_connect( OnConnectCallback on_connect ) {
1264 413 : clog_info("Registering on_connect");
1265 413 : cb_on_connect = on_connect;
1266 413 : }
1267 :
1268 409 : void FelixClient::callback_on_disconnect( OnDisconnectCallback on_disconnect ) {
1269 409 : clog_info("Registering on_disconnect");
1270 409 : cb_on_disconnect = on_disconnect;
1271 409 : }
1272 :
1273 25 : void FelixClient::callback_on_user_timer( OnUserTimerCallback on_user_timer_cb ) {
1274 25 : clog_info("Registering on_user_timer");
1275 25 : cb_on_user_timer = on_user_timer_cb;
1276 25 : }
1277 :
1278 419 : void FelixClient::on_init_eventloop(void* ptr) {
1279 419 : clog_info("On init");
1280 419 : FelixClient* client = static_cast<FelixClient*>(ptr);
1281 419 : client->ready = true;
1282 419 : std::function<void()> callback = client->cb_on_init;
1283 419 : if (callback) {
1284 419 : (callback)();
1285 : }
1286 419 : }
1287 :
1288 459 : void FelixClient::on_connection_established(void* socket) {
1289 459 : clog_debug("on_connection_established");
1290 459 : if (contextHandler->exists(socket)){
1291 459 : contextHandler->setSocketState(socket, CONNECTED);
1292 1739 : for(auto fid : contextHandler->getFidsBySocket(socket)){
1293 1280 : if (contextHandler->exists(fid)){
1294 1280 : if(contextHandler->getConnectionType(fid) == PUBSUB){
1295 1004 : if(contextHandler->getSubscriptionState(fid) == TIMEOUT){ //FID has timed out before and we do not want to notify the user nor need this subscription
1296 0 : contextHandler->setSubscriptionState(fid, UNSUBING);
1297 0 : send_unsubscribe_msg(fid);
1298 : //contextHandler->removeFromFidsToResubscribe(fid);
1299 0 : return;
1300 : }
1301 : }
1302 1280 : contextHandler->removeFromFidsToResubscribe(fid);
1303 1280 : contextHandler->setSubscriptionState(fid, SUB);
1304 1280 : struct timespec t2;
1305 1280 : clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
1306 1280 : std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
1307 2560 : double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
1308 2560 : + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
1309 1280 : clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
1310 2560 : double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
1311 2560 : + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
1312 1280 : clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
1313 1280 : if (cb_on_connect) {
1314 1276 : (cb_on_connect)(fid);
1315 : }
1316 1280 : clog_debug("Tag: 0x%x out of %d FIDs has status: CONNECTED", fid, contextHandler->getFidsBySocket(socket).size());
1317 1280 : }
1318 459 : }
1319 : }
1320 : }
1321 :
1322 124 : void FelixClient::on_connection_established(struct netio_send_socket* socket) {
1323 124 : clog_info("Unbuffered send connection established");
1324 124 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1325 124 : }
1326 :
1327 142 : void FelixClient::on_connection_established(struct netio_buffered_send_socket* socket) {
1328 142 : clog_info("Buffered send connection established");
1329 142 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1330 142 : }
1331 :
1332 108 : void FelixClient::on_connection_established(struct netio_subscribe_socket* socket) {
1333 108 : clog_info("Buffered connection established to %s", get_address(socket).c_str());
1334 108 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1335 108 : }
1336 :
1337 85 : void FelixClient::on_connection_established(struct netio_unbuffered_subscribe_socket* socket) {
1338 85 : clog_info("Unbuffered connection established to %s", get_address(socket).c_str());
1339 85 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1340 85 : }
1341 :
1342 368 : void FelixClient::on_connection_closed(void* socket) {
1343 368 : clog_debug("FelixClient::on_connection_closed");
1344 368 : contextHandler->setSocketState(socket, DISCONNECTED);
1345 368 : auto fids = contextHandler->getFidsBySocket(socket);
1346 368 : if(contextHandler->getConnectionType(socket) == SENDRECV){
1347 266 : decrement_num_send_sockets();
1348 : }
1349 368 : if(!terminating_felix_client){
1350 48 : contextHandler->removeSocket(socket);
1351 : }
1352 :
1353 654 : for(auto fid : fids ){
1354 286 : if (cb_on_disconnect) {
1355 282 : (cb_on_disconnect)(fid);
1356 : }
1357 : }
1358 :
1359 368 : }
1360 :
1361 124 : void FelixClient::on_connection_closed(struct netio_send_socket* socket) {
1362 124 : clog_info("Unbuffered send connection closed");
1363 124 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1364 : //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
1365 124 : }
1366 :
1367 142 : void FelixClient::on_connection_closed(struct netio_buffered_send_socket* socket) {
1368 142 : clog_info("Buffered send connection closed");
1369 142 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1370 : //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
1371 142 : }
1372 :
1373 61 : void FelixClient::on_connection_closed(struct netio_subscribe_socket* socket) {
1374 61 : clog_info("Buffered connection closed to %s", get_address(socket).c_str());
1375 61 : netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::BSUB);
1376 61 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1377 61 : }
1378 :
1379 41 : void FelixClient::on_connection_closed(struct netio_unbuffered_subscribe_socket* socket) {
1380 41 : clog_info("Unbuffered connection closed to %s", get_address(socket).c_str());
1381 41 : netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::USUB);
1382 41 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1383 41 : }
1384 :
1385 91 : void FelixClient::on_error_connection_refused(void* socket) {
1386 91 : clog_debug("on_error_connection_refused");
1387 91 : contextHandler->setSocketState(socket, DISCONNECTED);
1388 182 : exec([this, socket]{this->contextHandler->removeSocket(socket, true);});
1389 91 : clog_debug("Socket deleted from socket map.");
1390 91 : }
1391 :
1392 23 : void FelixClient::on_error_connection_refused(struct netio_send_socket* socket) {
1393 23 : clog_info("Unbuffered send connection refused");
1394 23 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1395 23 : }
1396 :
1397 23 : void FelixClient::on_error_connection_refused(struct netio_buffered_send_socket* socket) {
1398 23 : clog_info("Buffered send connection refused");
1399 23 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1400 23 : }
1401 :
1402 22 : void FelixClient::on_error_connection_refused(struct netio_subscribe_socket* socket) {
1403 22 : clog_info("Buffered connection refused to %s", get_address(socket).c_str());
1404 22 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1405 :
1406 22 : }
1407 :
1408 23 : void FelixClient::on_error_connection_refused(struct netio_unbuffered_subscribe_socket* socket) {
1409 23 : clog_info("Unbuffered connection refused to %s", get_address(socket).c_str());
1410 23 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1411 :
1412 23 : }
1413 :
1414 152 : void FelixClient::on_send_completed(struct netio_send_socket* socket, uint64_t key) {
1415 152 : clog_debug("Unbuffered send completed for 0x%x", key);
1416 152 : }
1417 :
1418 19 : void FelixClient::on_register_msg_received(void* socket, netio_tag_t tag, void* data_ptr, size_t size) {
1419 19 : if (size > 0) {
1420 19 : uint8_t* data = (uint8_t*)(data_ptr);
1421 19 : data++;
1422 19 : size--;
1423 :
1424 : // handle subscriptions for felix-register
1425 19 : clog_info("Answer on register command");
1426 :
1427 19 : netio_tag_t ctrl_fid = get_ctrl_fid(tag);
1428 19 : clog_info(" for ctrl_fid 0x%x", ctrl_fid);
1429 :
1430 19 : simdjson::dom::element ops = parser.parse(simdjson::padded_string((const char*)data, size));
1431 : // FIXME check that this works for more than one reply
1432 38 : for (simdjson::dom::object op : ops) {
1433 19 : FelixClientThread::Reply reply;
1434 38 : std::string uuid = std::string(op[FelixClientThread::to_string(FelixClientThread::UUID)]);
1435 19 : clog_info(" uuid %s", uuid.c_str());
1436 :
1437 : // only add if we are still looking for the uuid (timeout...)
1438 19 : if (uuids.find(uuid) != uuids.end()) {
1439 18 : reply.ctrl_fid = ctrl_fid;
1440 36 : int64_t s = op[FelixClientThread::to_string(FelixClientThread::STATUS)];
1441 18 : reply.status = static_cast<FelixClientThread::Status>(s);
1442 18 : reply.message = std::string(op[FelixClientThread::to_string(FelixClientThread::MESSAGE)]);
1443 36 : reply.value = uint64_t(op[FelixClientThread::to_string(FelixClientThread::VALUE)]);
1444 :
1445 37 : reply_by_uuid[uuid] = reply;
1446 : } else {
1447 1 : clog_info(" uuid not found, answer ignored", uuid.c_str());
1448 : }
1449 19 : }
1450 : }
1451 19 : }
1452 :
1453 19 : void FelixClient::on_register_msg_received(struct netio_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
1454 19 : static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
1455 19 : }
1456 :
1457 0 : void FelixClient::on_register_msg_received(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
1458 0 : static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
1459 0 : }
1460 :
1461 3197 : void FelixClient::on_timer(void* ptr) {
1462 :
1463 3197 : clog_debug("On timer called. Attempting to resubscribe");
1464 3197 : uint count = 0;
1465 3197 : FelixClient* client = static_cast<FelixClient*>(ptr);
1466 3197 : if(client->terminating_felix_client.load()){
1467 0 : clog_debug("Already terminating no reason to resubscribe");
1468 0 : return;
1469 : }
1470 3197 : std::vector<uint64_t> fids_to_resubscribe = client->contextHandler->getFidsToResubscribe();
1471 3197 : clog_debug("Resubscribing for %d", fids_to_resubscribe.size());
1472 3284 : for(const auto& fid : fids_to_resubscribe) {
1473 87 : if(client->contextHandler->getSocketState(fid) == DISCONNECTED){
1474 73 : if(fid & 0x8000){
1475 24 : std::promise<uint8_t> prom_sub;
1476 24 : std::future<uint8_t> future_sub_done = prom_sub.get_future();
1477 24 : client->establish_send_connection(fid, std::move(prom_sub));
1478 24 : if(!future_sub_done.get()) {
1479 24 : count++;
1480 : }
1481 24 : } else {
1482 49 : if (!client->contextHandler->canSubscribe(fid)) {
1483 0 : clog_debug("Already attempting to subscribe to 0x%x", fid);
1484 0 : continue;
1485 : }
1486 49 : if (client->subscribe_evloop(fid)) {
1487 49 : count++;
1488 : }
1489 : }
1490 : }
1491 87 : clog_trace("Socket has state: %d", client->contextHandler->getSocketState(fid));
1492 87 : clog_trace("FID 0x%x is in state: %d", fid, client->contextHandler->getSubscriptionState(fid));
1493 : }
1494 :
1495 3197 : if (count > 0) {
1496 49 : clog_debug("Subscription timer: resubscribing to %d tags", count);
1497 : }
1498 3197 : }
1499 :
1500 :
1501 195 : void FelixClient::on_signal(void* ptr) {
1502 195 : clog_debug("On signal");
1503 195 : struct SignalData* data = (struct SignalData*)ptr;
1504 195 : UserFunction user_function = data->user_function;
1505 195 : (user_function)();
1506 :
1507 195 : netio_signal_close(data->evloop, data->signal);
1508 195 : free(data->signal);
1509 390 : delete data;
1510 195 : }
1511 :
1512 1343 : std::string FelixClient::get_address(netio_subscribe_socket *socket) {
1513 2686 : return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port);
1514 : }
1515 :
1516 1248 : std::string FelixClient::get_address(netio_unbuffered_subscribe_socket *socket) {
1517 2496 : return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port);
1518 : }
1519 :
1520 :
1521 1093 : struct netio_subscribe_socket* FelixClient::get_buffered_subscribe_socket(uint64_t fid) {
1522 1093 : bool needInititialization = contextHandler->addOrCreateSocket(fid);
1523 1093 : struct netio_subscribe_socket* socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
1524 1093 : clog_debug("get_buffered_subscribe_socket for fid 0x%x and socket 0x%p", fid, socket);
1525 1093 : if(needInititialization){
1526 130 : initialize_buffered_subscribe_socket(socket, fid);
1527 : }
1528 1093 : return socket;
1529 : }
1530 :
1531 130 : void FelixClient::initialize_buffered_subscribe_socket(struct netio_subscribe_socket* socket, uint64_t fid){
1532 130 : struct netio_buffered_socket_attr attr;
1533 130 : attr.num_pages = 2 * contextHandler->getPages(fid); //Hard-coded double the number of pages for FLX-2041, todo: make a parameter later
1534 130 : attr.pagesize = contextHandler->getPagesize(fid);
1535 130 : attr.watermark = contextHandler->getWatermark(fid);
1536 130 : if (contextHandler->isTcp(fid)) {
1537 31 : clog_debug("Setup tcp socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1538 62 : netio_subscribe_tcp_socket_init(socket, &ctx, &attr,
1539 62 : netio_hostname(local_ip_address.c_str()), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1540 : } else {
1541 99 : clog_debug("Setup libfabric socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1542 198 : netio_subscribe_socket_init(socket, &ctx, &attr,
1543 198 : netio_hostname(local_ip_address.c_str()), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1544 : }
1545 :
1546 130 : socket->cb_connection_closed = on_connection_closed;
1547 130 : socket->cb_connection_established = on_connection_established;
1548 130 : socket->cb_error_connection_refused = on_error_connection_refused;
1549 130 : if (contextHandler->isForRegister(fid)) {
1550 17 : socket->cb_msg_received = on_register_msg_received;
1551 : } else {
1552 113 : socket->cb_msg_received = on_msg_received;
1553 : }
1554 :
1555 130 : if(buffer_callback){
1556 2 : socket->cb_buf_received = on_buf_received;
1557 : }
1558 :
1559 130 : clog_debug("Setup socket for %s", get_address(socket).c_str());
1560 : // store object pointer (this) in socket, for use in c-style callbacks
1561 130 : socket->usr = this;
1562 130 : }
1563 :
1564 1063 : struct netio_unbuffered_subscribe_socket* FelixClient::get_unbuffered_subscribe_socket(uint64_t fid) {
1565 : // Create socket if it does not exist, otherwise lookup
1566 1063 : bool needInititialization = contextHandler->addOrCreateSocket(fid);
1567 :
1568 1063 : struct netio_unbuffered_subscribe_socket* socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
1569 1063 : clog_debug("get_unbuffered_subscribe_socket");
1570 1063 : if(needInititialization){
1571 108 : initialize_unbuffered_subscribe_socket(socket, fid);
1572 : }
1573 :
1574 :
1575 1063 : return socket;
1576 : }
1577 108 : void FelixClient::initialize_unbuffered_subscribe_socket(netio_unbuffered_subscribe_socket* socket, uint64_t fid){
1578 108 : clog_debug("Setup socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1579 :
1580 : // FLX-1222 use params
1581 : // FLX-1222 allocate somewhere globally, per socket
1582 : //buffer size not specified, taken by the bus
1583 108 : if (unbuffered_netio_pages == 0){
1584 13 : clog_trace("Creating unbuffered subscribe socket. Buffer not specified, using bus parameters. Pages: %d pagesize: %d", contextHandler->getPages(fid), contextHandler->getPagesize(fid));
1585 26 : netio_unbuffered_subscribe_socket_init(socket, &ctx,
1586 26 : local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
1587 13 : contextHandler->getPagesize(fid), contextHandler->getPages(fid));
1588 : } else {
1589 : //buffer size passed as parameter
1590 95 : clog_trace("Creating unbuffered subscribe socket. Buffer parameter specified. Pages: %d pagesize: %d", unbuffered_netio_pages, unbuffered_netio_pagesize);
1591 95 : netio_unbuffered_subscribe_socket_init(socket, &ctx,
1592 190 : local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
1593 95 : unbuffered_netio_pagesize, unbuffered_netio_pages); //correct even if unbuffered_netio_pagesize/-pages are needed?
1594 : }
1595 :
1596 108 : socket->cb_connection_closed = on_connection_closed;
1597 108 : socket->cb_connection_established = on_connection_established;
1598 108 : socket->cb_error_connection_refused = on_error_connection_refused;
1599 108 : if (contextHandler->isForRegister(fid)) {
1600 0 : socket->cb_msg_received = on_register_msg_received;
1601 : } else {
1602 108 : socket->cb_msg_received = on_msg_received;
1603 : }
1604 :
1605 108 : clog_debug("Setup socket for %s", get_address(socket).c_str());
1606 :
1607 108 : socket->usr = this;
1608 108 : }
1609 :
1610 :
1611 14235 : struct netio_buffered_send_socket* FelixClient::get_buffered_send_socket(uint64_t fid) {
1612 : // Create socket if it does not exist, otherwise lookup
1613 14235 : bool needInititialization = false;
1614 14235 : if (contextHandler->getSocket<struct netio_buffered_send_socket*>(fid) == NULL){
1615 172 : needInititialization = contextHandler->addOrCreateSocket(fid);
1616 : }
1617 :
1618 14235 : struct netio_buffered_send_socket* socket = contextHandler->getSocket<struct netio_buffered_send_socket*>(fid);
1619 :
1620 14235 : if (needInititialization)
1621 : {
1622 165 : struct netio_buffered_socket_attr attr;
1623 165 : attr.num_pages = contextHandler->getPages(fid);
1624 165 : attr.pagesize = contextHandler->getPagesize(fid);
1625 165 : attr.watermark = contextHandler->getWatermark(fid);
1626 165 : attr.timeout_ms = 5;
1627 165 : if (contextHandler->isTcp(fid)) {
1628 5 : netio_buffered_send_tcp_socket_init(socket, &ctx, &attr);
1629 : } else {
1630 160 : netio_buffered_send_socket_init(socket, &ctx, &attr);
1631 : }
1632 165 : socket->cb_connection_closed = on_connection_closed;
1633 165 : socket->cb_connection_established = on_connection_established;
1634 165 : socket->cb_error_connection_refused = on_error_connection_refused;
1635 :
1636 : // store object pointer (this) in socket, for use in c-style callbacks
1637 165 : socket->usr = this;
1638 330 : netio_buffered_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1639 : }
1640 14235 : return socket;
1641 : }
1642 :
1643 :
1644 308 : struct netio_send_socket* FelixClient::get_unbuffered_send_socket(uint64_t fid) {
1645 : // Create socket if it does not exist, otherwise lookup
1646 308 : bool needInititialization = false;
1647 308 : if (contextHandler->getSocket<struct netio_send_socket*>(fid) == NULL){
1648 156 : needInititialization = contextHandler->addOrCreateSocket(fid);
1649 : }
1650 :
1651 308 : struct netio_send_socket* socket = contextHandler->getSocket<struct netio_send_socket*>(fid);
1652 :
1653 308 : if (needInititialization)
1654 : {
1655 147 : if (contextHandler->isTcp(fid)) {
1656 : // FIXME is this equivalent to the non-tcp below ?
1657 10 : netio_init_send_tcp_socket(socket, &ctx);
1658 : } else {
1659 137 : netio_unbuffered_send_socket_init(socket, &ctx);
1660 : }
1661 :
1662 147 : struct netio_buffer buf;
1663 147 : buf.size = UNBUFFERED_MR_SIZE;
1664 147 : buf.data = malloc(UNBUFFERED_MR_SIZE);
1665 147 : memset(buf.data, 0, UNBUFFERED_MR_SIZE);
1666 147 : buf.mr = nullptr;
1667 294 : unbuf_mem_regions[contextHandler->getAddress(fid)] = buf;
1668 :
1669 147 : socket->cb_connection_closed = on_connection_closed;
1670 147 : socket->cb_connection_established = on_connection_established;
1671 147 : socket->cb_error_connection_refused = on_error_connection_refused;
1672 147 : socket->cb_send_completed = on_send_completed;
1673 : // store object pointer (this) in socket, for use in c-style callbacks
1674 147 : socket->usr = this;
1675 147 : netio_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1676 294 : clog_debug("Connected. Now registering memory region: 0x%p", &unbuf_mem_regions[contextHandler->getAddress(fid)]);
1677 441 : netio_register_send_buffer(socket, &unbuf_mem_regions[contextHandler->getAddress(fid)], 0);
1678 : }
1679 308 : return socket;
1680 : }
1681 :
1682 :
1683 :
1684 444 : void FelixClient::user_timer_init(){
1685 444 : netio_timer_init(&ctx.evloop, &user_timer);
1686 444 : user_timer.cb = on_user_timer;
1687 444 : user_timer.data = this;
1688 444 : }
1689 :
1690 25 : void FelixClient::user_timer_start(unsigned long interval){
1691 25 : netio_timer_start_ms(&user_timer, interval);
1692 25 : }
1693 :
1694 0 : void FelixClient::user_timer_stop(){
1695 0 : netio_timer_stop(&user_timer);
1696 0 : }
1697 :
1698 37 : void FelixClient::on_user_timer(void* ptr) {
1699 37 : FelixClient* client = static_cast<FelixClient*>(ptr);
1700 37 : client->cb_on_user_timer();
1701 37 : }
1702 :
1703 :
1704 266 : void FelixClient::decrement_num_send_sockets(){
1705 266 : if(total_number_send_sockets.load() > 0){
1706 264 : total_number_send_sockets--;
1707 : }
1708 266 : }
|