Line data Source code
1 : #include "felix/felix_client_context_handler.hpp"
2 : #include "felix/felix_client_exception.hpp"
3 :
4 : #include "clog.h"
5 :
6 : /**
7 : * @brief Get netio_socket_type by FID
8 : *
9 : * The information is stored in the info object or this FID and is read from the bus when the info object is created.
10 : * @param fid: FID
11 : */
12 20868 : netio_socket_type FelixClientContextHandler::getType(uint64_t fid){
13 20868 : return infoByFid[fid]->getType();
14 : }
15 :
16 : /**
17 : * @brief Get netio_socket_type from socket
18 : *
19 : * This function returns the netio_socket_type of a void pointer to a socket.
20 : * If the socket does not exists, it returns BUFFERED since it has to have a valid return value.
21 : * @param socket: void pointer to a netio_socket
22 : */
23 0 : netio_socket_type FelixClientContextHandler::getType(void* socket){
24 0 : return std::visit(
25 0 : overloaded{[](netio_subscribe_socket& s) {
26 : return BUFFERED;
27 : },
28 : [](netio_unbuffered_subscribe_socket& s) {
29 : return UNBUFFERED;
30 : },
31 : [](netio_send_socket& s) {
32 : return UNBUFFERED;
33 : },
34 : [](netio_buffered_send_socket& s) {
35 : return BUFFERED;
36 : },
37 : [](std::monostate& s) {
38 : return BUFFERED;
39 : },
40 0 : }, *socketContextMap[socket]->socket);
41 : }
42 :
43 : /**
44 : * @brief Get netio_socket_connection by FID
45 : *
46 : * The information is stored in the info object or this FID and is read from the bus when the info object is created.
47 : * A connection type can either be of type PUBSUB or SENDRECV.
48 : * @param fid: FID
49 : */
50 1280 : netio_socket_connection FelixClientContextHandler::getConnectionType(uint64_t fid){
51 1280 : return infoByFid[fid]->getConnectionType();
52 : }
53 :
54 : /**
55 : * @brief Get netio_socket_connection from socket
56 : *
57 : * This function returns the netio_socket_connection of a void pointer to a socket.
58 : * A connection type can either be of type PUBSUB or SENDRECV.
59 : * If the socket does not exists, it returns SENDRECV since it has to have a valid return value.
60 : * @param socket: void pointer to a netio_socket
61 : */
62 6891 : netio_socket_connection FelixClientContextHandler::getConnectionType(void* socket){
63 6891 : return std::visit(
64 6891 : overloaded{[](netio_subscribe_socket& s) {
65 : return PUBSUB;
66 : },
67 : [](netio_unbuffered_subscribe_socket& s) {
68 : return PUBSUB;
69 : },
70 : [](netio_send_socket& s) {
71 : return SENDRECV;
72 : },
73 : [](netio_buffered_send_socket& s) {
74 : return SENDRECV;
75 : },
76 : [](std::monostate& s) {
77 : return SENDRECV;
78 : },
79 6891 : }, *socketContextMap[socket]->socket);
80 : }
81 :
82 : /**
83 : * @brief Get netio_socket_state by FID
84 : *
85 : * Returns the state of a socket that is associated to this FID. If the FID has no associated socket the function returns DISCONNECTED.
86 : * We must portect this call with a mutex since send_sockets are removed fromt he socketContextMap when a connection is closed.
87 : * @param fid: FID
88 : */
89 62876 : netio_socket_state FelixClientContextHandler::getSocketState(uint64_t fid){
90 62876 : std::unique_lock<std::mutex> lck(send_mtx);
91 62876 : SocketWrapper* sw = infoByFid[fid]->getSocket();
92 62876 : return std::visit(
93 62876 : overloaded{[](std::monostate& s) {
94 : return DISCONNECTED;
95 62473 : },[this](auto& s){
96 62473 : return socketContextMap[static_cast<void*>(&s)]->state.load();
97 : }
98 62876 : }, *sw);
99 62876 : }
100 :
101 : /**
102 : * @brief Get subscribe_state of a FID
103 : *
104 : * Returns the ssubscribe_state of a FID.
105 : * @param fid: FID
106 : */
107 19016 : subscribe_state FelixClientContextHandler::getSubscriptionState(uint64_t fid){
108 19016 : return infoByFid[fid]->getSubscriptionState();
109 : }
110 :
111 : /**
112 : * @brief Returns a map with timestamps to measure subscription times
113 : *
114 : * The first timestamp is taken in the scubribe function, called by the user, the second one when the subscribe is processed by the eventloop.
115 : * @param fid: FID
116 : */
117 2387 : std::unordered_map<std::string, timespec> FelixClientContextHandler::getSubscriptionTimes(uint64_t fid){
118 2387 : return infoByFid[fid]->getSubscriptionTimes();
119 : }
120 :
121 295 : uint FelixClientContextHandler::getWatermark(uint64_t fid){
122 295 : return infoByFid[fid]->getWatermark();
123 : }
124 :
125 321 : uint FelixClientContextHandler::getPagesize(uint64_t fid){
126 321 : return infoByFid[fid]->getPagesize();
127 : }
128 :
129 321 : uint FelixClientContextHandler::getPages(uint64_t fid){
130 321 : return infoByFid[fid]->getPages();
131 : }
132 :
133 15023 : uint FelixClientContextHandler::getPort(uint64_t fid){
134 15023 : return infoByFid[fid]->getPort();
135 : }
136 :
137 15023 : std::string FelixClientContextHandler::getIp(uint64_t fid){
138 15023 : return infoByFid[fid]->getIp();
139 : }
140 :
141 3631 : std::string FelixClientContextHandler::getAddress(uint64_t fid){
142 10893 : return infoByFid[fid]->getIp() + ":" + std::to_string(infoByFid[fid]->getPort());
143 : }
144 :
145 442 : bool FelixClientContextHandler::isTcp(uint64_t fid) {
146 442 : return infoByFid[fid]->isTcp();
147 : }
148 :
149 3197 : std::vector<uint64_t> FelixClientContextHandler::getFidsToResubscribe(){
150 3197 : return mResubFids;
151 : }
152 :
153 4259 : std::vector<uint64_t> FelixClientContextHandler::getFidsBySocket(void* socket){
154 4259 : return socketContextMap[socket]->subFids;
155 : }
156 :
157 :
158 : /**
159 : * @brief Get vector of all subscribed FIDs
160 : *
161 : * This function returns a vector of all FIDs that are subscribed via a subscribe socket right now.
162 : * This list is used in the destructor to unsubscribe and close a connection properly before we delete the associated subscribe socket.
163 : */
164 562 : std::vector<uint64_t> FelixClientContextHandler::getFidsToUnsubscribe(){
165 562 : std::vector<uint64_t> allFids;
166 1159 : std::for_each(socketContextMap.begin(), socketContextMap.end(), [&allFids](auto& ctx_it){std::copy(ctx_it.second->subFids.begin(), ctx_it.second->subFids.end(), std::back_inserter(allFids));});
167 562 : std::vector<uint64_t> fidsToUnsub(allFids.size());
168 2843 : auto end_it = std::copy_if(allFids.begin(), allFids.end(), fidsToUnsub.begin(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() == SUB || this->infoByFid[fid]->getSubscriptionState() == SUBING)&&( this->infoByFid[fid]->getConnectionType() == PUBSUB);});
169 562 : fidsToUnsub.resize(std::distance(fidsToUnsub.begin(), end_it));
170 562 : return fidsToUnsub;
171 562 : }
172 :
173 :
174 : /**
175 : * @brief Get vector of all send sockets
176 : *
177 : * This function returns a vector of pointers to all send sockets that are connected right now.
178 : * The function iterates over all sockets in the socketContextMap and adds the pointer of netio_send_socket (also in buffered case) to the vector.
179 : * This list is used in the destructor to properly shutdown the connection to the server side via netio_disconnect() before deleting the socket object.
180 : */
181 423 : std::vector<netio_send_socket*> FelixClientContextHandler::getSendSockets(){
182 423 : std::vector<netio_send_socket*> sendSockets;
183 876 : for (auto& ctx_it : socketContextMap){
184 453 : netio_send_socket* socket = std::visit(
185 453 : overloaded{[](netio_subscribe_socket& s) {
186 : return static_cast<netio_send_socket*>(NULL);
187 : },
188 : [](netio_unbuffered_subscribe_socket& s) {
189 : return static_cast<netio_send_socket*>(NULL);
190 : },
191 : [](netio_send_socket& s) {
192 : return &s;
193 : },
194 141 : [](netio_buffered_send_socket& s) {
195 141 : return &(s.send_socket);
196 : },
197 : [](std::monostate& s) {
198 : return static_cast<netio_send_socket*>(NULL);
199 : },
200 453 : }, *ctx_it.second->socket);
201 453 : if(socket){
202 264 : sendSockets.push_back(socket);
203 : }
204 : }
205 423 : return sendSockets;
206 0 : }
207 :
208 : /**
209 : * @brief Returns a map of void pointers to subscribe socket and the associated netio_socket_type
210 : *
211 : * Since we need to distinguish between buffered and unbuffered subscribe sockets when we want to clean them up, this function returns void pointer to
212 : * all subscribe sockets and their netio_socket_type stored in map.
213 : * This map/list is used in the destructor to properly shutdown the subscribe socket after all FIDs are unsubscribed.
214 : */
215 0 : std::unordered_map<void*, netio_socket_type> FelixClientContextHandler::getSubSocketsToDelete(){
216 0 : std::unordered_map<void*, netio_socket_type> voidSubSocketsToDelete;
217 0 : for (auto& it :mSubSocketsToDelete){
218 0 : netio_socket_type type = std::visit(
219 0 : overloaded{[](netio_subscribe_socket& s) {
220 : return BUFFERED;
221 : },
222 : [](netio_unbuffered_subscribe_socket& s) {
223 : return UNBUFFERED;
224 : },
225 : [](netio_send_socket& s) {
226 : return UNBUFFERED;
227 : },
228 : [](netio_buffered_send_socket& s) {
229 : return BUFFERED;
230 : },
231 : [](std::monostate& s) {
232 : return BUFFERED;
233 : },
234 0 : }, *it);
235 0 : voidSubSocketsToDelete[mGetVoidSocketPointer(*it)] = type;
236 : }
237 0 : return voidSubSocketsToDelete;
238 0 : }
239 :
240 :
241 238 : bool FelixClientContextHandler::isForRegister(uint64_t fid){
242 238 : return infoByFid[fid]->getForRegister();
243 : }
244 :
245 : /**
246 : * @brief Returns true is FID is subscribed
247 : *
248 : * @param fid: FID
249 : */
250 479 : bool FelixClientContextHandler::isSubscribed(uint64_t fid){
251 479 : if (infoByFid.count(fid) > 0){
252 479 : return (infoByFid[fid]->getSubscriptionState() == SUB);
253 : } else {
254 : return false;
255 : }
256 : }
257 :
258 : /**
259 : * @brief Function blocks until all FIDs are subscribed or the timeout expired.
260 : *
261 : * The function can be used to wait for the subscription of one or more FIDs was successful or the timeot expired.
262 : * To achieve this, the function adds a multiWaitCtx object to all FID info objects, which has in internal counter that is increased for every
263 : * subscription that was succesful. The conditional variale "multiWaitCtx->allSubscribed" uses the std function wait_for to determine if the timeout expires
264 : * or the condition is met.
265 : * Note: The wait_for() function requires a unique_lock on a mutex. Since we do not realy need to synchronise something, it is sufficient to use a local mutex/lock
266 : * @param fids: vector fo FIDs tha are checked for their subscription state
267 : * @param timeoutMs:timeout in ms after which the funciton returns even when not all FIDs are subscribed.
268 : */
269 420 : bool FelixClientContextHandler::isSubscribed(std::vector<uint64_t> fids, uint64_t timeoutMs){
270 420 : auto multiWaitCtx = std::make_unique<MultiFidsWaitContext>();
271 420 : multiWaitCtx->expectedSize = fids.size();
272 1985 : for(auto fid : fids){
273 1565 : if (infoByFid.count(fid) > 0){
274 1565 : if(infoByFid[fid]->getSubscriptionState() != SUB){
275 1565 : infoByFid[fid]->setWaitContext(multiWaitCtx.get());
276 : }
277 0 : else{multiWaitCtx->count++;}
278 : }
279 : }
280 420 : clog_debug("Waiting for %d fids to subscribe. Count: %d", multiWaitCtx->expectedSize, multiWaitCtx->count);
281 420 : bool status = true;
282 420 : if(multiWaitCtx->count < multiWaitCtx->expectedSize ){
283 417 : auto timeExpiredMs = std::chrono::milliseconds(0);
284 417 : std::mutex m;
285 417 : std::unique_lock<std::mutex> lck(m);
286 417 : auto t0 = std::chrono::steady_clock::now();
287 1256 : while (multiWaitCtx->count != multiWaitCtx->expectedSize && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
288 422 : status = (multiWaitCtx->allSubscribed.wait_for(lck, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::milliseconds(timeoutMs) - timeExpiredMs)) == std::cv_status::no_timeout);
289 422 : timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
290 : }
291 417 : }
292 1985 : for (auto fid : fids){
293 1565 : infoByFid[fid]->clearWaitContext();
294 : }
295 420 : return status;
296 847 : }
297 :
298 : /**
299 : * @brief Checks if all FIDs are unsubscribed
300 : *
301 : * Since this function iterates over all sockets, we need to make sure that no socket is removed from the socketContextMap while we are are iterting.
302 : * Because the socketContextMap contains sockets of all types, a scoped_lock is used to lock the send_mtx and sub_mtx.
303 : */
304 6170 : bool FelixClientContextHandler::areAllFidsUnsubscribed(){
305 6170 : std::scoped_lock lock(send_mtx, sub_mtx);
306 6718 : for (auto& ctx_it : socketContextMap){
307 6434 : clog_debug("socket 0x%p has state 0x%p", ctx_it.first, netio_socket_state_name(ctx_it.second->state.load()));
308 6434 : if(ctx_it.second->state != DISCONNECTED && getConnectionType(ctx_it.first) == PUBSUB){
309 6170 : return false;
310 : }
311 : }
312 : return true;
313 6170 : }
314 :
315 :
316 : /**
317 : * @brief Checks if all FIDs associated with a particular socket are unsubscribed
318 : *
319 : * This function checks if there are FIDs associated to the socket, that are not ualready nscubscribed or currently unsubscribing.
320 : * @param socket: void pointer to a netio_socket. (should be a subscribe_socket but will also work with send_socket)
321 : */
322 2013 : bool FelixClientContextHandler::areAllFidsUnsubscribed(void* socket){
323 2013 : auto fids = getFidsBySocket(socket);
324 90716 : int i = std::count_if(fids.begin(), fids.end(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() != UNSUB && this->infoByFid[fid]->getSubscriptionState() != UNSUBING);});
325 2013 : clog_debug("Remaining subscritions: %d", i);
326 2013 : return i;
327 2013 : }
328 :
329 :
330 : /**
331 : * @brief Check if felix-client can subscribe to this FID
332 : *
333 : * FIDs are not eligible to subscribe if we don't have a felix_client_info object for this FID or if the FID was unsubscribed before.
334 : *
335 : * TODO:Maybe we also need to check what we want to do here if the FID is in the process of unsubscribing but not done yet
336 : *
337 : * @param fid: FID
338 : */
339 2188 : bool FelixClientContextHandler::canSubscribe(uint64_t fid){
340 2188 : if (infoByFid.count(fid) == 0 || (infoByFid[fid]->getSubscriptionState() == UNSUB)) {
341 2164 : return true;
342 : } else {
343 : return false;
344 : }
345 : }
346 :
347 :
348 : /**
349 : * @brief Checks if all felix_client_info object for this FID already exists.
350 : *
351 : * @param fid: FID
352 : */
353 17521 : bool FelixClientContextHandler::exists(uint64_t fid){
354 17521 : return infoByFid.count(fid);
355 : }
356 :
357 : /**
358 : * @brief checks if all socket for this void pointer still exists.
359 : *
360 : * @param socket: void pointer to socket of any type.
361 : */
362 16686 : bool FelixClientContextHandler::exists(void* socket){
363 16686 : return socketContextMap.count(socket);
364 : }
365 :
366 :
367 : /**
368 : * @brief Function that waits for a socket associated to this FID to be connected or timeout
369 : *
370 : * This function retrives the socket associated to this FID and checks its connection state.
371 : * If it is currently connecting, it will wait on a condition variable that is notified in the on_connection_establiched() callback.
372 : * If hte connection is not established within the spcified timeout, the function returns fals for not connected.
373 : *
374 : * @param fid: FID
375 : * @param timeoutMs: timeout [ms]
376 : *
377 : */
378 274 : bool FelixClientContextHandler::waitConnected(uint64_t fid, uint64_t timeoutMs){
379 274 : if (infoByFid.count(fid) > 0){
380 274 : SocketWrapper* sw = infoByFid[fid]->getSocket();
381 274 : return std::visit(
382 274 : overloaded{[](std::monostate& s) {
383 : return false;
384 274 : },[this, timeoutMs, fid](auto& s){
385 274 : if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTED){
386 : return true;
387 274 : } else if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==DISCONNECTED){
388 : return false;
389 : }else{
390 274 : auto timeExpiredMs = std::chrono::milliseconds(0);
391 274 : std::unique_lock<std::mutex> lck(socketContextMap[static_cast<void*>(&s)].get()->mMux);
392 274 : clog_debug("Start waiting, fid: 0x%x with socket 0x%p", fid, socketContextMap[static_cast<void*>(&s)].get());
393 274 : auto t0 = std::chrono::steady_clock::now();
394 547 : while(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTING && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
395 273 : socketContextMap[static_cast<void*>(&s)].get()->mConnectionWait.wait_for(lck, std::chrono::milliseconds(timeoutMs));
396 273 : timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
397 : }
398 274 : return (socketContextMap[static_cast<void*>(&s)].get()->state.load() == CONNECTED);
399 274 : }
400 : }
401 : }, *sw);
402 : } else {
403 : return false;
404 : }
405 : }
406 :
407 4263 : void FelixClientContextHandler::setSubscriptionTimes(uint64_t fid, std::string name, timespec ts){
408 8526 : return infoByFid[fid]->setSubscriptionTimes(name, ts);
409 : }
410 :
411 : /**
412 : * @brief Setting socket state
413 : *
414 : * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
415 : *
416 : * @param fid: FID
417 : * @param s: New socket state
418 : *
419 : */
420 0 : void FelixClientContextHandler::setSocketState(uint64_t fid, netio_socket_state s){
421 0 : if (infoByFid.count(fid) > 0){
422 0 : SocketWrapper* sw = infoByFid[fid]->getSocket();
423 0 : clog_debug("For socket: 0x%p new state %d", mGetVoidSocketPointer(*sw), s);
424 0 : socketContextMap[mGetVoidSocketPointer(*sw)]->state = s;
425 0 : if (s == CONNECTED || s == DISCONNECTED){socketContextMap[mGetVoidSocketPointer(*sw)]->mConnectionWait.notify_all();}
426 : }
427 0 : }
428 :
429 : /**
430 : * @brief Setting socket state
431 : *
432 : * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
433 : *
434 : * @param socket: void pointer to a netio_socket
435 : * @param s: New socket state
436 : *
437 : */
438 918 : void FelixClientContextHandler::setSocketState(void* socket, netio_socket_state s){
439 918 : socketContextMap[socket]->state = s;
440 918 : if (s == CONNECTED || s == DISCONNECTED){socketContextMap[socket]->mConnectionWait.notify_all();}
441 918 : }
442 :
443 8485 : void FelixClientContextHandler::setSubscriptionState(uint64_t fid, subscribe_state s){
444 8485 : infoByFid[fid]->setSubscriptionState(s);
445 8485 : }
446 :
447 :
448 : /**
449 : * @brief Adds FID to a set of FIDs that need to subscribe
450 : *
451 : * FIDs get either added to the list during the first subscription, if they are not subcribed yet or if the server side shut down the connection.
452 : * Felix-client will try to subscribe all FIDs in this list periodically until the subscription is succesful.
453 : *
454 : * @param fid: FID
455 : */
456 2107 : void FelixClientContextHandler::addToFidsToResubscribe(uint64_t fid){
457 2107 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
458 2107 : if (it == mResubFids.end()){
459 2107 : mResubFids.push_back(fid);
460 : }
461 2107 : }
462 :
463 2387 : void FelixClientContextHandler::removeFromFidsToResubscribe(uint64_t fid){
464 2387 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
465 2387 : if (it != mResubFids.end()){
466 2113 : mResubFids.erase(it);
467 : }else{
468 274 : clog_debug("Fid not foud in mReSubFids");
469 : }
470 2387 : }
471 :
472 : /**
473 : * @brief Remove socket object
474 : *
475 : * This function removes sockets form the socketContextMaps and thereby destroying it.
476 : * Different cases need to be considered to take care of all references to this socket and to update the corresponding information.
477 : *
478 : * SubSocket objects must not be destroyed here, since they contain a send and listen socket that are not cleaned up yet.
479 : * Instead all references to the socket are removed and they object is put into a different list to destroy it later.
480 : *
481 : * @param socket: void pointer to a netio_socket
482 : * @param connection_refused: Status variable if the socket is removed because the connection was refused
483 : */
484 139 : void FelixClientContextHandler::removeSocket(void* socket, bool connection_refused){
485 139 : auto socketFids = getFidsBySocket(socket);
486 139 : if (getConnectionType(socket) == PUBSUB ){
487 140 : for (auto& fid : socketFids)
488 : {
489 49 : if(!terminatingFelixClient.load()){ //necessary?
490 49 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
491 49 : if (it == mResubFids.end()){
492 4 : mResubFids.push_back(fid);
493 : }
494 : }else{
495 0 : clog_debug("Already terminating felix-client not adding fids to mResubFids");
496 : }
497 49 : infoByFid.at(fid)->removeSocket();
498 49 : infoByFid.at(fid)->setSubscriptionState(UNSUB);
499 : }
500 :
501 91 : if(!connection_refused){
502 46 : mSubSocketsToDelete.push_back(std::move(socketContextMap[socket]->socket));
503 : }
504 91 : std::unique_lock<std::mutex> lock(sub_mtx);
505 91 : socketContextMap.erase(socket);
506 91 : } else {
507 48 : {
508 48 : std::unique_lock<std::mutex> lck(socketContextMap[socket].get()->mMux);
509 96 : for (auto& fid : socketFids)
510 : {
511 48 : if(!terminatingFelixClient.load()){
512 48 : auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
513 48 : if (it == mResubFids.end()){
514 2 : mResubFids.push_back(fid);
515 : }
516 : }else{
517 0 : clog_debug("Already terminating felix-client not adding fids to mResubFids");
518 : }
519 48 : infoByFid.at(fid)->removeSocket();
520 : }
521 0 : }
522 48 : std::unique_lock<std::mutex> lock(send_mtx);
523 48 : socketContextMap.erase(socket);
524 48 : }
525 139 : }
526 :
527 :
528 : /**
529 : * @brief Update all information related to a subscription when unsubscribing
530 : *
531 : * This function erases the FID from the corresponding socket and the socket is removed from the fid_info.
532 : *
533 : * @param fid: FID
534 : */
535 2013 : void FelixClientContextHandler::updateFidsWhenUnsub(uint64_t fid){
536 2013 : auto fids = &socketContextMap[mGetVoidSocketPointer(*infoByFid[fid]->getSocket())].get()->subFids;
537 2013 : auto it = std::find(fids->begin(), fids->end(), fid);
538 2013 : if (it != fids->end()){
539 2013 : fids->erase(it);
540 2013 : infoByFid[fid]->removeSocket();
541 : }else{
542 0 : clog_debug("Fid not foud in SubFids");
543 : }
544 2013 : }
545 :
546 :
547 : /**
548 : * @brief Updates the info from bus (usually before subscribing) or creates new felix_client_info object if it does not exist yet
549 : *
550 : * @param fid: FID
551 : * @param bus: pointer to FelixBus object
552 : * @param forRegister: determines if FID is used to get info from register (which assigns a different callback)
553 : */
554 18511 : void FelixClientContextHandler::createOrUpdateInfo(uint64_t fid, felixbus::FelixBus* bus, bool forRegister){
555 18511 : if (infoByFid.count(fid)) {
556 16119 : if((fid & 0x8000) && getSocketState(fid) != DISCONNECTED){
557 : return;
558 : }
559 2185 : try {
560 2185 : mUpdateInfo(fid, bus);
561 0 : } catch(std::exception& e){
562 0 : clog_info("Error while updating info from bus: %s. FID: 0x%x", e.what(), fid);
563 0 : throw;
564 0 : }
565 :
566 : } else {
567 2392 : try {
568 2392 : mCreateInfo(fid, bus);
569 9 : } catch(std::exception& e){
570 9 : clog_info("Error while creating info from bus: %s. FID: 0x%x", e.what(), fid);
571 9 : throw FelixClientResourceNotAvailableException();
572 9 : }
573 : }
574 4568 : if(forRegister){
575 17 : infoByFid[fid]->setForRegister(forRegister);
576 : }
577 : }
578 :
579 : /**
580 : * @brief Creates new felix_client_info object
581 : *
582 : * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
583 : * Additionally time for the bus read is meassured and a warning is printed is the access took more than 1s.
584 : *
585 : * @param fid: FID
586 : * @param bus: pointer to FelixBus object
587 : */
588 2392 : void FelixClientContextHandler::mCreateInfo(uint64_t fid, felixbus::FelixBus* bus){
589 2392 : std::error_code ec;
590 2392 : felixbus::FelixBusInfo bus_info;
591 2392 : {
592 2392 : struct timespec t0, t1;
593 2392 : clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
594 2392 : std::lock_guard<std::mutex> lock(info_mtx);
595 2392 : bus_info = bus->get_info(fid, ec);
596 2392 : clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
597 2392 : double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
598 2392 : if(bus_useconds > 1000000){
599 0 : clog_warn("Bus access took more than one second: %dus.", bus_useconds);
600 : }
601 0 : }
602 2392 : if (!ec) {
603 2383 : infoByFid[fid] = std::make_unique<FelixClientContext>();
604 2383 : if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
605 2383 : infoByFid[fid]->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize, 0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub, bus_info.raw_tcp);
606 : } else {
607 9 : throw FelixClientResourceNotAvailableException();//d::exception(ec.message());
608 : }
609 2383 : clog_debug("Connection: %d type: %d for FID: 0x%x", bus_info.pubsub, bus_info.unbuffered, fid);
610 2383 : clog_debug("IP: %s Port: %d", bus_info.ip.c_str(), bus_info.port);
611 2392 : }
612 :
613 : /**
614 : * @brief Updates felix_client_info object
615 : *
616 : * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
617 : * Additionally time for the bus read is meassured and a warning is printed is the access took more than 1s.
618 : *
619 : * @param fid: FID
620 : * @param bus: pointer to FelixBus object
621 : */
622 2185 : void FelixClientContextHandler::mUpdateInfo(uint64_t fid, felixbus::FelixBus* bus){
623 2185 : std::error_code ec;
624 2185 : felixbus::FelixBusInfo bus_info;
625 2185 : {
626 2185 : struct timespec t0, t1;
627 2185 : clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
628 2185 : std::lock_guard<std::mutex> lock(info_mtx);
629 2185 : bus_info = bus->get_info(fid, ec);
630 2185 : clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
631 2185 : double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
632 2185 : if(bus_useconds > 1000000){
633 0 : clog_warn("Bus access took more than one second: %dus.", bus_useconds);
634 : }
635 0 : }
636 2185 : if (!ec) {
637 2185 : auto info = std::move(infoByFid[fid]);
638 2185 : if (info.get() == NULL){
639 0 : mCreateInfo(fid, bus);
640 0 : return;
641 : }
642 2185 : if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
643 2185 : info->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize, 0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub, bus_info.raw_tcp);
644 2185 : infoByFid[fid] = std::move(info);
645 2185 : } else {
646 0 : clog_error("Error while reading bus: %s", ec.message().c_str());
647 0 : throw FelixClientResourceNotAvailableException();
648 : }
649 2185 : }
650 :
651 1824 : void FelixClientContextHandler::setSocketToAllUnsubscribed(void* socket){
652 1824 : socketContextMap[socket]->closingConnection = true;
653 1824 : }
654 :
655 2484 : bool FelixClientContextHandler::addOrCreateSocket(uint64_t fid){ //returns true if a new socket was created and needs to be initialized
656 2484 : if(mAddSocket(fid)){return false;}
657 550 : else {mCreateSocket(fid);}
658 550 : return true;
659 : }
660 :
661 : /**
662 : * @brief Adds socket to felix_client_info
663 : *
664 : * Sockets are identfied by the endpoint they connect to. If a socket to the requested endpoint already exists, this socket is
665 : * added to the felix_client_info object of the particular FID. The stats is set to SUB automatically, since the subscription should
666 : * be fast and there is no exact feedback from the derver (felix-tohost) side when the subscription is actually processed.
667 : *
668 : * @param fid: FID
669 : */
670 2484 : bool FelixClientContextHandler::mAddSocket(uint64_t fid){
671 2484 : std::string fid_address = getAddress(fid);
672 : //Socket already exists
673 2555 : for(auto&& ctx : socketContextMap){
674 2005 : if(ctx.second->address == fid_address && !ctx.second->closingConnection){
675 1934 : infoByFid[fid]->setSocket(ctx.second->socket.get());
676 1934 : infoByFid[fid]->setSubscriptionState(SUB);
677 1934 : ctx.second->subFids.push_back(fid);
678 2484 : return true;
679 : }
680 : }
681 : return false;
682 2484 : }
683 :
684 : /**
685 : * @brief Create new socket
686 : *
687 : * The type of socket is determined by the felix-bus and the information provided for a particular FID.
688 : * Afterwars the socket is added to the socketContextMap that holds all socket objects.
689 : *
690 : * @param fid: FID
691 : */
692 550 : void FelixClientContextHandler::mCreateSocket(uint64_t fid){
693 550 : std::unique_ptr<SocketWrapper> sw_ptr;
694 550 : if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
695 260 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_subscribe_socket{}});
696 420 : } else if (infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
697 216 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_unbuffered_subscribe_socket{}});
698 312 : }else if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
699 330 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_buffered_send_socket{}});
700 147 : } else if(infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
701 294 : sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_send_socket{}});
702 : }else{
703 0 : return; //should not enter here
704 : }
705 550 : socketContextMap[static_cast<void*>(sw_ptr.get())] = std::make_unique<SocketContext>();
706 550 : auto* sctx = socketContextMap[static_cast<void*>(sw_ptr.get())].get();
707 550 : sctx->state = CONNECTING;
708 550 : sctx->closingConnection = false;
709 550 : sctx->subFids = std::vector<uint64_t>{fid};
710 550 : sctx->socket = std::move(sw_ptr);
711 550 : sctx->address = getAddress(fid);
712 550 : infoByFid[fid]->setSocket(sctx->socket.get());
713 550 : clog_debug("Create Socket. FID: 0x%x. Socket 0x%p should be 0x%p", fid, sctx->socket.get(), infoByFid[fid]->getSocket());
714 : return;
715 550 : }
716 :
717 : /**
718 : * @brief Returns a void pointer to all kinds of sockets
719 : *
720 : * @param wrapper: Pointer to SocketWrapper objects that contains requested socket
721 : */
722 34940 : void* FelixClientContextHandler::mGetVoidSocketPointer(SocketWrapper& wrapper) {
723 34940 : return std::visit([](auto& containedValueRef) {
724 : return static_cast<void*>(&containedValueRef);
725 34940 : }, wrapper);
726 : }
727 :
728 : /**
729 : * @brief deletes subscribe sockets
730 : *
731 : * Subsocket objects must not be destroyed right away, when they are erased.
732 : * This function takes care about destroying the objects later, when it is save to do so.
733 : *
734 : * @param socket: Pointer to subscribe socket
735 : */
736 0 : void FelixClientContextHandler::deleteSubSocket(void* socket){
737 0 : for( std::vector<std::unique_ptr<SocketWrapper>> ::iterator iter = mSubSocketsToDelete.begin(); iter != mSubSocketsToDelete.end(); ++iter )
738 : {
739 0 : if( mGetVoidSocketPointer(*iter->get()) == socket )
740 : {
741 0 : mSubSocketsToDelete.erase(iter);
742 0 : break;
743 : }
744 : }
745 0 : }
746 :
747 14519 : void FelixClientContextHandler::pullSendTask(send_var_t& task){
748 14519 : mSendQueue->popTask(task);
749 14519 : }
750 :
751 4085 : void FelixClientContextHandler::pullSubTask(sub_var_t& task){
752 4085 : mSubQueue->popTask(task);
753 4085 : }
|