Line data Source code
1 : #include "felix/felix_client_context_handler.hpp"
2 : #include "felix/felix_client_exception.hpp"
3 :
4 : #include "clog.h"
5 :
6 : /**
7 : * @brief Get netio_socket_type by FID
8 : *
9 : * The information is stored in the info object or this FID and is read from the bus when the info object is created.
10 : * @param fid: FID
11 : */
12 8252 : netio_socket_type FelixClientContextHandler::getType(uint64_t fid){
13 8252 : return infoByFid[fid]->getType();
14 : }
15 :
16 : /**
17 : * @brief Get netio_socket_type from socket
18 : *
19 : * This function returns the netio_socket_type of a void pointer to a socket.
20 : * If the socket does not exists, it returns BUFFERED since it has to have a valid return value.
21 : * @param socket: void pointer to a netio_socket
22 : */
23 0 : netio_socket_type FelixClientContextHandler::getType(void* socket){
24 0 : return std::visit(
25 0 : overloaded{[](netio_subscribe_socket& s) {
26 : return BUFFERED;
27 : },
28 : [](netio_unbuffered_subscribe_socket& s) {
29 : return UNBUFFERED;
30 : },
31 : [](netio_send_socket& s) {
32 : return UNBUFFERED;
33 : },
34 : [](netio_buffered_send_socket& s) {
35 : return BUFFERED;
36 : },
37 : [](std::monostate& s) {
38 : return BUFFERED;
39 : },
40 0 : }, *socketContextMap[socket]->socket);
41 : }
42 :
43 : /**
44 : * @brief Get netio_socket_connection by FID
45 : *
46 : * The information is stored in the info object or this FID and is read from the bus when the info object is created.
47 : * A connection type can either be of type PUBSUB or SENDRECV.
48 : * @param fid: FID
49 : */
50 2308 : netio_socket_connection FelixClientContextHandler::getConnectionType(uint64_t fid){
51 2308 : return infoByFid[fid]->getConnectionType();
52 : }
53 :
54 : /**
55 : * @brief Get netio_socket_connection from socket
56 : *
57 : * This function returns the netio_socket_connection of a void pointer to a socket.
58 : * A connection type can either be of type PUBSUB or SENDRECV.
59 : * If the socket does not exists, it returns SENDRECV since it has to have a valid return value.
60 : * @param socket: void pointer to a netio_socket
61 : */
62 3756 : netio_socket_connection FelixClientContextHandler::getConnectionType(void* socket){
63 3756 : return std::visit(
64 3756 : overloaded{[](netio_subscribe_socket& s) {
65 : return PUBSUB;
66 : },
67 : [](netio_unbuffered_subscribe_socket& s) {
68 : return PUBSUB;
69 : },
70 : [](netio_send_socket& s) {
71 : return SENDRECV;
72 : },
73 : [](netio_buffered_send_socket& s) {
74 : return SENDRECV;
75 : },
76 : [](std::monostate& s) {
77 : return SENDRECV;
78 : },
79 3756 : }, *socketContextMap[socket]->socket);
80 : }
81 :
82 : /**
83 : * @brief Get netio_socket_state by FID
84 : *
85 : * Returns the state of a socket that is associated to this FID. If the FID has no associated socket the function returns DISCONNECTED.
86 : * We must portect this call with a mutex since send_sockets are removed from the socketContextMap when a connection is closed.
87 : * @param fid: FID
88 : */
89 11472 : netio_socket_state FelixClientContextHandler::getSocketState(uint64_t fid){
90 11472 : std::unique_lock<std::mutex> lck(send_mtx);
91 11472 : SocketWrapper* sw = infoByFid[fid]->getSocket();
92 11472 : return std::visit(
93 11472 : overloaded{[](std::monostate& s) {
94 : return DISCONNECTED;
95 10668 : },[this](auto& s){
96 10668 : return socketContextMap[static_cast<void*>(&s)]->state.load();
97 : }
98 11472 : }, *sw);
99 11472 : }
100 :
101 : /**
102 : * @brief Get subscribe_state of a FID
103 : *
104 : * Returns the ssubscribe_state of a FID.
105 : * @param fid: FID
106 : */
107 6624 : subscribe_state FelixClientContextHandler::getSubscriptionState(uint64_t fid){
108 6624 : return infoByFid[fid]->getSubscriptionState();
109 : }
110 :
111 : /**
112 : * @brief Returns a map with timestamps to measure subscription times
113 : *
114 : * The first timestamp is taken in the subscribe function, called by the user, the second one when the subscribe is processed by the eventloop.
115 : * @param fid: FID
116 : */
117 2177 : std::unordered_map<std::string, timespec> FelixClientContextHandler::getSubscriptionTimes(uint64_t fid){
118 2177 : return infoByFid[fid]->getSubscriptionTimes();
119 : }
120 :
121 553 : uint FelixClientContextHandler::getWatermark(uint64_t fid){
122 553 : return infoByFid[fid]->getWatermark();
123 : }
124 :
125 1025 : uint FelixClientContextHandler::getPagesize(uint64_t fid){
126 1025 : return infoByFid[fid]->getPagesize();
127 : }
128 :
129 575 : uint FelixClientContextHandler::getPages(uint64_t fid){
130 575 : return infoByFid[fid]->getPages();
131 : }
132 :
133 1968 : uint FelixClientContextHandler::getPort(uint64_t fid){
134 1968 : return infoByFid[fid]->getPort();
135 : }
136 :
137 1968 : std::string FelixClientContextHandler::getIp(uint64_t fid){
138 1968 : return infoByFid[fid]->getIp();
139 : }
140 :
141 5472 : std::string FelixClientContextHandler::getAddress(uint64_t fid){
142 16416 : return infoByFid[fid]->getIp() + ":" + std::to_string(infoByFid[fid]->getPort());
143 : }
144 :
145 3762 : std::vector<uint64_t> FelixClientContextHandler::getFidsToResubscribe(){
146 3762 : return mResubFids;
147 : }
148 :
149 6268 : std::vector<uint64_t> FelixClientContextHandler::getFidsBySocket(void* socket){
150 6268 : return socketContextMap[socket]->subFids;
151 : }
152 :
153 :
154 : /**
155 : * @brief Get vector of all subscribed FIDs
156 : *
157 : * This function returns a vector of all FIDs that are subscribed via a subscribe socket right now.
158 : * This list is used in the destructor to unsubscribe and close a connection properly before we delete the associated subscribe socket.
159 : */
160 643 : std::vector<uint64_t> FelixClientContextHandler::getFidsToUnsubscribe(){
161 643 : std::vector<uint64_t> allFids;
162 1352 : std::for_each(socketContextMap.begin(), socketContextMap.end(), [&allFids](auto& ctx_it){std::copy(ctx_it.second->subFids.begin(), ctx_it.second->subFids.end(), std::back_inserter(allFids));});
163 1286 : std::vector<uint64_t> fidsToUnsub(allFids.size());
164 3201 : auto end_it = std::copy_if(allFids.begin(), allFids.end(), fidsToUnsub.begin(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() == SUB || this->infoByFid[fid]->getSubscriptionState() == SUBING)&&( this->infoByFid[fid]->getConnectionType() == PUBSUB);});
165 643 : fidsToUnsub.resize(std::distance(fidsToUnsub.begin(), end_it));
166 643 : return fidsToUnsub;
167 643 : }
168 :
169 :
170 : /**
171 : * @brief Get vector of all send sockets
172 : *
173 : * This function returns a vector of pointers to all send sockets that are connected right now.
174 : * The function iterates over all sockets in the socketContextMap and adds the pointer of netio_send_socket (also in buffered case) to the vector.
175 : * This list is used in the destructor to properly shutdown the connection to the server side via netio_disconnect() before deleting the socket object.
176 : */
177 595 : std::vector<netio_send_socket*> FelixClientContextHandler::getSendSockets(){
178 595 : std::vector<netio_send_socket*> sendSockets;
179 1256 : for (auto& ctx_it : socketContextMap){
180 661 : netio_send_socket* socket = std::visit(
181 661 : overloaded{[](netio_subscribe_socket& s) {
182 : return static_cast<netio_send_socket*>(NULL);
183 : },
184 : [](netio_unbuffered_subscribe_socket& s) {
185 : return static_cast<netio_send_socket*>(NULL);
186 : },
187 : [](netio_send_socket& s) {
188 : return &s;
189 : },
190 267 : [](netio_buffered_send_socket& s) {
191 267 : return &(s.send_socket);
192 : },
193 : [](std::monostate& s) {
194 : return static_cast<netio_send_socket*>(NULL);
195 : },
196 661 : }, *ctx_it.second->socket);
197 661 : if(socket){
198 501 : sendSockets.push_back(socket);
199 : }
200 : }
201 595 : return sendSockets;
202 0 : }
203 :
204 : /**
205 : * @brief Returns a map of void pointers to subscribe socket and the associated netio_socket_type
206 : *
207 : * Since we need to distinguish between buffered and unbuffered subscribe sockets when we want to clean them up, this function returns void pointer to
208 : * all subscribe sockets and their netio_socket_type stored in map.
209 : * This map/list is used in the destructor to properly shutdown the subscribe socket after all FIDs are unsubscribed.
210 : */
211 0 : std::unordered_map<void*, netio_socket_type> FelixClientContextHandler::getSubSocketsToDelete(){
212 0 : std::unordered_map<void*, netio_socket_type> voidSubSocketsToDelete;
213 0 : for (auto& it :mSubSocketsToDelete){
214 0 : netio_socket_type type = std::visit(
215 0 : overloaded{[](netio_subscribe_socket& s) {
216 : return BUFFERED;
217 : },
218 : [](netio_unbuffered_subscribe_socket& s) {
219 : return UNBUFFERED;
220 : },
221 : [](netio_send_socket& s) {
222 : return UNBUFFERED;
223 : },
224 : [](netio_buffered_send_socket& s) {
225 : return BUFFERED;
226 : },
227 : [](std::monostate& s) {
228 : return BUFFERED;
229 : },
230 0 : }, *it);
231 0 : voidSubSocketsToDelete[mGetVoidSocketPointer(*it)] = type;
232 : }
233 0 : return voidSubSocketsToDelete;
234 0 : }
235 :
236 :
237 351 : bool FelixClientContextHandler::isForRegister(uint64_t fid){
238 351 : return infoByFid[fid]->getForRegister();
239 : }
240 :
241 : /**
242 : * @brief Returns true is FID is subscribed
243 : *
244 : * @param fid: FID
245 : */
246 1150 : bool FelixClientContextHandler::isSubscribed(uint64_t fid){
247 1150 : if (infoByFid.count(fid) > 0){
248 1150 : return (infoByFid[fid]->getSubscriptionState() == SUB);
249 : } else {
250 0 : return false;
251 : }
252 : }
253 :
254 : /**
255 : * @brief Function blocks until all FIDs are subscribed or the timeout expired.
256 : *
257 : * The function can be used to wait for the subscription of one or more FIDs was successful or the timeot expired.
258 : * To achieve this, the function adds a multiWaitCtx object to all FID info objects, which has in internal counter that is increased for every
259 : * subscription that was successful. The conditional variable "multiWaitCtx->allSubscribed" uses the std function wait_for to determine if the timeout expires
260 : * or the condition is met.
261 : * Note: The wait_for() function requires a unique_lock on a mutex. Since we do not need to synchronize something, it is sufficient to use a local mutex/lock
262 : * @param fids: vector fo FIDs tha are checked for their subscription state
263 : * @param timeoutMs:timeout in ms after which the function returns even when not all FIDs are subscribed.
264 : */
265 436 : bool FelixClientContextHandler::isSubscribed(std::vector<uint64_t> fids, uint64_t timeoutMs){
266 436 : auto multiWaitCtx = std::make_unique<MultiFidsWaitContext>();
267 436 : multiWaitCtx->expectedSize = fids.size();
268 2018 : for(auto fid : fids){
269 1582 : if (infoByFid.count(fid) > 0){
270 1582 : if(infoByFid[fid]->getSubscriptionState() != SUB){
271 1582 : infoByFid[fid]->setWaitContext(multiWaitCtx.get());
272 : }
273 0 : else{multiWaitCtx->count++;}
274 : }
275 : }
276 436 : clog_debug("Waiting for %d fids to subscribe. Count: %d", multiWaitCtx->expectedSize, multiWaitCtx->count);
277 436 : bool status = true;
278 436 : if(multiWaitCtx->count < multiWaitCtx->expectedSize ){
279 432 : auto timeExpiredMs = std::chrono::milliseconds(0);
280 432 : std::mutex m;
281 432 : std::unique_lock<std::mutex> lck(m);
282 432 : auto t0 = std::chrono::steady_clock::now();
283 1308 : while (multiWaitCtx->count != multiWaitCtx->expectedSize && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
284 444 : status = (multiWaitCtx->allSubscribed.wait_for(lck, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::milliseconds(timeoutMs) - timeExpiredMs)) == std::cv_status::no_timeout);
285 444 : timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
286 : }
287 432 : }
288 2018 : for (auto fid : fids){
289 1582 : infoByFid[fid]->clearWaitContext();
290 : }
291 436 : return status;
292 892 : }
293 :
294 : /**
295 : * @brief Checks if all FIDs are unsubscribed
296 : *
297 : * Since this function iterates over all sockets, we need to make sure that no socket is removed from the socketContextMap while we are are iterating.
298 : * Because the socketContextMap contains sockets of all types, a scoped_lock is used to lock the send_mtx and sub_mtx.
299 : */
300 2704 : bool FelixClientContextHandler::areAllFidsUnsubscribed(){
301 2704 : std::scoped_lock lock(send_mtx, sub_mtx);
302 3383 : for (auto& ctx_it : socketContextMap){
303 2836 : clog_debug("socket 0x%p has state 0x%p", ctx_it.first, netio_socket_state_name(ctx_it.second->state.load()));
304 2836 : if(ctx_it.second->state != DISCONNECTED && getConnectionType(ctx_it.first) == PUBSUB){
305 2704 : return false;
306 : }
307 : }
308 : return true;
309 2704 : }
310 :
311 :
312 : /**
313 : * @brief Checks if all FIDs associated with a particular socket are unsubscribed
314 : *
315 : * This function checks if there are FIDs associated to the socket, that are not ualready nscubscribed or currently unsubscribing.
316 : * @param socket: void pointer to a netio_socket. (should be a subscribe_socket but will also work with send_socket)
317 : */
318 2167 : bool FelixClientContextHandler::areAllFidsUnsubscribed(void* socket){
319 2167 : auto fids = getFidsBySocket(socket);
320 93042 : int i = std::count_if(fids.begin(), fids.end(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() != UNSUB && this->infoByFid[fid]->getSubscriptionState() != UNSUBING);});
321 2167 : clog_debug("Remaining subscritions: %d", i);
322 2167 : return i;
323 2167 : }
324 :
325 :
326 : /**
327 : * @brief Check if felix-client can subscribe to this FID
328 : *
329 : * FIDs are not eligible to subscribe if we don't have a felix_client_info object for this FID or if the FID was unsubscribed before.
330 : *
331 : * TODO:Maybe we also need to check what we want to do here if the FID is in the process of unsubscribing but not done yet
332 : *
333 : * @param fid: FID
334 : */
335 2316 : bool FelixClientContextHandler::canSubscribe(uint64_t fid){
336 2316 : if (infoByFid.count(fid) == 0 || (infoByFid[fid]->getSubscriptionState() == UNSUB)) {
337 2288 : return true;
338 : } else {
339 : return false;
340 : }
341 : }
342 :
343 :
344 : /**
345 : * @brief Checks if all felix_client_info object for this FID already exists.
346 : *
347 : * @param fid: FID
348 : */
349 5410 : bool FelixClientContextHandler::exists(uint64_t fid){
350 5410 : return infoByFid.count(fid);
351 : }
352 :
353 : /**
354 : * @brief checks if all socket for this void pointer still exists.
355 : *
356 : * @param socket: void pointer to socket of any type.
357 : */
358 3824 : bool FelixClientContextHandler::exists(void* socket){
359 3824 : return socketContextMap.count(socket);
360 : }
361 :
362 :
363 : /**
364 : * @brief Function that waits for a socket associated to this FID to be connected or timeout
365 : *
366 : * This function retrives the socket associated to this FID and checks its connection state.
367 : * If it is currently connecting, it will wait on a condition variable that is notified in the on_connection_established() callback.
368 : * If hte connection is not established within the specified timeout, the function returns fails for not connected.
369 : *
370 : * @param fid: FID
371 : * @param timeoutMs: timeout [ms]
372 : *
373 : */
374 529 : bool FelixClientContextHandler::waitConnected(uint64_t fid, uint64_t timeoutMs){
375 529 : if (infoByFid.count(fid) > 0){
376 529 : SocketWrapper* sw = infoByFid[fid]->getSocket();
377 529 : return std::visit(
378 529 : overloaded{[](std::monostate& s) {
379 : return false;
380 529 : },[this, timeoutMs, fid](auto& s){
381 529 : if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTED){
382 : return true;
383 529 : } else if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==DISCONNECTED){
384 : return false;
385 : }else{
386 529 : auto timeExpiredMs = std::chrono::milliseconds(0);
387 529 : std::unique_lock<std::mutex> lck(socketContextMap[static_cast<void*>(&s)].get()->mMux);
388 529 : clog_debug("Start waiting, fid: 0x%x with socket 0x%p", fid, socketContextMap[static_cast<void*>(&s)].get());
389 529 : auto t0 = std::chrono::steady_clock::now();
390 1058 : while(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTING && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
391 529 : socketContextMap[static_cast<void*>(&s)].get()->mConnectionWait.wait_for(lck, std::chrono::milliseconds(timeoutMs));
392 529 : timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
393 : }
394 529 : return (socketContextMap[static_cast<void*>(&s)].get()->state.load() == CONNECTED);
395 529 : }
396 : }
397 : }, *sw);
398 : } else {
399 0 : return false;
400 : }
401 : }
402 :
403 4443 : void FelixClientContextHandler::setSubscriptionTimes(uint64_t fid, std::string name, timespec ts){
404 8886 : return infoByFid[fid]->setSubscriptionTimes(name, ts);
405 : }
406 :
407 : /**
408 : * @brief Setting socket state
409 : *
410 : * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
411 : *
412 : * @param fid: FID
413 : * @param s: New socket state
414 : *
415 : */
416 0 : void FelixClientContextHandler::setSocketState(uint64_t fid, netio_socket_state s){
417 0 : if (infoByFid.count(fid) > 0){
418 0 : SocketWrapper* sw = infoByFid[fid]->getSocket();
419 0 : clog_debug("For socket: 0x%p new state %d", mGetVoidSocketPointer(*sw), s);
420 0 : socketContextMap[mGetVoidSocketPointer(*sw)]->state = s;
421 0 : if (s == CONNECTED || s == DISCONNECTED){socketContextMap[mGetVoidSocketPointer(*sw)]->mConnectionWait.notify_all();}
422 : }
423 0 : }
424 :
425 : /**
426 : * @brief Setting socket state
427 : *
428 : * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
429 : *
430 : * @param socket: void pointer to a netio_socket
431 : * @param s: New socket state
432 : *
433 : */
434 1693 : void FelixClientContextHandler::setSocketState(void* socket, netio_socket_state s){
435 1693 : socketContextMap[socket]->state = s;
436 1693 : if (s == CONNECTED || s == DISCONNECTED){socketContextMap[socket]->mConnectionWait.notify_all();}
437 1693 : }
438 :
439 9154 : void FelixClientContextHandler::setSubscriptionState(uint64_t fid, subscribe_state s){
440 9154 : infoByFid[fid]->setSubscriptionState(s);
441 9154 : }
442 :
443 :
444 : /**
445 : * @brief Adds FID to a set of FIDs that need to subscribe
446 : *
447 : * FIDs get either added to the list during the first subscription, if they are not subscribed yet or if the server side shut down the connection.
448 : * Felix-client will try to subscribe all FIDs in this list periodically until the subscription is successful.
449 : *
450 : * @param fid: FID
451 : */
452 2169 : void FelixClientContextHandler::addToFidsToResubscribe(uint64_t fid){
453 2169 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
454 2169 : if (it == mResubFids.end()){
455 2169 : mResubFids.push_back(fid);
456 : }
457 2169 : }
458 :
459 2700 : void FelixClientContextHandler::removeFromFidsToResubscribe(uint64_t fid){
460 2700 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
461 2700 : if (it != mResubFids.end()){
462 2181 : mResubFids.erase(it);
463 : }else{
464 519 : clog_debug("Fid not foud in mReSubFids");
465 : }
466 2700 : }
467 :
468 : /**
469 : * @brief Remove socket object
470 : *
471 : * This function removes sockets form the socketContextMaps and thereby destroying it.
472 : * Different cases need to be considered to take care of all references to this socket and to update the corresponding information.
473 : *
474 : * SubSocket objects must not be destroyed here, since they contain a send and listen socket that are not cleaned up yet.
475 : * Instead all references to the socket are removed and they object is put into a different list to destroy it later.
476 : *
477 : * @param socket: void pointer to a netio_socket
478 : * @param connection_refused: Status variable if the socket is removed because the connection was refused
479 : */
480 307 : void FelixClientContextHandler::removeSocket(void* socket, bool connection_refused){
481 307 : auto socketFids = getFidsBySocket(socket);
482 307 : if (getConnectionType(socket) == PUBSUB ){
483 298 : for (auto& fid : socketFids)
484 : {
485 107 : if(!terminatingFelixClient.load()){ //necessary?
486 107 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
487 107 : if (it == mResubFids.end()){
488 10 : mResubFids.push_back(fid);
489 : }
490 : }else{
491 0 : clog_debug("Already terminating felix-client not adding fids to mResubFids");
492 : }
493 107 : infoByFid.at(fid)->removeSocket();
494 107 : infoByFid.at(fid)->setSubscriptionState(UNSUB);
495 : }
496 :
497 191 : if(!connection_refused){
498 94 : mSubSocketsToDelete.push_back(std::move(socketContextMap[socket]->socket));
499 : }
500 191 : std::unique_lock<std::mutex> lock(sub_mtx);
501 191 : socketContextMap.erase(socket);
502 191 : } else {
503 116 : {
504 116 : std::unique_lock<std::mutex> lck(socketContextMap[socket].get()->mMux);
505 232 : for (auto& fid : socketFids)
506 : {
507 116 : if(!terminatingFelixClient.load()){
508 116 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
509 116 : if (it == mResubFids.end()){
510 6 : mResubFids.push_back(fid);
511 : }
512 : }else{
513 0 : clog_debug("Already terminating felix-client not adding fids to mResubFids");
514 : }
515 116 : infoByFid.at(fid)->removeSocket();
516 : }
517 0 : }
518 116 : std::unique_lock<std::mutex> lock(send_mtx);
519 116 : socketContextMap.erase(socket);
520 116 : }
521 307 : }
522 :
523 :
524 : /**
525 : * @brief Update all information related to a subscription when unsubscribing
526 : *
527 : * This function erases the FID from the corresponding socket and the socket is removed from the fid_info.
528 : *
529 : * @param fid: FID
530 : */
531 2167 : void FelixClientContextHandler::updateFidsWhenUnsub(uint64_t fid){
532 2167 : auto fids = &socketContextMap[mGetVoidSocketPointer(*infoByFid[fid]->getSocket())].get()->subFids;
533 2167 : auto it = std::find(fids->begin(), fids->end(), fid);
534 2167 : if (it != fids->end()){
535 2167 : fids->erase(it);
536 2167 : infoByFid[fid]->removeSocket();
537 : }else{
538 0 : clog_warn("Fid not foud in SubFids");
539 : }
540 2167 : }
541 :
542 :
543 : /**
544 : * @brief Updates the info from bus (usually before subscribing) or creates new felix_client_info object if it does not exist yet
545 : *
546 : * @param fid: FID
547 : * @param bus: pointer to FelixBus object
548 : * @param forRegister: determines if FID is used to get info from register (which assigns a different callback)
549 : */
550 5405 : void FelixClientContextHandler::createOrUpdateInfo(uint64_t fid, felixbus::FelixBus* bus, bool forRegister){
551 5405 : if (infoByFid.count(fid)) {
552 2701 : if((fid & 0x8000) && getSocketState(fid) != DISCONNECTED){
553 : return;
554 : }
555 2332 : try {
556 2332 : mUpdateInfo(fid, bus);
557 4 : } catch(std::exception& e){
558 4 : clog_info("Error while updating info from bus: %s. FID: 0x%x", e.what(), fid);
559 4 : throw;
560 4 : }
561 :
562 : } else {
563 2704 : try {
564 2704 : mCreateInfo(fid, bus);
565 12 : } catch(std::exception& e){
566 12 : clog_info("Error while creating info from bus: %s. FID: 0x%x", e.what(), fid);
567 12 : throw FelixClientResourceNotAvailableException();
568 12 : }
569 : }
570 5020 : if(forRegister){
571 34 : infoByFid[fid]->setForRegister(forRegister);
572 : }
573 : }
574 :
575 : /**
576 : * @brief Creates new felix_client_info object
577 : *
578 : * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
579 : * Additionally time for the bus read is measured and a warning is printed is the access took more than 1s.
580 : *
581 : * @param fid: FID
582 : * @param bus: pointer to FelixBus object
583 : */
584 2704 : void FelixClientContextHandler::mCreateInfo(uint64_t fid, felixbus::FelixBus* bus){
585 2704 : std::error_code ec;
586 2704 : felixbus::FelixBusInfo bus_info;
587 2704 : {
588 2704 : struct timespec t0, t1;
589 2704 : clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
590 2704 : std::lock_guard<std::mutex> lock(info_mtx);
591 2704 : bus_info = bus->get_info(fid, ec);
592 2704 : clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
593 2704 : double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
594 2704 : if(bus_useconds > 1000000){
595 0 : clog_warn("Bus access took more than one second: %dus.", bus_useconds);
596 : }
597 0 : }
598 2704 : if (!ec) {
599 2692 : infoByFid[fid] = std::make_unique<FelixClientContext>();
600 2692 : if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
601 2692 : infoByFid[fid]->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize, 0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub);
602 :
603 : } else {
604 12 : throw FelixClientResourceNotAvailableException();//d::exception(ec.message());
605 : }
606 2692 : clog_debug("Connection: %d type: %d for FID: 0x%x", bus_info.pubsub, bus_info.unbuffered, fid);
607 2692 : clog_debug("IP: %s Port: %d", bus_info.ip.c_str(), bus_info.port);
608 2704 : }
609 :
610 : /**
611 : * @brief Updates felix_client_info object
612 : *
613 : * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
614 : * Additionally time for the bus read is measured and a warning is printed is the access took more than 1s.
615 : *
616 : * @param fid: FID
617 : * @param bus: pointer to FelixBus object
618 : */
619 2332 : void FelixClientContextHandler::mUpdateInfo(uint64_t fid, felixbus::FelixBus* bus){
620 2332 : std::error_code ec;
621 2332 : felixbus::FelixBusInfo bus_info;
622 2332 : {
623 2332 : struct timespec t0, t1;
624 2332 : clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
625 2332 : std::lock_guard<std::mutex> lock(info_mtx);
626 2332 : bus_info = bus->get_info(fid, ec);
627 2332 : clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
628 2332 : double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
629 2332 : if(bus_useconds > 1000000){
630 0 : clog_warn("Bus access took more than one second: %dus.", bus_useconds);
631 : }
632 0 : }
633 2332 : if (!ec) {
634 2328 : auto info = std::move(infoByFid[fid]);
635 2328 : if (info.get() == NULL){
636 0 : mCreateInfo(fid, bus);
637 0 : return;
638 : }
639 2328 : if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
640 2328 : info->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize, 0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub);
641 2328 : infoByFid[fid] = std::move(info);
642 2328 : } else {
643 4 : clog_error("Error while reading bus: %s", ec.message().c_str());
644 4 : throw FelixClientResourceNotAvailableException();
645 : }
646 2332 : }
647 :
648 1923 : void FelixClientContextHandler::setSocketToAllUnsubscribed(void* socket){
649 1923 : socketContextMap[socket]->closingConnection = true;
650 1923 : }
651 :
652 2915 : bool FelixClientContextHandler::addOrCreateSocket(uint64_t fid){ //returns true if a new socket was created and needs to be initialized
653 2915 : if(mAddSocket(fid)){return false;}
654 968 : else {mCreateSocket(fid);}
655 968 : return true;
656 : }
657 :
658 : /**
659 : * @brief Adds socket to felix_client_info
660 : *
661 : * Sockets are identified by the endpoint they connect to. If a socket to the requested endpoint already exists, this socket is
662 : * added to the felix_client_info object of the particular FID. The stats is set to SUB automatically, since the subscription should
663 : * be fast and there is no exact feedback from the derver (felix-tohost) side when the subscription is actually processed.
664 : *
665 : * @param fid: FID
666 : */
667 2915 : bool FelixClientContextHandler::mAddSocket(uint64_t fid){
668 2915 : std::string fid_address = getAddress(fid);
669 : //Socket already exists
670 3073 : for(auto&& ctx : socketContextMap){
671 2105 : if(ctx.second->address == fid_address && !ctx.second->closingConnection){
672 1947 : infoByFid[fid]->setSocket(ctx.second->socket.get());
673 1947 : infoByFid[fid]->setSubscriptionState(SUB);
674 1947 : ctx.second->subFids.push_back(fid);
675 2915 : return true;
676 : }
677 : }
678 : return false;
679 2915 : }
680 :
681 : /**
682 : * @brief Create new socket
683 : *
684 : * The type of socket is determined by the felix-bus and the information provided for a particular FID.
685 : * Afterwars the socket is added to the socketContextMap that holds all socket objects.
686 : *
687 : * @param fid: FID
688 : */
689 968 : void FelixClientContextHandler::mCreateSocket(uint64_t fid){
690 968 : std::unique_ptr<SocketWrapper> sw_ptr;
691 968 : if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
692 218 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_subscribe_socket{}});
693 750 : } else if (infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
694 133 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_unbuffered_subscribe_socket{}});
695 617 : }else if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
696 335 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_buffered_send_socket{}});
697 282 : } else if(infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
698 282 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_send_socket{}});
699 : }else{
700 0 : return; //should not enter here
701 : }
702 968 : socketContextMap[static_cast<void*>(sw_ptr.get())] = std::make_unique<SocketContext>();
703 968 : auto* sctx = socketContextMap[static_cast<void*>(sw_ptr.get())].get();
704 968 : sctx->state = CONNECTING;
705 968 : sctx->closingConnection = false;
706 968 : sctx->subFids = std::vector<uint64_t>{fid};
707 968 : sctx->socket = std::move(sw_ptr);
708 968 : sctx->address = getAddress(fid);
709 968 : infoByFid[fid]->setSocket(sctx->socket.get());
710 968 : clog_debug("Create Socket. FID: 0x%x. Socket 0x%p should be 0x%p", fid, sctx->socket.get(), infoByFid[fid]->getSocket());
711 : return;
712 968 : }
713 :
714 : /**
715 : * @brief Returns a void pointer to all kinds of sockets
716 : *
717 : * @param wrapper: Pointer to SocketWrapper objects that contains requested socket
718 : */
719 9041 : void* FelixClientContextHandler::mGetVoidSocketPointer(SocketWrapper& wrapper) {
720 9041 : return std::visit([](auto& containedValueRef) {
721 : return static_cast<void*>(&containedValueRef);
722 9041 : }, wrapper);
723 : }
724 :
725 : /**
726 : * @brief deletes subscribe sockets
727 : *
728 : * Subsocket objects must not be destroyed right away, when they are erased.
729 : * This function takes care about destroying the objects later, when it is save to do so.
730 : *
731 : * @param socket: Pointer to subscribe socket
732 : */
733 0 : void FelixClientContextHandler::deleteSubSocket(void* socket){
734 0 : for( std::vector<std::unique_ptr<SocketWrapper>> ::iterator iter = mSubSocketsToDelete.begin(); iter != mSubSocketsToDelete.end(); ++iter )
735 : {
736 0 : if( mGetVoidSocketPointer(*iter->get()) == socket )
737 : {
738 0 : mSubSocketsToDelete.erase(iter);
739 0 : break;
740 : }
741 : }
742 0 : }
743 :
744 1471 : void FelixClientContextHandler::pullSendTask(send_var_t& task){
745 1471 : mSendQueue->popTask(task);
746 1471 : }
747 :
748 4287 : void FelixClientContextHandler::pullSubTask(sub_var_t& task){
749 4287 : mSubQueue->popTask(task);
750 4287 : }
|