Line data Source code
1 : #include "felix/felix_client.hpp"
2 : #include "felix/felix_client_exception.hpp"
3 : #include "felix/felix_client_util.hpp"
4 : #include "felix/felix_client_thread.hpp"
5 :
6 : #include "felix/felix_fid.h"
7 :
8 : #include "clog.h"
9 :
10 : extern "C" {
11 : #include "jWrite.h"
12 : }
13 :
14 : #include <algorithm>
15 : #include <uuid/uuid.h>
16 : #include <unordered_set>
17 : #include <numeric>
18 : #include <regex>
19 : #include <unistd.h>
20 : #include <set>
21 :
22 673 : FelixClient::FelixClient(const std::string& local_ip_or_interface,
23 : std::string bus_dir,
24 : std::string bus_group_name,
25 : unsigned log_level,
26 : bool verbose_bus,
27 : unsigned netio_pages,
28 673 : unsigned netio_pagesize) :
29 673 : ready(false),
30 673 : timer_delay_millis(0),
31 673 : ctx({0}),
32 673 : user_timer({0}),
33 673 : unbuffered_netio_pages(netio_pages),
34 673 : unbuffered_netio_pagesize(netio_pagesize),
35 1346 : local_ip_address(get_ip_from_interface(local_ip_or_interface)) {
36 :
37 : // // handle here to make sure trace logging looks ok
38 : // local_ip_address = get_ip_from_interface(local_ip_or_interface);
39 :
40 673 : if (local_ip_address == "") {
41 0 : clog_fatal("Could not deduce the local ip address from %s", local_ip_or_interface.c_str());
42 0 : throw FelixClientException("felix-client: Could not deduce the local ip address from local_ip_or_interface");
43 : }
44 :
45 673 : clog_info("Local IP: %s", local_ip_address.c_str());
46 :
47 673 : clog_info("Initializing netio-next");
48 673 : netio_init(&ctx);
49 :
50 673 : clog_set_level(log_level);
51 :
52 : // init callback
53 673 : ctx.evloop.cb_init = on_init_eventloop;
54 673 : ctx.evloop.data = this;
55 :
56 673 : contextHandler = std::make_unique<FelixClientContextHandler>();
57 :
58 :
59 673 : send_signal = netio_signal();
60 :
61 673 : netio_signal_init(&ctx.evloop, &send_signal);
62 673 : send_signal.data = this;
63 673 : send_signal.cb = on_send_signal;
64 :
65 :
66 673 : sub_signal = netio_signal();
67 673 : netio_signal_init(&ctx.evloop, &sub_signal);
68 673 : sub_signal.data = this;
69 673 : sub_signal.cb = on_sub_signal;
70 :
71 : // setup timer for reconnection/resubscription
72 673 : netio_timer_init(&ctx.evloop, &subscribe_timer);
73 673 : subscribe_timer.cb = on_timer;
74 673 : subscribe_timer.data = this;
75 673 : terminating_felix_client = false;
76 673 : contextHandler->terminatingFelixClient = false;
77 :
78 673 : uint64_t actual_delay = 1000;
79 673 : netio_timer_start_ms(&subscribe_timer, actual_delay);
80 :
81 673 : user_timer_init();
82 :
83 673 : clog_info("Initializing felix-bus");
84 1346 : bus.set_path(bus_dir);
85 :
86 673 : bus.set_groupname(bus_group_name);
87 673 : bus.set_verbose(verbose_bus);
88 673 : }
89 :
90 0 : std::vector<int> FelixClient::parseCpuRange(const std::string &range) {
91 0 : static const std::regex re(R"(((\d+)-(\d+))|(\d+))");
92 0 : std::smatch sm;
93 0 : std::vector<int> data;
94 0 : for (std::string s(range); std::regex_search(s, sm, re); s = sm.suffix()) {
95 0 : if (sm[1].str().empty()) {
96 0 : data.push_back(std::stoi(sm[0]));
97 : } else {
98 0 : for (int i = std::stoi(sm[2]); i <= std::stoi(sm[3]); ++i) {
99 0 : data.push_back(i);
100 : }
101 : }
102 0 : }
103 0 : return data;
104 0 : }
105 :
106 0 : void FelixClient::set_thread_affinity(const std::string& affinity) {
107 0 : this->affinity = affinity;
108 0 : }
109 :
110 547 : FelixClient::~FelixClient(){
111 547 : clog_info("Cleaning up FelixClient.");
112 781 : for(auto& it : unbuf_mem_regions){
113 234 : clog_debug("Removing mem reg: 0x%x. There are %d mem regions remaining.", it.second.mr, unbuf_mem_regions.size());
114 234 : it.second.size = 0;
115 234 : free(it.second.data);
116 : }
117 3260 : }
118 :
119 : // event loop control
120 673 : void FelixClient::run() {
121 673 : if (affinity != "") {
122 0 : std::vector<int> cpus = parseCpuRange(affinity);
123 0 : if (!cpus.empty()) {
124 0 : cpu_set_t pmask;
125 0 : CPU_ZERO(&pmask);
126 0 : for (int cpu : cpus) {
127 0 : CPU_SET(cpu, &pmask);
128 : }
129 0 : int s = sched_setaffinity(0, sizeof(pmask), &pmask);
130 0 : if (s != 0) {
131 0 : clog_error("Setting affinity to CPU rage [%s] failed with error = %d", affinity.c_str(), s);
132 : }
133 : }
134 0 : }
135 673 : pthread_setname_np(pthread_self(), "felix-client-thread");
136 :
137 673 : evloop_thread_id = std::this_thread::get_id();
138 673 : clog_info("In run: 0x%x", evloop_thread_id);
139 673 : clog_info("Running netio_run");
140 673 : netio_run(&ctx.evloop);
141 559 : clog_info("Finished netio_run");
142 559 : }
143 :
144 : /**
145 : * @brief Stops the eventloop and clean up resources
146 : *
147 : * Since the memory management in netio-next relies on connections and endpoints being closed properly,
148 : * this function takes care to execute this procedure in the right order and then stopping the eventloop afterwards.
149 : * Also netio-next and its data structures use c-style memory allocation, so resources need to be freed manually.
150 : *
151 : * First, all subscriptions are unsubscribed, then all send_sockets close their connections and they are eventually deleted.
152 : * This has to go through the eventloop. The timeouts ensure that everything is closed even if the eventloop or some other
153 : * network resources get stuck.
154 : *
155 : * When using felix-client-thread/felix-interface, this fuction is called automatically in the destructor.
156 : */
157 595 : void FelixClient::stop() {
158 595 : clog_info("Stopping netio");
159 595 : terminating_felix_client = true;
160 595 : contextHandler->terminatingFelixClient = true;
161 595 : netio_timer_close(&ctx.evloop, &subscribe_timer);
162 595 : netio_timer_close(&ctx.evloop, &user_timer);
163 :
164 : //unsubscribing all elinks
165 595 : std::vector<uint64_t> fids_to_unsubscribe = contextHandler->getFidsToUnsubscribe();
166 595 : clog_debug("Closing %d subscriptions", fids_to_unsubscribe.size());
167 2628 : for(auto fid : fids_to_unsubscribe){
168 2033 : if(evloop_thread_id == std::this_thread::get_id()){
169 49 : send_unsubscribe_msg(fid);
170 : } else {
171 1984 : unsubscribe(fid);
172 : }
173 : }
174 :
175 : uint timeout = 1000;
176 : uint t_expired = 0;
177 : uint span = 25U;
178 :
179 :
180 2704 : while (!contextHandler->areAllFidsUnsubscribed() && (t_expired <= timeout)){
181 2109 : std::this_thread::sleep_for(std::chrono::milliseconds(span));
182 2109 : t_expired += span;
183 :
184 : }
185 595 : if (timeout <= t_expired){
186 48 : clog_error("Timeout during stopping netio, not all subscriptions have been closed. %d subscriptions remain open.", contextHandler->getFidsToUnsubscribe().size());
187 : }
188 :
189 :
190 595 : std::vector<netio_send_socket*> send_sockets = contextHandler->getSendSockets();
191 595 : total_number_send_sockets.store(send_sockets.size());
192 595 : clog_debug("Closing netio %d send sockets.", total_number_send_sockets.load());
193 1096 : for(auto& socket : send_sockets){
194 501 : netio_disconnect(socket);
195 : }
196 :
197 : t_expired = 0;
198 1092 : while ((total_number_send_sockets.load(std::memory_order_relaxed) != 0) && (t_expired <= timeout)){
199 497 : std::this_thread::sleep_for(std::chrono::milliseconds(span));
200 497 : t_expired += span;
201 : }
202 595 : if (timeout <= t_expired){
203 0 : clog_error("Timeout during stopping netio, not all send sockets have been disconnected, %d remain open.", total_number_send_sockets.load());
204 : }
205 595 : netio_signal_close(&ctx.evloop, &send_signal);
206 595 : netio_signal_close(&ctx.evloop, &sub_signal);
207 595 : netio_terminate_signal(&ctx.evloop);
208 595 : }
209 :
210 :
211 : /**
212 : * @brief Sends a single message
213 : *
214 : * This function is executed by the eventloop, which ensures there are no data races for completion objects/buffers in netio-next.
215 : *
216 : * In buffered and unbuffered mode, the message is copied either in the buffer or in a memory region.
217 : * Hence, after this function returns it is safe for the user to reuse the memory of this message.
218 : *
219 : * @param fid: FID
220 : * @param header: ToFlxHeader, contains elink and msg size
221 : * @param data: pointer to data to send
222 : * @param size: size of message
223 : * @param flush: for buffered mode: should the buffer be flushed after this message
224 : * @param prom_send_done: promise to notify user thread that the message was sent.
225 : */
226 593 : int FelixClient::send_evloop(netio_tag_t fid, ToFlxHeader header, const uint8_t* data, size_t size, bool flush, std::promise<uint8_t> prom_send_done){
227 593 : if(contextHandler->getSocketState(fid) == DISCONNECTED){
228 0 : clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
229 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
230 0 : prom_send_done.set_exception(e);
231 0 : return 1;
232 0 : }
233 :
234 593 : int status = NETIO_STATUS_OK;
235 :
236 593 : if (contextHandler->getType(fid) == BUFFERED) {
237 317 : if(size > contextHandler->getPagesize(fid)){
238 0 : clog_error("Trying to send message that exceed the page size.");
239 0 : return 1;
240 : }
241 317 : netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
242 317 : if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
243 317 : size_t len = sizeof(header) + size;
244 317 : std::vector<uint8_t> buf;
245 317 : buf.reserve(len);
246 317 : memcpy(buf.data(), &header, sizeof(header));
247 317 : memcpy(buf.data() + sizeof(header), data, size);
248 317 : clog_debug("Sending (buffered) for 0x%x, i.e. tag %d, socket 0x%p", fid, header.elink, buffered_socket);
249 317 : status = netio_buffered_send(buffered_socket, (void*)buf.data(), len);
250 317 : clog_debug("Status %d", status);
251 :
252 317 : if (flush) {
253 317 : netio_buffered_flush(buffered_socket);
254 : }
255 :
256 317 : } else {
257 276 : if(size > UNBUFFERED_MR_SIZE){
258 0 : clog_error("Trying to send message that exceed the page size.");
259 0 : return 1;
260 : }
261 276 : netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
262 276 : std::string addr = contextHandler->getAddress(fid);
263 276 : clog_debug("Begin sending process. Buffer ptr: 0x%p", unbuf_mem_regions[addr].data);
264 276 : memcpy(unbuf_mem_regions[addr].data, &header, sizeof(header));
265 276 : memcpy((uint8_t*)unbuf_mem_regions[addr].data + sizeof(header), data, size);
266 276 : clog_debug("Memcopy done. Total size: %d (header: %d + size: %d)", sizeof(header) + size, sizeof(header), size);
267 276 : if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
268 276 : status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, sizeof(header) + size, (uint64_t)&unbuf_mem_regions[addr]);
269 276 : }
270 593 : prom_send_done.set_value(status);
271 593 : return status;
272 : }
273 :
274 : /**
275 : * @brief Sends a vector of multiple messages
276 : *
277 : * This function is executed by the eventloop, which ensures there are no data races for completion objects/buffers in netio-next.
278 : * It works similar to the previous send_eventloop function for a single message, but takes instead a vector of messages that will be sent in one shot.
279 : * In unbuffered mode the data is copied directly into the memory region of the corresponding socket while buffered mode uses an additional step.
280 : * THe function does not have the flush parameter in buffered mode which cause the buffere to be flushed after every message
281 : * (This behavior was requested to speed up the sending of padded messages for e.g. programming FW in the FE electronics.)
282 : *
283 : * @param fid: FID
284 : * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
285 : * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
286 : * @param prom_send_done: promise to notify user thread that the message was sent
287 : */
288 303 : int FelixClient::send_evloop(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes, std::promise<uint8_t> prom_send_done){
289 :
290 303 : if(contextHandler->getSocketState(fid) == DISCONNECTED){
291 0 : clog_error("Unbuffered socket is disconnected. Thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
292 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
293 0 : prom_send_done.set_exception(e);
294 0 : return 1;
295 0 : }
296 :
297 303 : ToFlxHeader header;
298 606 : size_t totalSize = accumulate(sizes.begin(), sizes.end(), 0) + sizeof(header) * msgs.size();
299 303 : size_t currentSize = 0;
300 :
301 :
302 303 : int status = NETIO_STATUS_OK;
303 :
304 303 : if (contextHandler->getType(fid) == BUFFERED) {
305 133 : if(totalSize > contextHandler->getPagesize(fid)){
306 0 : clog_error("Trying to send message that exceed the page size.");
307 0 : return 1;
308 : }
309 133 : netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
310 133 : clog_debug("Socket = 0x%x with state: %d", buffered_socket, contextHandler->getSocketState(fid));
311 133 : if(!contextHandler->exists(static_cast<void*>(buffered_socket))){return NETIO_STATUS_OK;}
312 :
313 133 : std::vector<uint8_t> buf;
314 133 : buf.reserve(totalSize);
315 :
316 407 : for(unsigned i = 0; i < msgs.size(); i++){
317 :
318 274 : header.length = sizes[i];
319 274 : header.reserved = 0;
320 274 : header.elink = get_elink(fid);
321 :
322 274 : memcpy(buf.data() + currentSize, &header, sizeof(header));
323 274 : memcpy(buf.data() + currentSize + sizeof(header), msgs[i], sizes[i]);
324 :
325 274 : currentSize += sizeof(header) + sizes[i];
326 : }
327 :
328 133 : clog_debug("Sending (buffered) for 0x%x", fid);
329 133 : clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
330 133 : {
331 133 : status = netio_buffered_send(buffered_socket, (void*)buf.data(), totalSize);
332 133 : clog_debug("Status %d", status);
333 133 : netio_buffered_flush(buffered_socket);
334 : }
335 133 : } else {
336 : // Unbuffered
337 170 : if(totalSize > UNBUFFERED_MR_SIZE){
338 0 : clog_error("Trying to send message that exceed the page size.");
339 0 : return 1;
340 : }
341 170 : std::string addr = contextHandler->getAddress(fid);
342 170 : netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
343 170 : if(!contextHandler->exists(static_cast<void*>(unbuffered_socket))){return NETIO_STATUS_OK;}
344 :
345 514 : for(unsigned i = 0; i < msgs.size(); i++){
346 :
347 344 : header.length = sizes[i];
348 344 : header.reserved = 0;
349 344 : header.elink = get_elink(fid);
350 :
351 344 : memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize, &header, sizeof(header));
352 344 : memcpy((uint8_t*)unbuf_mem_regions[addr].data + currentSize + sizeof(header), msgs[i], sizes[i]);
353 :
354 344 : currentSize += sizeof(header) + sizes[i];
355 : }
356 :
357 170 : clog_debug("Sending (unbuffered) for 0x%x", fid);
358 170 : clog_debug("Sending %d messages with the total size of %d", msgs.size(), totalSize);
359 :
360 170 : {
361 170 : status = netio_send(unbuffered_socket, &unbuf_mem_regions[addr], unbuf_mem_regions[addr].data, totalSize, (uint64_t)&unbuf_mem_regions[addr]);
362 : }
363 :
364 170 : }
365 303 : prom_send_done.set_value(status);
366 303 : return status;
367 :
368 : }
369 :
370 : /**
371 : * @brief Function run by the eventloop thread to establish send connection
372 : *
373 : * This function will either create a new send_socket and atmept to connect it or just check the current state of this connection.
374 : * If the connection is up or there is an error, it will notify the calling user thread by returning the corresponding error code via the std::promise.
375 : * If the connection is still about to be established, the signal is fired again and the state si checked again in the new function call.
376 : *
377 : *
378 : * @param fid: FID
379 : * @param prom_sub_done: promise to notify user thread when subscription is done
380 : */
381 641 : void FelixClient::establish_send_connection(uint64_t fid, std::promise<uint8_t> prom_sub_done){
382 641 : if (contextHandler->getType(fid) == BUFFERED) {
383 347 : netio_buffered_send_socket* buffered_socket = get_buffered_send_socket(fid);
384 347 : clog_debug("Buffered socket = 0x%p with state: %d", buffered_socket, contextHandler->getSocketState(fid));
385 347 : if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
386 347 : prom_sub_done.set_value(0);
387 347 : return;
388 : } else {
389 0 : clog_error("Buffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x state is %d", std::this_thread::get_id(), fid, contextHandler->getSocketState(fid));
390 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
391 0 : prom_sub_done.set_exception(e);
392 0 : return;
393 0 : }
394 : } else {
395 294 : netio_send_socket* unbuffered_socket = get_unbuffered_send_socket(fid);
396 294 : clog_debug("Unbuffered socket = 0x%x with state: %d", unbuffered_socket, contextHandler->getSocketState(fid));
397 294 : if(contextHandler->getSocketState(fid) == CONNECTING || contextHandler->getSocketState(fid) == CONNECTED){
398 294 : prom_sub_done.set_value(0);
399 294 : return;
400 : } else {
401 0 : clog_error("Unbuffered socket is disconnected. Evloop thread: 0x%x FID: 0x%x", std::this_thread::get_id(), fid);
402 0 : std::exception_ptr e = std::make_exception_ptr(FelixClientSendConnectionException());
403 0 : prom_sub_done.set_exception(e);
404 0 : return;
405 0 : }
406 : }
407 :
408 : }
409 :
410 :
411 : /**
412 : * @brief Sends a single message (exposed by the API)
413 : *
414 : * This function might be executed by the user thread when calling the API send_data() or by the eventloop when send_data() is called from within a callback.
415 : * First, it is checked which thread is executing this function since this impacts guarantees for thread safety.
416 : * If the function is called by the eventloop thread, send_data_intern() is called and then the function returns.
417 : * If called by the user thread and if there is no send connection to felix-toflx, the function will attempt to create one.
418 : * To avoid data races, this needs to be executed by the eventloop. The corresponding signal is fired and the function is blocked with a future/promise pair until the connection is up and can be used for sending. However, this is only necessary when a new connection needs to be established.
419 : * When the connection is available, the send signal is fired and the user thread is again blocked until the send is executed by the eventloop.
420 : *
421 : * The two future/promise pairs also have a timeout of 2s, respective 5s to avaoid deadlock situations when the eventloop thread is blocked by the user inside a callback.
422 : *
423 : * @param fid: FID
424 : * @param data: pointer to data to send
425 : * @param size: size of message
426 : * @param flush: for buffered mode: should the buffer be flushed after this message
427 : */
428 611 : void FelixClient::send_data(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
429 611 : if(terminating_felix_client.load()){
430 0 : clog_debug("Already terminating, should not send data");
431 0 : return;
432 : }
433 :
434 611 : try{
435 611 : contextHandler->createOrUpdateInfo(fid, &bus, false);
436 2 : } catch (std::exception& e) {
437 2 : clog_error("Problem while reading bus: %s", e.what());
438 2 : throw FelixClientSendConnectionException();
439 2 : }
440 :
441 609 : clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
442 :
443 609 : if (!contextHandler->exists(fid)) {
444 0 : clog_error("Invalid Bus entry");
445 0 : throw FelixClientSendConnectionException();
446 : }
447 :
448 609 : if(evloop_thread_id == std::this_thread::get_id()){
449 0 : send_data_intern(fid, data, size, flush);
450 0 : return;
451 : }
452 :
453 : //If there is no connection, we have to establish a new one through the evloop
454 609 : if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
455 519 : std::promise<uint8_t> prom_sub_done;
456 519 : std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
457 519 : contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
458 519 : netio_signal_fire(&send_signal);
459 519 : try{
460 519 : std::chrono::milliseconds timeout (5000);
461 519 : if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
462 0 : clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
463 0 : throw FelixClientResourceNotAvailableException();
464 : }
465 519 : future_sub_done.get();
466 0 : } catch(std::exception& e){
467 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
468 0 : throw;
469 0 : }
470 519 : }
471 :
472 609 : if(contextHandler->getSocketState(fid) == CONNECTING){
473 513 : if(!contextHandler->waitConnected(fid, 2000)){
474 48 : clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
475 48 : throw FelixClientSendConnectionException();
476 : }
477 : }
478 :
479 561 : ToFlxHeader header;
480 561 : header.length = size;
481 561 : header.reserved = 0;
482 561 : header.elink = get_elink(fid);
483 561 : int status = NETIO_STATUS_OK;
484 :
485 : //send
486 561 : std::promise<uint8_t> prom_send_done;
487 561 : std::future<uint8_t> future_send_done = prom_send_done.get_future();
488 :
489 561 : contextHandler->pushSendTask(SendDataEvent{fid, header, data, size, flush, std::move(prom_send_done)});
490 561 : netio_signal_fire(&send_signal);
491 :
492 : // wait for promise
493 :
494 561 : try{
495 561 : std::chrono::milliseconds timeout (5000);
496 561 : if (future_send_done.wait_for(timeout)==std::future_status::timeout){
497 0 : clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
498 0 : throw FelixClientException("Send timeout expired");
499 : }
500 561 : status = future_send_done.get();
501 0 : } catch(std::exception& e){
502 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
503 0 : throw;
504 0 : }
505 :
506 561 : switch(status) {
507 561 : case NETIO_STATUS_OK: return;
508 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
509 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
510 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
511 0 : default: throw FelixClientException("Unknown status code: " + status);
512 : }
513 561 : }
514 :
515 :
516 : /**
517 : * @brief Sends a vector of multiple messages
518 : *
519 : * This function provides the same functionality as the previous one for single messages but takes a vector of messages as input and
520 : * sends them in a single mesaage to felix-toflx.
521 : * Another difference to the single message version is that the toflx header is not already added here but by the send_evloop() function.
522 : *
523 : * @param fid: FID
524 : * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
525 : * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
526 : * @param prom_send_done: promise to notify user thread that the message was sent
527 : */
528 297 : void FelixClient::send_data(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
529 297 : if(terminating_felix_client.load()){
530 0 : clog_debug("Already terminating, should not send data");
531 0 : return;
532 : }
533 :
534 297 : try{
535 297 : contextHandler->createOrUpdateInfo(fid, &bus, false);
536 0 : } catch (std::exception& e) {
537 0 : clog_error("Problem while reading bus: %s", e.what());
538 0 : throw FelixClientSendConnectionException();
539 0 : }
540 :
541 297 : if (!contextHandler->exists(fid)) {
542 0 : clog_error("Invalid Bus entry ");
543 0 : throw FelixClientSendConnectionException();
544 : }
545 :
546 297 : clog_info("Address %s", contextHandler->getAddress(fid).c_str());
547 297 : if (msgs.size() != sizes.size()){
548 0 : FelixClientException("Error! Number of messages differs from number of sizes");
549 : }
550 :
551 297 : if(evloop_thread_id == std::this_thread::get_id()){
552 0 : send_data_intern(fid, msgs, sizes);
553 0 : return;
554 : }
555 :
556 297 : if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTIONG?
557 16 : std::promise<uint8_t> prom_sub_done;
558 16 : std::future<uint8_t> future_sub_done = prom_sub_done.get_future();
559 16 : contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
560 16 : netio_signal_fire(&send_signal);
561 16 : try{
562 16 : std::chrono::milliseconds timeout (5000);
563 16 : if (future_sub_done.wait_for(timeout)==std::future_status::timeout){
564 0 : clog_error("Timeout to establish send connection expired, stop blocking to prevent deadlock.");
565 0 : throw FelixClientResourceNotAvailableException();
566 : }
567 16 : future_sub_done.get();
568 0 : } catch(std::exception& e){
569 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
570 0 : throw;
571 0 : }
572 16 : }
573 :
574 297 : if(contextHandler->getSocketState(fid) == CONNECTING){
575 16 : if(!contextHandler->waitConnected(fid, 2000)){
576 0 : clog_error("Socket is not connected. Instead: %d on thread: 0x%x FID: 0x%x", contextHandler->getSocketState(fid), std::this_thread::get_id(), fid);
577 0 : throw FelixClientSendConnectionException();
578 : }
579 : }
580 :
581 297 : std::promise<uint8_t> prom_send_done;
582 297 : std::future<uint8_t> future_send_done = prom_send_done.get_future();
583 297 : contextHandler->pushSendTask(SendDataVectorEvent{fid, msgs, sizes, std::move(prom_send_done)});
584 297 : netio_signal_fire(&send_signal);
585 :
586 297 : int status = NETIO_STATUS_OK;
587 : // wait for promise
588 :
589 297 : try{
590 297 : std::chrono::milliseconds timeout (5000);
591 297 : if (future_send_done.wait_for(timeout)==std::future_status::timeout){
592 0 : clog_error("Send timeout expired, stop blocking to prevent deadlock."); //No information if send we executed later. Potentially data is corrupted from now on since function will return.
593 0 : throw FelixClientException("Send timeout expired");
594 : }
595 297 : status = future_send_done.get();
596 0 : } catch(std::exception& e){
597 0 : clog_info("Exception in evloop thread: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
598 0 : throw;
599 0 : }
600 :
601 297 : switch(status) {
602 297 : case NETIO_STATUS_OK: return;
603 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
604 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
605 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
606 0 : default: throw FelixClientException("Unknown status code: " + status);
607 : }
608 297 : }
609 :
610 : /**
611 : * @brief Sends a single message and is guaranteed to be non-blocking (exposed by the API)
612 : *
613 : * This functionality was requested by the DCS team who need send_data() to be guranteed non-blocking to avaoid deadlocks in the OPC/SCA server.
614 : * The function behaves similar to the regular send_data() with two main differences:
615 : * 1) It will not attempt to establish a send connection if it is not already up. This is now in the responsability of the user since this cannot be done in a non=blocking way.
616 : * 2) The data is copied before the function returns and it does not provide any information if the send was successful or not.
617 : * The copy is needed because the user would not now afterwards when it is save to reuse the memory.
618 : *
619 : * @param fid: FID
620 : * @param data: pointer to data to send
621 : * @param size: size of message
622 : * @param flush: for buffered mode: should the buffer be flushed after this message
623 : */
624 32 : void FelixClient::send_data_nb(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
625 32 : if(terminating_felix_client.load()){
626 0 : clog_debug("Already terminating, should not send data");
627 0 : return;
628 : }
629 :
630 32 : if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
631 0 : clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
632 0 : throw FelixClientSendConnectionException();
633 : }
634 :
635 32 : std::vector<uint8_t> data_copy(data, data+size);
636 :
637 32 : ToFlxHeader header;
638 32 : header.length = size;
639 32 : header.reserved = 0;
640 32 : header.elink = get_elink(fid);
641 :
642 : //send
643 32 : std::promise<uint8_t> prom_send_done;
644 :
645 32 : contextHandler->pushSendTask(SendDataNbEvent{fid, header, std::move(data_copy), size, flush, std::move(prom_send_done)});
646 32 : netio_signal_fire(&send_signal);
647 32 : }
648 :
649 : /**
650 : * @brief Sends a vector of multiple messages which is guaranteed to be non-blocking (exposed by the API)
651 : *
652 : * Analog to the other send_data() implementations, this function provides the same functionality for a vector of messages instead of a singel message.
653 : * The main differences compared to the single message version is that it will not attempt to establish a send connection if it is not already up. This is now in the responsability of the user since this cannot be done in a non-blocking way.
654 : * And of course it does not provide any information to the user if the send was successful or not.
655 : *
656 : * @param fid: FID
657 : * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
658 : * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
659 : * @param prom_send_done: promise to notify user thread that the message was sent
660 : */
661 6 : void FelixClient::send_data_nb(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes){
662 6 : if(terminating_felix_client.load()){
663 0 : clog_debug("Already terminating, should not send data");
664 0 : return;
665 : }
666 :
667 6 : if (!contextHandler->exists(fid) || contextHandler->getSocketState(fid) != CONNECTED) {
668 0 : clog_error("No corresponding send connection for this FID. Have you tried init_send_data?");
669 0 : throw FelixClientSendConnectionException();
670 : }
671 :
672 6 : if (msgs.size() != sizes.size()){
673 0 : FelixClientException("Error! Number of messages differs from number of sizes");
674 : }
675 :
676 6 : std::vector<std::vector<uint8_t>> data;
677 6 : std::vector<const uint8_t*> msgs_copy;
678 22 : for (size_t i = 0; i < msgs.size(); i++){
679 16 : data.emplace_back(msgs[i], msgs[i]+sizes[i]);
680 16 : msgs_copy.push_back(&data[i][0]);
681 : }
682 :
683 6 : std::vector<uint64_t> size_copy(sizes);
684 :
685 6 : std::promise<uint8_t> prom_send_done;
686 6 : contextHandler->pushSendTask(SendDataVectorNbEvent{fid, std::move(data), std::move(msgs_copy), std::move(size_copy), std::move(prom_send_done)});
687 6 : netio_signal_fire(&send_signal);
688 6 : }
689 :
690 :
691 : /**
692 : * @brief Send_data() function that must be used by the eventloop thread
693 : *
694 : * Similar to the non-blocking send, the send_data() function call that is executed by the eventloop thread has to be non-blocking.
695 : * But there is no need to a timely return, so this function will still try to establish a connection and then call itself again through the eventloop until the connection is up. The std::pormise that is passed the establish_send_connection() and send_evloop() funcitons are just dummy objects htat are not actually checked.
696 : * Also, it exploits the fact that it is executed by the eventloop and there is no need for thread safety measuremnts.
697 : * So, send_eventloop() is directly called form this function without firing another signal.
698 : *
699 : * @param fid: FID
700 : * @param data: pointer to data to send
701 : * @param size: size of message
702 : * @param flush: for buffered mode: should the buffer be flushed after this message
703 : */
704 0 : void FelixClient::send_data_intern(netio_tag_t fid, const uint8_t* data, size_t size, bool flush) {
705 :
706 0 : ToFlxHeader header;
707 0 : header.length = size;
708 0 : header.reserved = 0;
709 0 : header.elink = get_elink(fid);
710 0 : int status = NETIO_STATUS_OK;
711 0 : std::promise<uint8_t> dummy_prom_sub;
712 0 : std::promise<uint8_t> dummy_prom_send;
713 :
714 0 : if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
715 0 : try{
716 0 : establish_send_connection(fid, std::move(dummy_prom_sub));
717 0 : } catch(std::exception& e){
718 0 : clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
719 0 : throw;
720 0 : }
721 : }
722 :
723 0 : if(contextHandler->getSocketState(fid) == CONNECTING){
724 0 : contextHandler->pushSendTask(SendDataInternEvent{fid, header, data, size, flush, std::move(dummy_prom_send),1});
725 0 : netio_signal_fire(&send_signal);
726 : return;
727 : }
728 :
729 0 : try{
730 0 : status = send_evloop(fid, header, data, size, flush, std::move(dummy_prom_send));
731 0 : } catch(std::exception& e){
732 0 : clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
733 0 : throw;
734 0 : }
735 :
736 0 : switch(status) {
737 : case NETIO_STATUS_OK: return;
738 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
739 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
740 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
741 0 : default: throw FelixClientException("Unknown status code: " + status);
742 : }
743 0 : }
744 :
745 :
746 : /**
747 : * @brief Send_data() function for vector of messages that must be used by the eventloop thread
748 : *
749 : * The same changes described in the single message version with respect to the regular send_data() function apply here as well.
750 : *
751 : * @param fid: FID
752 : * @param msgs: std::vector of pointer to all messages to be sent (including the padding)
753 : * @param sizes: std::vector of message sizes for the corresponding messages in the previous parameter
754 : * @param prom_send_done: promise to notify user thread that the message was sent
755 : */
756 0 : void FelixClient::send_data_intern(netio_tag_t fid, const std::vector<const uint8_t*>& msgs, const std::vector<size_t>& sizes) {
757 :
758 0 : int status = NETIO_STATUS_OK;
759 0 : std::promise<uint8_t> dummy_prom_sub;
760 0 : std::promise<uint8_t> dummy_prom_send;
761 :
762 0 : if((contextHandler->getSocketState(fid) != CONNECTING) || (contextHandler->getSocketState(fid) != CONNECTED)){
763 0 : try{
764 0 : establish_send_connection(fid, std::move(dummy_prom_sub));
765 0 : } catch(std::exception& e){
766 0 : clog_info("Exception while establishing connection: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
767 0 : throw;
768 0 : }
769 : }
770 :
771 0 : if(contextHandler->getSocketState(fid) == CONNECTING){
772 0 : contextHandler->pushSendTask(SendDataInternVectorEvent{fid, msgs, sizes, std::move(dummy_prom_send), 1});
773 0 : netio_signal_fire(&send_signal);
774 : return;
775 : }
776 :
777 0 : try{
778 0 : status = send_evloop(fid, msgs, sizes, std::move(dummy_prom_send));
779 0 : } catch(std::exception& e){
780 0 : clog_info("Exception while sending: %s thread ID: 0x%x FID: 0x%x", e.what(), std::this_thread::get_id(), fid);
781 0 : throw;
782 0 : }
783 :
784 0 : switch(status) {
785 : case NETIO_STATUS_OK: return;
786 0 : case NETIO_STATUS_TOO_BIG: throw FelixClientMessageTooBigException();
787 0 : case NETIO_STATUS_AGAIN: throw FelixClientResourceNotAvailableException();
788 0 : case NETIO_STATUS_PARTIAL: throw FelixClientPartiallyCompletedException();
789 0 : default: throw FelixClientException("Unknown status code: " + status);
790 : }
791 0 : }
792 :
793 : /**
794 : * @brief Generic callback for all send related function calls
795 : *
796 : * This design allows to reuse the same signal (which must have a single callback) for all send related functions.
797 : * The function pops the next task from the send_task_queue in the contextHandler and decides based on the object type which function to call and which parameter to pass.
798 : *
799 : */
800 1471 : void FelixClient::exec_send(){
801 1471 : send_var_t task;
802 1471 : contextHandler->pullSendTask(task);
803 :
804 1471 : std::visit(overloaded {
805 0 : [](std::monostate const&) {
806 0 : clog_info("Nothing to send. Send task queue is empty.");
807 : },
808 561 : [this](SendDataEvent& sendData) {
809 561 : this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
810 561 : },
811 32 : [this](SendDataNbEvent& sendNbData) {
812 32 : this->send_evloop(sendNbData.fid, sendNbData.header, &sendNbData.data[0], sendNbData.size, sendNbData.flush, std::move(sendNbData.prom_send_done));
813 32 : },
814 297 : [this](SendDataVectorEvent& sendDataVector) {
815 297 : this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
816 297 : },
817 6 : [this](SendDataVectorNbEvent& sendDataNbVector) {
818 6 : this->send_evloop(sendDataNbVector.fid, sendDataNbVector.msgs, sendDataNbVector.sizes, std::move(sendDataNbVector.prom_send_done));
819 6 : },
820 575 : [this](SendConnectEvent& sendConnect) {
821 575 : this->establish_send_connection(sendConnect.fid, std::move(sendConnect.prom_sub_done));
822 575 : },
823 0 : [this](SendDataInternEvent& sendData) {
824 0 : if(contextHandler->getSocketState(sendData.fid) == CONNECTED){
825 0 : this->send_evloop(sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done));
826 : }else{
827 0 : if(sendData.count< maxSendAttempts){
828 0 : contextHandler->pushSendTask(SendDataInternEvent{sendData.fid, sendData.header, sendData.data, sendData.size, sendData.flush, std::move(sendData.prom_send_done), ++sendData.count});
829 : }
830 : }
831 0 : },
832 0 : [this](SendDataInternVectorEvent& sendDataVector) {
833 0 : if(contextHandler->getSocketState(sendDataVector.fid) == CONNECTED){
834 0 : this->send_evloop(sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done));
835 : }else{
836 0 : if(sendDataVector.count < maxSendAttempts){
837 0 : contextHandler->pushSendTask(SendDataInternVectorEvent{sendDataVector.fid, sendDataVector.msgs, sendDataVector.sizes, std::move(sendDataVector.prom_send_done), ++sendDataVector.count});
838 : }
839 : }
840 0 : },
841 : }, task);
842 1471 : }
843 :
844 :
845 :
846 0 : void FelixClient::send_data(netio_tag_t fid, struct iovec* iov, unsigned n, bool flush) {
847 0 : clog_info("send_msg(netio_tag_t fid, struct iovec* iov, unsigned n): not implemented");
848 0 : throw FelixClientSendConnectionException();
849 : }
850 :
851 : /**
852 : * @brief Generic callback for all subscribe related function calls
853 : *
854 : * This design allows to reuse the same signal (which must have a single callback) for all subscribe related functions.
855 : * The function pops the next task from the sub_task_queue in the contextHandler and decides based on the object type which function to call and which parameter to pass.
856 : *
857 : */
858 4287 : void FelixClient::exec_sub(){
859 4287 : sub_var_t task;
860 4287 : contextHandler->pullSubTask(task);
861 :
862 4287 : std::visit(overloaded {
863 0 : [](std::monostate const&) {
864 0 : clog_info("Nothing to send. Send task queue is empty.");
865 : },
866 2118 : [this](UnsubEvent& unsub) {
867 2118 : this->send_unsubscribe_msg(unsub.fid, std::move(unsub.prom_unsub_done));
868 2118 : },
869 2169 : [this](SubscribeEvent const& sub) {
870 2169 : this->subscribe_evloop(sub.fid);
871 : },
872 : }, task);
873 4287 : }
874 :
875 : /**
876 : * @brief Initializes a new send connection
877 : *
878 : * This function ust be called before using the send_data_nb() functions since they will not establish a connection on the first function call.
879 : * This function will not block until the connection is up.
880 : *
881 : * TODO: This function should propably be blocking.
882 : *
883 : * @param fid: FID
884 : */
885 40 : void FelixClient::init_send_data(netio_tag_t fid) {
886 40 : clog_info("init_send_data for FID 0x%x", fid);
887 40 : if(terminating_felix_client.load()){
888 0 : clog_debug("Already terminating, should not send data");
889 0 : return;
890 : }
891 :
892 40 : try{
893 40 : contextHandler->createOrUpdateInfo(fid, &bus, false);
894 0 : } catch (std::exception& e) {
895 0 : clog_error("Problem while reading bus: %s", e.what());
896 0 : throw FelixClientSendConnectionException();
897 0 : }
898 :
899 40 : clog_debug("Address for FID 0x%x; %s:%d", fid, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
900 :
901 40 : if (!contextHandler->exists(fid)) {
902 0 : clog_error("Invalid Bus entry");
903 0 : throw FelixClientSendConnectionException();
904 : }
905 :
906 : //If there is no connection, we have to establish a new one through the evloop
907 40 : if (contextHandler->getSocketState(fid) == DISCONNECTED || contextHandler->getSubscriptionState(fid) == TIMEOUT){ //What about CONNECTING?
908 40 : std::promise<uint8_t> prom_sub_done;
909 40 : contextHandler->pushSendTask(SendConnectEvent{fid, std::move(prom_sub_done)});
910 40 : netio_signal_fire(&send_signal);
911 40 : }
912 :
913 : return;
914 : }
915 :
916 392 : void FelixClient::init_subscribe(netio_tag_t fid) {
917 392 : clog_info("init_subscribe not implemented, subscribe will do the job");
918 392 : return;
919 : }
920 :
921 1471 : void FelixClient::on_send_signal(void* ptr){
922 1471 : clog_trace("On send signal");
923 1471 : static_cast<FelixClient*>(ptr)->exec_send();
924 1471 : }
925 :
926 4287 : void FelixClient::on_sub_signal(void* ptr){
927 4287 : clog_debug("On sub signal");
928 4287 : static_cast<FelixClient*>(ptr)->exec_sub();
929 4287 : }
930 :
931 : /**
932 : * @brief Subscribes felix-client to a vector of FIDs
933 : *
934 : * This function performs a number of checks on the right conditions to subscribe and then forwards this tsk to the eventloop thread.
935 : *
936 : * If no timeout is specified or if it is equal to 0, this function will not block and return immediately.
937 : * Otherwise the function will block until either all subscriptino were successful or the timeout expires.
938 : * Note that it will also return FELIX_CLIENT_STATUS_TIMEOUT if just a single subscription was not successful.
939 : *
940 : * Also, if subscribe() is called by the eventloop thread (for instance from within a callback), all functions must be non-blocking.
941 : * Hence, the function will return FELIX_CLIENT_STATUS_TIMEOUT right away, if a timeout was specified, since the thread cannot be blocked and process the subscription at the same time.
942 : * If the function is called by the user thread, the task is passed via a subscribe signal to the eventloop thread to avaoid race conditions.
943 : *
944 : * @param fids: vector of FIDs to subscribe
945 : * @param timeoutms: timeout in ms in which the subscription needs to be successful.
946 : * @param for_register: indicates if subscription is for send_cmd function.
947 : */
948 651 : int FelixClient::subscribe(const std::vector<netio_tag_t>& fids, uint timeoutms, bool for_register) {
949 651 : std::vector<netio_tag_t> all_fids;
950 651 : clog_debug("Subscribing to %d FIDs", fids.size());
951 651 : uint8_t alreadySubscribedCounter = 0;
952 2858 : for(auto& fid : fids){
953 : // Already subscribed
954 2207 : if (!contextHandler->canSubscribe(fid)) {
955 28 : clog_info("Already subscribed to 0x%x", fid);
956 28 : alreadySubscribedCounter++;
957 28 : continue;
958 : }
959 2179 : if (fid & 0x8000){
960 0 : clog_info("Can't subscribe to toflx fid: 0x%x", fid);
961 0 : continue;
962 : }
963 :
964 2179 : try{
965 2179 : contextHandler->createOrUpdateInfo(fid, &bus, for_register);
966 2169 : struct timespec t0;
967 2169 : clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
968 4338 : contextHandler->setSubscriptionTimes(fid, "subscribe", t0);
969 2169 : contextHandler->setSubscriptionState(fid, SUBING);
970 10 : } catch(...){
971 10 : continue;
972 10 : }
973 2169 : all_fids.push_back(fid);
974 2169 : contextHandler->addToFidsToResubscribe(fid);
975 : }
976 651 : if (fids.size() == alreadySubscribedCounter){
977 : return FELIX_CLIENT_STATUS_ALREADY_DONE;
978 : }
979 :
980 643 : if(evloop_thread_id == std::this_thread::get_id()){
981 0 : for (auto& fid : all_fids){
982 0 : if (!subscribe_evloop(fid)){
983 0 : if (timeoutms == 0) {
984 651 : return FELIX_CLIENT_STATUS_OK;
985 : }
986 : return FELIX_CLIENT_STATUS_TIMEOUT;
987 : }
988 : }
989 : } else {
990 2812 : for (auto& fid : all_fids){
991 2169 : contextHandler->pushSubTask(SubscribeEvent{fid});
992 2169 : netio_signal_fire(&sub_signal);
993 : }
994 : }
995 :
996 643 : if (timeoutms == 0) {
997 : return FELIX_CLIENT_STATUS_OK;
998 : }
999 436 : std::vector<netio_tag_t> not_subscribed_fids;
1000 :
1001 436 : if(!contextHandler->isSubscribed(all_fids, timeoutms)){
1002 1160 : for (auto& fid : all_fids){
1003 1148 : if(!contextHandler->isSubscribed(fid)){
1004 0 : not_subscribed_fids.push_back(fid);
1005 0 : contextHandler->setSubscriptionState(fid, TIMEOUT);
1006 : }
1007 : }
1008 : }
1009 :
1010 436 : if (not_subscribed_fids.empty()) {
1011 436 : if(fids.size() > all_fids.size() + alreadySubscribedCounter){
1012 : return FELIX_CLIENT_STATUS_TIMEOUT;
1013 : }
1014 426 : return FELIX_CLIENT_STATUS_OK;
1015 : }
1016 : return FELIX_CLIENT_STATUS_TIMEOUT;
1017 :
1018 651 : }
1019 :
1020 : /**
1021 : * @brief Subscribes a single FIDs
1022 : *
1023 : * The function creates a vector with one element and calls the other subscribe function that takes a vector of FIDs as input to avoid code duplication.
1024 : *
1025 : * @param fid: FID
1026 : * @param timeoutms: timeout in ms in which the subscription needs to be successful.
1027 : * @param for_register: indicates if subscription is for send_cmd function.
1028 : */
1029 589 : int FelixClient::subscribe(netio_tag_t fid, uint timeoutms, bool for_register) {
1030 589 : if(terminating_felix_client.load()){
1031 0 : clog_debug("Already terminating, should not subscribe anymore");
1032 0 : return FELIX_CLIENT_STATUS_OK;
1033 : }
1034 589 : std::vector<uint64_t> fids{fid};
1035 589 : return subscribe(fids, timeoutms, for_register);
1036 589 : }
1037 :
1038 : /**
1039 : * @brief Actually subscribes a FID (executed by the eventloop)
1040 : *
1041 : * Since the subscription process includes sending a subscription message to the felix-tohost application, this function needs to be executed by the eventloop as well.
1042 : * In addition to the actual subscription process, the time it takes to establish the connection is also measured.
1043 : * If the subscription message cannot be sent because of NETIO_STATUS_AGAIN, the function will call itself.
1044 : * At the end the current status is checked. Since there is no netio callback for establsihed connections but felix-client
1045 : * triggers the user on_connection_establshed for every FID, there are basically two cases:
1046 : * 1) The connection is already up. In this case we assume that the subscription will be successful and call the on_connection_establshed callback within this function.
1047 : * 2) The connection has to be establshed first, then the netio on_connection_estalished callback will trigger the user callback for all subscribed FIDs.
1048 : *
1049 : * @param fid: FID
1050 : */
1051 2278 : bool FelixClient::subscribe_evloop(netio_tag_t fid) {
1052 2278 : try{
1053 2278 : contextHandler->createOrUpdateInfo(fid, &bus);
1054 2274 : struct timespec t1;
1055 2274 : clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
1056 4548 : contextHandler->setSubscriptionTimes(fid, "subscribe_evloop", t1);
1057 4 : } catch(std::exception& e){
1058 4 : return false;
1059 4 : }
1060 2274 : int ret;
1061 2274 : clog_trace("type (0=buffered, 1=unbuffered) %d", contextHandler->getType(fid));
1062 :
1063 2274 : if (contextHandler->getType(fid) == BUFFERED) {
1064 1185 : netio_subscribe_socket* buffered_socket = get_buffered_subscribe_socket(fid);
1065 1185 : clog_debug("(Re-)subscribing (buffered) for fid 0x%x and socket 0x%p", fid, buffered_socket);
1066 1185 : ret = netio_subscribe(buffered_socket, fid);
1067 1185 : clog_debug("(Re-)subscribing (buffered) for 0x%x with send socket has been done", fid);
1068 :
1069 : } else {
1070 1089 : netio_unbuffered_subscribe_socket* unbuffered_socket = get_unbuffered_subscribe_socket(fid);
1071 1089 : clog_debug("(Re-)subscribing (unbuffered) for 0x%x and socket 0x%p", fid, unbuffered_socket);
1072 1089 : clog_debug("Subscribed to unbuffered subscribe socket. Netio_page_size: %d netio pages: %d", unbuffered_socket->recv_socket.attr.buffer_size, unbuffered_socket->recv_socket.attr.num_buffers);
1073 1089 : ret = netio_unbuffered_subscribe(unbuffered_socket, fid);
1074 : }
1075 2274 : if(ret == NETIO_STATUS_AGAIN){
1076 0 : contextHandler->updateFidsWhenUnsub(fid);
1077 0 : contextHandler->pushSubTask(SubscribeEvent{fid});
1078 0 : netio_signal_fire(&sub_signal);
1079 0 : return false;
1080 2274 : } else if(ret == NETIO_STATUS_ERROR){
1081 : return false;
1082 : }
1083 2274 : if(contextHandler->getSocketState(fid) != CONNECTED && contextHandler->getSocketState(fid) != CONNECTING){
1084 0 : contextHandler->setSocketState(fid, CONNECTING);
1085 : }
1086 2274 : if(contextHandler->getSocketState(fid) == CONNECTED){
1087 392 : contextHandler->setSubscriptionState(fid, SUB);
1088 392 : contextHandler->removeFromFidsToResubscribe(fid);
1089 392 : struct timespec t2;
1090 392 : clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
1091 392 : std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
1092 :
1093 784 : double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
1094 784 : + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
1095 392 : clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
1096 :
1097 784 : double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
1098 784 : + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
1099 392 : clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
1100 392 : if (cb_on_connect) {
1101 392 : (cb_on_connect)(fid);
1102 : }
1103 392 : }
1104 : return true;
1105 : }
1106 :
1107 : /**
1108 : * @brief Sends the actual unsubscribe message (executed by the eventloop thread)
1109 : *
1110 : * Analog to the other sending functions, sending an unsubscribe message has also to go through the eventloop.
1111 : *
1112 : * @param fid: FID
1113 : * @param prom_unsub_done: std::promise to notify user thread that the unsubscribe was successful
1114 : */
1115 2167 : void FelixClient::send_unsubscribe_msg(uint64_t fid, std::promise<uint8_t> prom_unsub_done){
1116 2167 : void* socket;
1117 2167 : int ret = 0;
1118 2167 : if (contextHandler->getType(fid) == BUFFERED) {
1119 1126 : struct netio_subscribe_socket* buffered_socket;
1120 1126 : buffered_socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
1121 1126 : socket = static_cast<void*>(buffered_socket);
1122 1126 : if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
1123 1126 : clog_info("Unsubscribing (buffered) for 0x%x from %s and socket 0x%p", fid, get_address(buffered_socket).c_str(), buffered_socket);
1124 1126 : ret = netio_unsubscribe(buffered_socket, fid);
1125 : } else {
1126 1041 : struct netio_unbuffered_subscribe_socket* unbuffered_socket;
1127 1041 : unbuffered_socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
1128 1041 : socket = static_cast<void*>(unbuffered_socket);
1129 1041 : if(!contextHandler->exists(socket) || contextHandler->getSubscriptionState(fid) == UNSUB){prom_unsub_done.set_value(1);return;}
1130 1041 : clog_info("Unsubscribing (unbuffered) for 0x%x from %s", fid, get_address(unbuffered_socket).c_str());
1131 1041 : ret = netio_unbuffered_unsubscribe(unbuffered_socket, fid);
1132 : }
1133 2167 : if(ret == NETIO_STATUS_AGAIN){
1134 0 : contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
1135 0 : netio_signal_fire(&sub_signal);
1136 0 : return;
1137 2167 : } else if(ret == NETIO_STATUS_ERROR){
1138 0 : prom_unsub_done.set_value(2);
1139 : }
1140 2167 : contextHandler->updateFidsWhenUnsub(fid);
1141 2167 : contextHandler->setSubscriptionState(fid, UNSUB);
1142 2167 : if(contextHandler->areAllFidsUnsubscribed(socket)){
1143 1923 : contextHandler->setSocketToAllUnsubscribed(socket);
1144 : }
1145 2167 : prom_unsub_done.set_value(0);
1146 : }
1147 :
1148 : /**
1149 : * @brief Unsubscribe function that is exposed to the user via felix-client-thread
1150 : *
1151 : * If the function is called by the eventloop thread, it will send the unsubscribe message directly, otherwise it will trigger
1152 : * the subscribe signal for the eventloop to execute this task.
1153 : * In the latter case, the function will block for up to 5s for the unsubscribe to be performed.
1154 : *
1155 : * @param fid: FID
1156 : */
1157 2520 : int FelixClient::unsubscribe(netio_tag_t fid) {
1158 2520 : clog_info("Unsubscribing from 0x%x", fid);
1159 2520 : if (fid & 0x8000){
1160 : return FELIX_CLIENT_STATUS_OK;
1161 : }
1162 :
1163 2118 : if(evloop_thread_id == std::this_thread::get_id()){
1164 0 : send_unsubscribe_msg(fid);
1165 0 : return FELIX_CLIENT_STATUS_OK;
1166 : }
1167 2118 : if(!contextHandler->exists(fid)){
1168 : return FELIX_CLIENT_STATUS_OK;
1169 : }
1170 :
1171 2118 : std::promise<uint8_t> prom_unsub_done;
1172 2118 : std::future<uint8_t> future_unsub_done = prom_unsub_done.get_future();
1173 2118 : uint8_t status = FELIX_CLIENT_STATUS_OK;
1174 2118 : if (contextHandler->getSubscriptionState(fid) == SUB || contextHandler->getSubscriptionState(fid) == SUBING){
1175 2118 : contextHandler->setSubscriptionState(fid, UNSUBING);
1176 2118 : contextHandler->pushSubTask(UnsubEvent{fid, std::move(prom_unsub_done)});
1177 2118 : netio_signal_fire(&sub_signal);
1178 :
1179 2118 : try{
1180 2118 : std::chrono::milliseconds timeout (5000);
1181 2118 : if (future_unsub_done.wait_for(timeout)==std::future_status::timeout){
1182 0 : clog_error("Unsubscribe timeout expired, stop blocking to prevent deadlock.");
1183 0 : return FELIX_CLIENT_STATUS_TIMEOUT;
1184 : }
1185 2118 : status = future_unsub_done.get();
1186 0 : } catch(std::exception& e){
1187 0 : clog_info("Exception while unsubscribing: %s FID: 0x%x", e.what(), fid);
1188 0 : throw;
1189 0 : }
1190 : }
1191 2118 : return status; //0 = OK, 1 = Already done
1192 2118 : }
1193 :
1194 : /**
1195 : * @brief Forward a user function to be executed by the eventloop
1196 : *
1197 : * It uses a netio-signal to execute the user function within the eventloop. The signal is allocated, registered, and destroyed fo every function call.
1198 : * So using it extensively might impact the performance of felix-client.
1199 : *
1200 : * @param user_function: void user function that is run by the eventloop.
1201 : */
1202 313 : void FelixClient::exec(const UserFunction &user_function ) {
1203 313 : netio_signal* signal = (struct netio_signal*)malloc(sizeof(struct netio_signal)); // c style
1204 313 : SignalData* data = new SignalData();
1205 313 : data->signal = signal;
1206 313 : data->user_function = user_function;
1207 313 : data->evloop = &ctx.evloop;
1208 :
1209 313 : netio_signal_init(&ctx.evloop, signal);
1210 313 : signal->data = data;
1211 313 : signal->cb = on_signal;
1212 :
1213 313 : netio_signal_fire(signal);
1214 313 : }
1215 :
1216 :
1217 104 : netio_tag_t FelixClient::get_ctrl_fid(netio_tag_t fid) {
1218 104 : uint8_t sid = 0;
1219 104 : uint8_t to_flx = 1;
1220 104 : uint8_t virt = 1;
1221 312 : netio_tag_t ctrl_fid = get_fid_from_ids(get_did(fid),
1222 : get_cid(fid),
1223 : COMMAND_REPLY_LINK,
1224 : sid,
1225 104 : get_vid(fid),
1226 : to_flx,
1227 : virt);
1228 104 : return ctrl_fid;
1229 : }
1230 :
1231 66 : netio_tag_t FelixClient::get_subscribe_fid(netio_tag_t fid) {
1232 66 : uint8_t sid = 0;
1233 66 : uint8_t to_flx = 0;
1234 66 : uint8_t virt = 1;
1235 198 : netio_tag_t subscribe_fid = get_fid_from_ids(get_did(fid),
1236 : get_cid(fid),
1237 : COMMAND_REPLY_LINK,
1238 : sid,
1239 66 : get_vid(fid),
1240 : to_flx,
1241 : virt);
1242 66 : return subscribe_fid;
1243 : }
1244 :
1245 :
1246 40 : FelixClientThread::Status FelixClient::send_cmd(const std::vector<uint64_t>& fids,
1247 : FelixClientThread::Cmd cmd,
1248 : const std::vector<std::string>& cmd_args,
1249 : std::vector<FelixClientThread::Reply>& replies) {
1250 40 : clog_debug("send_cmd");
1251 40 : if(terminating_felix_client.load()){
1252 0 : clog_debug("Already terminating not sending command");
1253 0 : return FelixClientThread::Status::OK;
1254 : }
1255 :
1256 : // No commands to be send, all OK
1257 40 : if(fids.size() == 0) {
1258 : return FelixClientThread::Status::OK;
1259 : }
1260 :
1261 40 : std::string message;
1262 40 : FelixClientThread::Status status = FelixClientThread::Status::OK;
1263 :
1264 : // check the command and command arguments
1265 40 : switch(cmd) {
1266 0 : case FelixClientThread::NOOP:
1267 0 : if (cmd_args.size() != 0) {
1268 0 : message = std::string("Too many arguments for NOOP, needs 0 while %d given.", cmd_args.size());
1269 0 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1270 0 : break;
1271 : }
1272 : break;
1273 28 : case FelixClientThread::GET:
1274 28 : if (cmd_args.size() != 1) {
1275 2 : message = std::string("Not enough arguments for GET, needs 1 while %d given.", cmd_args.size());
1276 2 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1277 2 : break;
1278 : }
1279 : break;
1280 4 : case FelixClientThread::SET:
1281 4 : if (cmd_args.size() != 2) {
1282 0 : message = std::string("Not enough arguments for SET, needs 2 while %d given.", cmd_args.size());
1283 0 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1284 0 : break;
1285 : }
1286 : break;
1287 6 : case FelixClientThread::ECR_RESET:
1288 6 : if (cmd_args.size() != 1) {
1289 0 : message = std::string("Not enough arguments for ECR_RESET, needs 1 while", cmd_args.size());
1290 0 : status = FelixClientThread::Status::ERROR_INVALID_ARGS;
1291 0 : break;
1292 : }
1293 : break;
1294 2 : default:
1295 4 : message = "Cmd not available: " + FelixClientThread::to_string(cmd);
1296 2 : status = FelixClientThread::Status::ERROR_INVALID_CMD;
1297 2 : break;
1298 : }
1299 :
1300 :
1301 : // deduce set of ctrl_fids from list of normal fids
1302 80 : std::unordered_map<netio_tag_t, netio_tag_t> ctrl_by_subscribe_fid;
1303 80 : std::set<uint64_t> sub_fids_set;
1304 40 : clog_info("Looking up %d subscribe FIDs", fids.size());
1305 106 : for(auto const& fid : fids) {
1306 66 : netio_tag_t ctrl_fid = get_ctrl_fid(fid);
1307 66 : netio_tag_t sub_fid = get_subscribe_fid(ctrl_fid);
1308 66 : sub_fids_set.insert(sub_fid);
1309 66 : std::error_code ec;
1310 66 : felixbus::FelixBusInfo bus_info = bus.get_info(sub_fid, ec);
1311 :
1312 66 : if( bus_info.ip != "" ){
1313 62 : ctrl_by_subscribe_fid[sub_fid] = ctrl_fid;
1314 : }
1315 : else{
1316 4 : clog_error("Discarding ctrl fid 0x%x because reply fid 0x%x does not appear in the bus", ctrl_fid, sub_fid);
1317 4 : FelixClientThread::Reply reply;
1318 4 : reply.ctrl_fid = ctrl_fid;
1319 4 : reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
1320 4 : reply.message = "Invalid fid, not found in bus";
1321 4 : reply.value = 0;
1322 4 : replies.push_back(reply);
1323 4 : }
1324 66 : }
1325 :
1326 80 : std::vector<uint64_t> sub_fids;
1327 40 : sub_fids.assign(sub_fids_set.begin(), sub_fids_set.end());
1328 :
1329 40 : clog_info("Subscribing to %d FIDs", ctrl_by_subscribe_fid.size());
1330 40 : uint subscribe_timeout = 5000;
1331 40 : bool for_register = true;
1332 40 : int ret = subscribe(sub_fids, subscribe_timeout, for_register);
1333 40 : if(ret == FELIX_CLIENT_STATUS_TIMEOUT){
1334 4 : auto it = ctrl_by_subscribe_fid.begin();
1335 6 : while (it != ctrl_by_subscribe_fid.end()){
1336 2 : if(!contextHandler->isSubscribed(it->first)){
1337 0 : FelixClientThread::Reply reply;
1338 0 : reply.ctrl_fid = it->second;
1339 0 : reply.status = FelixClientThread::Status::ERROR_NO_SUBSCRIPTION;
1340 0 : reply.message = "Could not subscribe: timeout";
1341 0 : reply.value = 0;
1342 0 : replies.push_back(reply);
1343 0 : it = ctrl_by_subscribe_fid.erase(it);
1344 0 : } else {
1345 2 : it++;
1346 : }
1347 : }
1348 : }
1349 :
1350 40 : clog_info("Sending command");
1351 :
1352 : // continue with the connected ctrl_fids and send them the command
1353 80 : std::unordered_map<std::string, netio_tag_t> ctrl_fid_by_uuid;
1354 82 : for(auto const& sub_ctrl_fids : ctrl_by_subscribe_fid) {
1355 42 : uuid_t uuid_dec;
1356 42 : char uuid[36 + 1];
1357 42 : uuid_generate(uuid_dec);
1358 42 : uuid_unparse_upper(uuid_dec, uuid);
1359 :
1360 42 : clog_info("Preparing cmd...");
1361 42 : char json[256];
1362 42 : struct jWriteControl jwc;
1363 42 : jwOpen(&jwc, json, 256 - 1, JW_ARRAY, JW_COMPACT);
1364 42 : jwArr_object(&jwc);
1365 42 : jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::UUID).c_str(), uuid);
1366 84 : jwObj_string(&jwc, FelixClientThread::to_string(FelixClientThread::CMD).c_str(), FelixClientThread::to_string(cmd).c_str());
1367 42 : jwObj_array(&jwc, FelixClientThread::to_string(FelixClientThread::CMD_ARGS).c_str());
1368 90 : for(auto const& cmd_arg : cmd_args) {
1369 48 : jwArr_string(&jwc, cmd_arg.c_str());
1370 : }
1371 42 : jwEnd(&jwc);
1372 42 : jwEnd(&jwc);
1373 42 : jwClose(&jwc);
1374 42 : int json_length = strlen(json);
1375 42 : clog_debug("len=%d; json=%d", json_length, json);
1376 :
1377 42 : try {
1378 42 : clog_info("Sending cmd to fid 0x%x", sub_ctrl_fids.second);
1379 42 : send_data(sub_ctrl_fids.second, (const uint8_t*)json, json_length, true);
1380 40 : uuids.emplace(uuid);
1381 42 : ctrl_fid_by_uuid.emplace(uuid, sub_ctrl_fids.second);
1382 2 : } catch (FelixClientException& error) {
1383 2 : clog_info("Could not send cmd for FID 0x%x because: %s", sub_ctrl_fids.second, error.what());
1384 : // sending timed out or other problem
1385 2 : FelixClientThread::Reply reply;
1386 2 : reply.ctrl_fid = sub_ctrl_fids.second;
1387 2 : reply.status = FelixClientThread::Status::ERROR_NO_CONNECTION;
1388 2 : reply.message = "Could not send cmd";
1389 2 : reply.value = 0;
1390 2 : replies.push_back(reply);
1391 2 : }
1392 :
1393 : }
1394 :
1395 : // wait for replies (or timeout)
1396 40 : clog_info("Waiting for %d replies...", ctrl_fid_by_uuid.size());
1397 80 : for(auto const &it : ctrl_fid_by_uuid) {
1398 40 : clog_debug("%s 0x%x", it.first.c_str(), it.second);
1399 : }
1400 :
1401 40 : uint timeoutms = 5000;
1402 40 : uint timeout = 0;
1403 40 : uint span = std::min(timeoutms, 250U);
1404 40 : uint recv_replies = 0;
1405 232 : while ((ctrl_fid_by_uuid.size() != recv_replies) && (timeout <= timeoutms)) {
1406 192 : recv_replies = 0;
1407 392 : for (auto& uuid_it : ctrl_fid_by_uuid){
1408 200 : if (reply_by_uuid.find(uuid_it.first) != reply_by_uuid.end()) {
1409 40 : recv_replies++;
1410 : }
1411 : }
1412 192 : std::this_thread::sleep_for(std::chrono::milliseconds(span));
1413 192 : timeout += span;
1414 : }
1415 40 : clog_info("Finished waiting");
1416 :
1417 80 : for (auto& ctrl_fid_uuid : ctrl_fid_by_uuid){
1418 : // take it out of the list of uuids
1419 40 : std::string uuid = ctrl_fid_uuid.first;
1420 40 : uuids.erase(uuid);
1421 :
1422 40 : auto reply_it = reply_by_uuid.find(uuid);
1423 40 : if(reply_it != reply_by_uuid.end()){
1424 36 : clog_info("uuid: %s", ctrl_fid_uuid.first.c_str());
1425 36 : FelixClientThread::Reply reply = (*reply_it).second;
1426 36 : replies.push_back(reply);
1427 :
1428 36 : clog_info("Found reply for %s", ctrl_fid_uuid.first.c_str());
1429 36 : clog_info(" %s", FelixClientThread::to_string(reply).c_str());
1430 :
1431 36 : reply_by_uuid.erase(reply_it);
1432 36 : } else {
1433 4 : netio_tag_t ctrl_fid = ctrl_fid_uuid.second;
1434 4 : clog_debug("UUID %s timed out for ctrl_fid 0x%x", uuid.c_str(), ctrl_fid);
1435 :
1436 4 : FelixClientThread::Reply reply;
1437 4 : reply.ctrl_fid = ctrl_fid;
1438 4 : reply.status = FelixClientThread::Status::ERROR_NO_REPLY;
1439 4 : reply.message = "Did not receive a reply";
1440 4 : reply.value = 0;
1441 4 : replies.push_back(reply);
1442 4 : } //TODO: What happens to replies that arrive aftert the timeout and staty in the map
1443 40 : clog_debug("send_cmd done");
1444 40 : }
1445 :
1446 : // calculate summary_status, no replies is an ERROR
1447 40 : FelixClientThread::Status summary_status = replies.size() > 0 ? FelixClientThread::Status::OK : FelixClientThread::Status::ERROR;
1448 40 : summary_status = std::max(summary_status, status);
1449 86 : for(auto const& reply : replies) {
1450 60 : summary_status = std::max(summary_status, reply.status);
1451 : }
1452 40 : clog_info("Calculated summary status %d:%s from %d replies", summary_status, FelixClientThread::to_string(summary_status).c_str(), replies.size());
1453 :
1454 40 : return summary_status;
1455 40 : }
1456 :
1457 673 : void FelixClient::callback_on_init( OnInitCallback on_init ) {
1458 673 : clog_info("Registering on_init");
1459 673 : cb_on_init = on_init;
1460 673 : }
1461 :
1462 236 : void FelixClient::callback_on_data( OnDataCallback on_data ) {
1463 236 : clog_info("Registering on_data");
1464 236 : cb_on_data = on_data;
1465 236 : }
1466 :
1467 2 : void FelixClient::callback_on_buffer( OnBufferCallback on_buffer ) {
1468 2 : clog_info("Registering on_buffer");
1469 2 : cb_on_buffer = on_buffer;
1470 2 : }
1471 :
1472 665 : void FelixClient::callback_on_connect( OnConnectCallback on_connect ) {
1473 665 : clog_info("Registering on_connect");
1474 665 : cb_on_connect = on_connect;
1475 665 : }
1476 :
1477 657 : void FelixClient::callback_on_disconnect( OnDisconnectCallback on_disconnect ) {
1478 657 : clog_info("Registering on_disconnect");
1479 657 : cb_on_disconnect = on_disconnect;
1480 657 : }
1481 :
1482 36 : void FelixClient::callback_on_user_timer( OnUserTimerCallback on_user_timer_cb ) {
1483 36 : clog_info("Registering on_user_timer");
1484 36 : cb_on_user_timer = on_user_timer_cb;
1485 36 : }
1486 :
1487 673 : void FelixClient::on_init_eventloop(void* ptr) {
1488 673 : clog_info("On init");
1489 673 : FelixClient* client = static_cast<FelixClient*>(ptr);
1490 673 : client->ready = true;
1491 673 : std::function<void()> callback = client->cb_on_init;
1492 673 : if (callback) {
1493 673 : (callback)();
1494 : }
1495 673 : }
1496 :
1497 : /**
1498 : * @brief Callback when a connection to a remote endpoint was successfully established
1499 : *
1500 : * When a socket is in TIMEOUT state (which means that the subscription timed out), an unsubscribe is triggered because subscribe() already returned
1501 : * an error to the user.
1502 : * Otherwise, the callback sets the correct state for the socket and will trigger the user on_connection_established callback for every FID that is associated with this connection.
1503 : *
1504 : * @param socket: pointer to socket that established the connection. This could be a (un-)buffered send or subscribe socket.
1505 : */
1506 761 : void FelixClient::on_connection_established(void* socket) {
1507 761 : clog_debug("on_connection_established");
1508 761 : if (contextHandler->exists(socket)){
1509 761 : contextHandler->setSocketState(socket, CONNECTED);
1510 3069 : for(auto fid : contextHandler->getFidsBySocket(socket)){
1511 2308 : if (contextHandler->exists(fid)){
1512 2308 : if(contextHandler->getConnectionType(fid) == PUBSUB){
1513 1785 : if(contextHandler->getSubscriptionState(fid) == TIMEOUT){ //FID has timed out before and we do not want to notify the user nor need this subscription
1514 0 : contextHandler->setSubscriptionState(fid, UNSUBING);
1515 0 : send_unsubscribe_msg(fid);
1516 : //contextHandler->removeFromFidsToResubscribe(fid);
1517 0 : return;
1518 : }
1519 1785 : struct timespec t2;
1520 1785 : clock_gettime(CLOCK_MONOTONIC_RAW, &t2);
1521 1785 : std::unordered_map<std::string, timespec> subscriptionTimes = contextHandler->getSubscriptionTimes(fid);
1522 3570 : double subscribe_useconds = (t2.tv_sec - subscriptionTimes["subscribe"].tv_sec)*1e6
1523 3570 : + (t2.tv_nsec - subscriptionTimes["subscribe"].tv_nsec)*1e-3;
1524 1785 : clog_debug("Beginning of subscribe() until on_connect for fid 0x%x took %fus", fid, subscribe_useconds);
1525 3570 : double subscribe_evloop_useconds = (t2.tv_sec - subscriptionTimes["subscribe_evloop"].tv_sec)*1e6
1526 3570 : + (t2.tv_nsec - subscriptionTimes["subscribe_evloop"].tv_nsec)*1e-3;
1527 1785 : clog_debug("Beginning of subscribe_evloop() until on_connect for fid 0x%x took %fus", fid, subscribe_evloop_useconds);
1528 1785 : }
1529 2308 : contextHandler->removeFromFidsToResubscribe(fid);
1530 2308 : contextHandler->setSubscriptionState(fid, SUB);
1531 2308 : if (cb_on_connect) {
1532 2300 : (cb_on_connect)(fid);
1533 : }
1534 2308 : clog_debug("Tag: 0x%x out of %d FIDs has status: CONNECTED", fid, contextHandler->getFidsBySocket(socket).size());
1535 : }
1536 761 : }
1537 : }
1538 : }
1539 :
1540 236 : void FelixClient::on_connection_established(struct netio_send_socket* socket) {
1541 236 : clog_info("Unbuffered send connection established");
1542 236 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1543 236 : }
1544 :
1545 271 : void FelixClient::on_connection_established(struct netio_buffered_send_socket* socket) {
1546 271 : clog_info("Buffered send connection established");
1547 271 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1548 271 : }
1549 :
1550 165 : void FelixClient::on_connection_established(struct netio_subscribe_socket* socket) {
1551 165 : clog_info("Buffered connection established to %s", get_address(socket).c_str());
1552 165 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1553 165 : }
1554 :
1555 89 : void FelixClient::on_connection_established(struct netio_unbuffered_subscribe_socket* socket) {
1556 89 : clog_info("Unbuffered connection established to %s", get_address(socket).c_str());
1557 89 : static_cast<FelixClient*>(socket->usr)->on_connection_established((void*)socket);
1558 89 : }
1559 :
1560 : /**
1561 : * @brief Callback when a connection to a remote endpoint was closed
1562 : *
1563 : * Since send_socket objects are not deleted right away when the connection is closed, a counter is decremented to keep track if all send connections where closed in the stop() function.
1564 : *
1565 : * @param socket: pointer to socket that is associated with this endpoint. This could be a (un-)buffered send or subscribe socket.
1566 : */
1567 725 : void FelixClient::on_connection_closed(void* socket) {
1568 725 : clog_debug("FelixClient::on_connection_closed");
1569 725 : contextHandler->setSocketState(socket, DISCONNECTED);
1570 725 : auto fids = contextHandler->getFidsBySocket(socket);
1571 725 : if(contextHandler->getConnectionType(socket) == SENDRECV){
1572 507 : decrement_num_send_sockets();
1573 : }
1574 725 : if(!terminating_felix_client){
1575 100 : contextHandler->removeSocket(socket);
1576 : }
1577 :
1578 1266 : for(auto fid : fids ){
1579 541 : if (cb_on_disconnect) {
1580 529 : (cb_on_disconnect)(fid);
1581 : }
1582 : }
1583 :
1584 725 : }
1585 :
1586 236 : void FelixClient::on_connection_closed(struct netio_send_socket* socket) {
1587 236 : clog_info("Unbuffered send connection closed");
1588 236 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1589 : //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
1590 236 : }
1591 :
1592 271 : void FelixClient::on_connection_closed(struct netio_buffered_send_socket* socket) {
1593 271 : clog_info("Buffered send connection closed");
1594 271 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1595 : //static_cast<FelixClient*>(socket->usr)->decrement_num_send_sockets();
1596 271 : }
1597 :
1598 136 : void FelixClient::on_connection_closed(struct netio_subscribe_socket* socket) {
1599 136 : clog_info("Buffered connection closed to %s", get_address(socket).c_str());
1600 136 : netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::BSUB);
1601 136 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1602 136 : }
1603 :
1604 82 : void FelixClient::on_connection_closed(struct netio_unbuffered_subscribe_socket* socket) {
1605 82 : clog_info("Unbuffered connection closed to %s", get_address(socket).c_str());
1606 82 : netio_close_socket(&static_cast<FelixClient*>(socket->usr)->ctx.evloop, socket, socket_type::USUB);
1607 82 : static_cast<FelixClient*>(socket->usr)->on_connection_closed((void*)socket);
1608 82 : }
1609 :
1610 : /**
1611 : * @brief Callback when a connection to a remote endpoint was refused
1612 : *
1613 : * The socket is then removed via exec() since an immediate removal can trigger a crash the the socket is still accessed after the removal by netio-next.
1614 : *
1615 : * @param socket: pointer to socket that is associated with this endpoint. This could be a (un-)buffered send or subscribe socket.
1616 : */
1617 207 : void FelixClient::on_error_connection_refused(void* socket) {
1618 207 : clog_debug("on_error_connection_refused");
1619 207 : contextHandler->setSocketState(socket, DISCONNECTED);
1620 414 : exec([this, socket]{this->contextHandler->removeSocket(socket, true);});
1621 207 : clog_debug("Socket deleted from socket map.");
1622 :
1623 207 : }
1624 :
1625 46 : void FelixClient::on_error_connection_refused(struct netio_send_socket* socket) {
1626 46 : clog_info("Unbuffered send connection refused");
1627 46 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1628 46 : }
1629 :
1630 64 : void FelixClient::on_error_connection_refused(struct netio_buffered_send_socket* socket) {
1631 64 : clog_info("Buffered send connection refused");
1632 64 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1633 64 : }
1634 :
1635 53 : void FelixClient::on_error_connection_refused(struct netio_subscribe_socket* socket) {
1636 53 : clog_info("Buffered connection refused to %s", get_address(socket).c_str());
1637 53 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1638 :
1639 53 : }
1640 :
1641 44 : void FelixClient::on_error_connection_refused(struct netio_unbuffered_subscribe_socket* socket) {
1642 44 : clog_info("Unbuffered connection refused to %s", get_address(socket).c_str());
1643 44 : static_cast<FelixClient*>(socket->usr)->on_error_connection_refused((void*)socket);
1644 :
1645 44 : }
1646 :
1647 443 : void FelixClient::on_send_completed(struct netio_send_socket* socket, uint64_t key) {
1648 443 : clog_debug("Unbuffered send completed for 0x%x", key);
1649 443 : }
1650 :
1651 38 : void FelixClient::on_register_msg_received(void* socket, netio_tag_t tag, void* data_ptr, size_t size) {
1652 38 : if (size > 0) {
1653 38 : uint8_t* data = (uint8_t*)(data_ptr);
1654 38 : data++;
1655 38 : size--;
1656 :
1657 : // handle subscriptions for felix-register
1658 38 : clog_info("Answer on register command");
1659 :
1660 38 : netio_tag_t ctrl_fid = get_ctrl_fid(tag);
1661 38 : clog_info(" for ctrl_fid 0x%x", ctrl_fid);
1662 :
1663 38 : simdjson::dom::element ops = parser.parse(simdjson::padded_string((const char*)data, size));
1664 : // FIXME check that this works for more than one reply
1665 152 : for (simdjson::dom::object op : ops) {
1666 38 : FelixClientThread::Reply reply;
1667 76 : std::string uuid = std::string(op[FelixClientThread::to_string(FelixClientThread::UUID)]);
1668 38 : clog_info(" uuid %s", uuid.c_str());
1669 :
1670 : // only add if we are still looking for the uuid (timeout...)
1671 38 : if (uuids.find(uuid) != uuids.end()) {
1672 36 : reply.ctrl_fid = ctrl_fid;
1673 72 : int64_t s = op[FelixClientThread::to_string(FelixClientThread::STATUS)];
1674 36 : reply.status = static_cast<FelixClientThread::Status>(s);
1675 36 : reply.message = std::string(op[FelixClientThread::to_string(FelixClientThread::MESSAGE)]);
1676 72 : reply.value = uint64_t(op[FelixClientThread::to_string(FelixClientThread::VALUE)]);
1677 :
1678 74 : reply_by_uuid[uuid] = reply;
1679 : } else {
1680 2 : clog_info(" uuid not found, answer ignored", uuid.c_str());
1681 : }
1682 38 : }
1683 : }
1684 38 : }
1685 :
1686 38 : void FelixClient::on_register_msg_received(struct netio_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
1687 38 : static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
1688 38 : }
1689 :
1690 0 : void FelixClient::on_register_msg_received(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, void* data, size_t size) {
1691 0 : static_cast<FelixClient*>(socket->usr)->on_register_msg_received((void*)socket, tag, data, size);
1692 0 : }
1693 :
1694 :
1695 : /**
1696 : * @brief Callback that is triggered by the subscription timer
1697 : *
1698 : * The timer is hard-coded to 1s in the constructor. The timer is started when there are subscriptions that are not up and is stopped again when all FIDs are subscribed.
1699 : * It is particularly used when the server side (felix-tohost/felix-toflx) is restarted. This way felix-client attempts to resubscribe all FIDs periodically and the user does not have to take care of it.
1700 : * It does not only handle subscriptions to felix-tohost but will also re-establish send connections to felix-toflx
1701 : *
1702 : * @param ptr: pointer to felix-client object itself
1703 : */
1704 3762 : void FelixClient::on_timer(void* ptr) {
1705 :
1706 3762 : clog_debug("On timer called. Attempting to resubscribe");
1707 3762 : uint count = 0;
1708 3762 : FelixClient* client = static_cast<FelixClient*>(ptr);
1709 3762 : if(client->terminating_felix_client.load()){
1710 0 : clog_debug("Already terminating no reason to resubscribe");
1711 0 : return;
1712 : }
1713 3762 : std::vector<uint64_t> fids_to_resubscribe = client->contextHandler->getFidsToResubscribe();
1714 3762 : clog_debug("Resubscribing for %d", fids_to_resubscribe.size());
1715 3945 : for(auto& fid : fids_to_resubscribe) {
1716 183 : if(client->contextHandler->getSocketState(fid) == DISCONNECTED){
1717 175 : if(fid & 0x8000){
1718 66 : try{
1719 66 : std::promise<uint8_t> prom_sub;
1720 66 : std::future<uint8_t> future_sub_done = prom_sub.get_future();
1721 66 : client->establish_send_connection(fid, std::move(prom_sub));
1722 66 : if(!future_sub_done.get()) {
1723 66 : count++;
1724 : }
1725 66 : } catch(...){
1726 0 : continue;
1727 0 : }
1728 : } else {
1729 109 : if (!client->contextHandler->canSubscribe(fid)) {
1730 0 : clog_debug("Already attempting to subscribe to 0x%x", fid);
1731 0 : continue;
1732 : }
1733 109 : if (client->subscribe_evloop(fid)) {
1734 105 : count++;
1735 : }
1736 : }
1737 : }
1738 183 : clog_trace("Socket has state: %d", client->contextHandler->getSocketState(fid));
1739 183 : clog_trace("FID 0x%x is in state: %d", fid, client->contextHandler->getSubscriptionState(fid));
1740 : }
1741 :
1742 3762 : if (count > 0) {
1743 114 : clog_debug("Subscription timer: resubscribing to %d tags", count);
1744 : }
1745 3762 : }
1746 :
1747 :
1748 313 : void FelixClient::on_signal(void* ptr) {
1749 313 : clog_debug("On signal");
1750 313 : struct SignalData* data = (struct SignalData*)ptr;
1751 313 : UserFunction user_function = data->user_function;
1752 313 : (user_function)();
1753 :
1754 313 : netio_signal_close(data->evloop, data->signal);
1755 313 : free(data->signal);
1756 626 : delete(data);
1757 313 : }
1758 :
1759 1698 : std::string FelixClient::get_address(netio_subscribe_socket *socket) {
1760 3396 : return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port);
1761 : }
1762 :
1763 1389 : std::string FelixClient::get_address(netio_unbuffered_subscribe_socket *socket) {
1764 2778 : return std::string(socket->remote_hostname) + std::string(":") + std::to_string(socket->remote_port);
1765 : }
1766 :
1767 :
1768 1185 : struct netio_subscribe_socket* FelixClient::get_buffered_subscribe_socket(uint64_t fid) {
1769 1185 : bool needInititialization = contextHandler->addOrCreateSocket(fid);
1770 1185 : struct netio_subscribe_socket* socket = contextHandler->getSocket<netio_subscribe_socket*>(fid);
1771 1185 : clog_debug("get_buffered_subscribe_socket for fid 0x%x and socket 0x%p", fid, socket);
1772 1185 : if(needInititialization){
1773 218 : initialize_buffered_subscribe_socket(socket, fid);
1774 : }
1775 1185 : return socket;
1776 : }
1777 :
1778 :
1779 : /**
1780 : * @brief initializes a buffered subscribe socket
1781 : *
1782 : * Right now we allocate double the number of pages that are published in the bus to better overcome some fluctuations in the network load.
1783 : *
1784 : * @param socket: pointer to netio_subscribe_socket
1785 : * @param fid: FID associated with this subscribe socket
1786 : */
1787 218 : void FelixClient::initialize_buffered_subscribe_socket(struct netio_subscribe_socket* socket, uint64_t fid){
1788 218 : clog_debug("Setup socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1789 218 : struct netio_buffered_socket_attr attr;
1790 218 : attr.num_pages = 2 * contextHandler->getPages(fid); //Hard-coded double the number of pages for FLX-2041, todo: make a parameter later
1791 218 : attr.pagesize = contextHandler->getPagesize(fid);
1792 218 : attr.watermark = contextHandler->getWatermark(fid);
1793 436 : netio_subscribe_socket_init(socket, &ctx, &attr,
1794 218 : local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1795 :
1796 218 : socket->cb_connection_closed = on_connection_closed;
1797 218 : socket->cb_connection_established = on_connection_established;
1798 218 : socket->cb_error_connection_refused = on_error_connection_refused;
1799 218 : if (contextHandler->isForRegister(fid)) {
1800 34 : socket->cb_msg_received = on_register_msg_received;
1801 : } else {
1802 184 : socket->cb_msg_received = on_msg_received;
1803 : }
1804 :
1805 218 : if(buffer_callback){
1806 2 : socket->cb_buf_received = on_buf_received;
1807 : }
1808 :
1809 218 : clog_debug("Setup socket for %s", get_address(socket).c_str());
1810 : // store object pointer (this) in socket, for use in c-style callbacks
1811 218 : socket->usr = this;
1812 218 : }
1813 :
1814 1089 : struct netio_unbuffered_subscribe_socket* FelixClient::get_unbuffered_subscribe_socket(uint64_t fid) {
1815 : // Create socket if it does not exist, otherwise lookup
1816 1089 : bool needInititialization = contextHandler->addOrCreateSocket(fid);
1817 :
1818 1089 : struct netio_unbuffered_subscribe_socket* socket = contextHandler->getSocket<netio_unbuffered_subscribe_socket*>(fid);
1819 1089 : clog_debug("get_unbuffered_subscribe_socket");
1820 1089 : if(needInititialization){
1821 133 : initialize_unbuffered_subscribe_socket(socket, fid);
1822 : }
1823 :
1824 :
1825 1089 : return socket;
1826 : }
1827 :
1828 : /**
1829 : * @brief initializes an unbuffered subscribe socket
1830 : *
1831 : * If the 'unbuffered_netio_pages' parameter is specified by the user, the respective number and page_size will be allocated for the buffers.
1832 : * If they are not specified or 0, the default number ans size are taken from the bus.
1833 : * Hence, the user can manually overwrite the the default settings in thus bus. However, this is not recommended if there are no good reason for this.
1834 : * Especially the page size needs to be at least as big as the one on the felix-tohost side.
1835 : *
1836 : * @param socket: pointer to netio_unbuffered_subscribe_socket
1837 : * @param fid: FID associated with this subscribe socket
1838 : */
1839 133 : void FelixClient::initialize_unbuffered_subscribe_socket(netio_unbuffered_subscribe_socket* socket, uint64_t fid){
1840 133 : clog_debug("Setup socket using address %s:%d", contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1841 :
1842 : // FLX-1222 use params
1843 : // FLX-1222 allocate somewhere globally, per socket
1844 : //buffer size not specified, taken by the bus
1845 133 : if (unbuffered_netio_pages == 0){
1846 11 : clog_trace("Creating unbuffered subscribe socket. Buffer not specified, using bus parameters. Pages: %d pagesize: %d", contextHandler->getPages(fid), contextHandler->getPagesize(fid));
1847 22 : netio_unbuffered_subscribe_socket_init(socket, &ctx,
1848 22 : local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
1849 11 : contextHandler->getPagesize(fid), contextHandler->getPages(fid));
1850 : } else {
1851 : //buffer size passed as parameter
1852 122 : clog_trace("Creating unbuffered subscribe socket. Buffer parameter specified. Pages: %d pagesize: %d", unbuffered_netio_pages, unbuffered_netio_pagesize);
1853 122 : netio_unbuffered_subscribe_socket_init(socket, &ctx,
1854 244 : local_ip_address.c_str(), contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid),
1855 122 : unbuffered_netio_pagesize, unbuffered_netio_pages); //correct even if unbuffered_netio_pagesize/-pages are needed?
1856 : }
1857 :
1858 133 : socket->cb_connection_closed = on_connection_closed;
1859 133 : socket->cb_connection_established = on_connection_established;
1860 133 : socket->cb_error_connection_refused = on_error_connection_refused;
1861 133 : if (contextHandler->isForRegister(fid)) {
1862 0 : socket->cb_msg_received = on_register_msg_received;
1863 : } else {
1864 133 : socket->cb_msg_received = on_msg_received;
1865 : }
1866 :
1867 133 : clog_debug("Setup socket for %s", get_address(socket).c_str());
1868 :
1869 133 : socket->usr = this;
1870 133 : }
1871 :
1872 :
1873 797 : struct netio_buffered_send_socket* FelixClient::get_buffered_send_socket(uint64_t fid) {
1874 : // Create socket if it does not exist, otherwise lookup
1875 797 : bool needInititialization = false;
1876 797 : if (contextHandler->getSocket<struct netio_buffered_send_socket*>(fid) == NULL){
1877 347 : needInititialization = contextHandler->addOrCreateSocket(fid);
1878 : }
1879 :
1880 797 : struct netio_buffered_send_socket* socket = contextHandler->getSocket<struct netio_buffered_send_socket*>(fid);
1881 :
1882 797 : if (needInititialization)
1883 : {
1884 335 : struct netio_buffered_socket_attr attr;
1885 335 : attr.num_pages = contextHandler->getPages(fid);
1886 335 : attr.pagesize = contextHandler->getPagesize(fid);
1887 335 : attr.watermark = contextHandler->getWatermark(fid);
1888 335 : attr.timeout_ms = 5;
1889 335 : netio_buffered_send_socket_init(socket, &ctx, &attr);
1890 335 : socket->cb_connection_closed = on_connection_closed;
1891 335 : socket->cb_connection_established = on_connection_established;
1892 335 : socket->cb_error_connection_refused = on_error_connection_refused;
1893 :
1894 : // store object pointer (this) in socket, for use in c-style callbacks
1895 335 : socket->usr = this;
1896 670 : netio_buffered_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1897 : }
1898 797 : return socket;
1899 : }
1900 :
1901 :
1902 740 : struct netio_send_socket* FelixClient::get_unbuffered_send_socket(uint64_t fid) {
1903 : // Create socket if it does not exist, otherwise lookup
1904 740 : bool needInititialization = false;
1905 740 : if (contextHandler->getSocket<struct netio_send_socket*>(fid) == NULL){
1906 294 : needInititialization = contextHandler->addOrCreateSocket(fid);
1907 : }
1908 :
1909 740 : struct netio_send_socket* socket = contextHandler->getSocket<struct netio_send_socket*>(fid);
1910 :
1911 740 : if (needInititialization)
1912 : {
1913 282 : netio_unbuffered_send_socket_init(socket, &ctx);
1914 :
1915 282 : struct netio_buffer buf;
1916 282 : buf.size = UNBUFFERED_MR_SIZE;
1917 282 : buf.data = malloc(UNBUFFERED_MR_SIZE);
1918 282 : memset(buf.data, 0, UNBUFFERED_MR_SIZE);
1919 282 : buf.mr = nullptr;
1920 564 : unbuf_mem_regions[contextHandler->getAddress(fid)] = buf;
1921 :
1922 282 : socket->cb_connection_closed = on_connection_closed;
1923 282 : socket->cb_connection_established = on_connection_established;
1924 282 : socket->cb_error_connection_refused = on_error_connection_refused;
1925 282 : socket->cb_send_completed = on_send_completed;
1926 : // store object pointer (this) in socket, for use in c-style callbacks
1927 282 : socket->usr = this;
1928 282 : netio_connect(socket, contextHandler->getIp(fid).c_str(), contextHandler->getPort(fid));
1929 564 : clog_debug("Connected. Now registering memory region: 0x%p", &unbuf_mem_regions[contextHandler->getAddress(fid)]);
1930 846 : netio_register_send_buffer(socket, &unbuf_mem_regions[contextHandler->getAddress(fid)], 0);
1931 : }
1932 740 : return socket;
1933 : }
1934 :
1935 :
1936 :
1937 709 : void FelixClient::user_timer_init(){
1938 709 : netio_timer_init(&ctx.evloop, &user_timer);
1939 709 : user_timer.cb = on_user_timer;
1940 709 : user_timer.data = this;
1941 709 : }
1942 :
1943 37 : void FelixClient::user_timer_start(unsigned long interval){
1944 37 : netio_timer_start_ms(&user_timer, interval);
1945 37 : }
1946 :
1947 0 : void FelixClient::user_timer_stop(){
1948 0 : netio_timer_stop(&user_timer);
1949 0 : }
1950 :
1951 382 : void FelixClient::on_user_timer(void* ptr) {
1952 382 : FelixClient* client = static_cast<FelixClient*>(ptr);
1953 382 : client->cb_on_user_timer();
1954 382 : }
1955 :
1956 :
1957 507 : void FelixClient::decrement_num_send_sockets(){
1958 507 : if(total_number_send_sockets.load() > 0){
1959 501 : total_number_send_sockets--;
1960 : }
1961 507 : }
|