LCOV - code coverage report
Current view: top level - felix-client/src - felix_client_context_handler.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 272 317 85.8 %
Date: 2025-08-12 04:15:35 Functions: 52 58 89.7 %

          Line data    Source code
       1             : #include "felix/felix_client_context_handler.hpp"
       2             : #include "felix/felix_client_exception.hpp"
       3             : 
       4             : #include "clog.h"
       5             : 
       6             : /**
       7             :  * @brief Get netio_socket_type by FID
       8             :  *
       9             :  * The information is stored in the info object or this FID and is read from the bus when the info object is created.
      10             :  * @param fid: FID
      11             :  */
      12       20868 : netio_socket_type FelixClientContextHandler::getType(uint64_t fid){
      13       20868 :     return infoByFid[fid]->getType();
      14             : }
      15             : 
      16             : /**
      17             :  * @brief Get netio_socket_type from socket
      18             :  *
      19             :  * This function returns the netio_socket_type of a void pointer to a socket.
      20             :  * If the socket does not exists, it returns BUFFERED since it has to have a valid return value.
      21             :  * @param socket: void pointer to a netio_socket
      22             :  */
      23           0 : netio_socket_type FelixClientContextHandler::getType(void* socket){
      24           0 :     return std::visit(
      25           0 :     overloaded{[](netio_subscribe_socket& s) {
      26             :             return BUFFERED;
      27             :         },
      28             :         [](netio_unbuffered_subscribe_socket& s) {
      29             :             return UNBUFFERED;
      30             :         },
      31             :         [](netio_send_socket& s) {
      32             :             return UNBUFFERED;
      33             :         },
      34             :         [](netio_buffered_send_socket& s) {
      35             :             return BUFFERED;
      36             :         },
      37             :         [](std::monostate& s) {
      38             :             return BUFFERED;
      39             :         },
      40           0 :     }, *socketContextMap[socket]->socket);
      41             : }
      42             : 
      43             : /**
      44             :  * @brief Get netio_socket_connection by FID
      45             :  *
      46             :  * The information is stored in the info object or this FID and is read from the bus when the info object is created.
      47             :  * A connection type can either be of type PUBSUB or SENDRECV.
      48             :  * @param fid: FID
      49             :  */
      50        1280 : netio_socket_connection FelixClientContextHandler::getConnectionType(uint64_t fid){
      51        1280 :     return infoByFid[fid]->getConnectionType();
      52             : }
      53             : 
      54             : /**
      55             :  * @brief Get netio_socket_connection from socket
      56             :  *
      57             :  * This function returns the netio_socket_connection of a void pointer to a socket.
      58             :  * A connection type can either be of type PUBSUB or SENDRECV.
      59             :  * If the socket does not exists, it returns SENDRECV since it has to have a valid return value.
      60             :  * @param socket: void pointer to a netio_socket
      61             :  */
      62        6891 : netio_socket_connection FelixClientContextHandler::getConnectionType(void* socket){
      63        6891 :     return std::visit(
      64        6891 :     overloaded{[](netio_subscribe_socket& s) {
      65             :             return PUBSUB;
      66             :         },
      67             :         [](netio_unbuffered_subscribe_socket& s) {
      68             :             return PUBSUB;
      69             :         },
      70             :         [](netio_send_socket& s) {
      71             :             return SENDRECV;
      72             :         },
      73             :         [](netio_buffered_send_socket& s) {
      74             :             return SENDRECV;
      75             :         },
      76             :         [](std::monostate& s) {
      77             :             return SENDRECV;
      78             :         },
      79        6891 :     }, *socketContextMap[socket]->socket);
      80             : }
      81             : 
      82             : /**
      83             :  * @brief Get netio_socket_state by FID
      84             :  *
      85             :  * Returns the state of a socket that is associated to this FID. If the FID has no associated socket the function returns DISCONNECTED.
      86             :  * We must portect this call with a mutex since send_sockets are removed fromt he socketContextMap when a connection is closed.
      87             :  * @param fid: FID
      88             :  */
      89       62876 : netio_socket_state FelixClientContextHandler::getSocketState(uint64_t fid){
      90       62876 :     std::unique_lock<std::mutex> lck(send_mtx);
      91       62876 :     SocketWrapper* sw = infoByFid[fid]->getSocket();
      92       62876 :     return std::visit(
      93       62876 :         overloaded{[](std::monostate& s) {
      94             :                 return DISCONNECTED;
      95       62473 :             },[this](auto& s){
      96       62473 :                return socketContextMap[static_cast<void*>(&s)]->state.load();
      97             :             }
      98       62876 :         }, *sw);
      99       62876 : }
     100             : 
     101             : /**
     102             :  * @brief Get subscribe_state of a FID
     103             :  *
     104             :  * Returns the ssubscribe_state of a FID.
     105             :  * @param fid: FID
     106             :  */
     107       19016 : subscribe_state FelixClientContextHandler::getSubscriptionState(uint64_t fid){
     108       19016 :     return infoByFid[fid]->getSubscriptionState();
     109             : }
     110             : 
     111             : /**
     112             :  * @brief Returns a map with timestamps to measure subscription times
     113             :  *
     114             :  * The first timestamp is taken in the scubribe function, called by the user, the second one when the subscribe is processed by the eventloop.
     115             :  * @param fid: FID
     116             :  */
     117        2387 : std::unordered_map<std::string, timespec> FelixClientContextHandler::getSubscriptionTimes(uint64_t fid){
     118        2387 :     return infoByFid[fid]->getSubscriptionTimes();
     119             : }
     120             : 
     121         295 : uint FelixClientContextHandler::getWatermark(uint64_t fid){
     122         295 :     return infoByFid[fid]->getWatermark();
     123             : }
     124             : 
     125         321 : uint FelixClientContextHandler::getPagesize(uint64_t fid){
     126         321 :     return infoByFid[fid]->getPagesize();
     127             : }
     128             : 
     129         321 : uint FelixClientContextHandler::getPages(uint64_t fid){
     130         321 :     return infoByFid[fid]->getPages();
     131             : }
     132             : 
     133       15023 : uint FelixClientContextHandler::getPort(uint64_t fid){
     134       15023 :     return infoByFid[fid]->getPort();
     135             : }
     136             : 
     137       15023 : std::string FelixClientContextHandler::getIp(uint64_t fid){
     138       15023 :     return infoByFid[fid]->getIp();
     139             : }
     140             : 
     141        3631 : std::string FelixClientContextHandler::getAddress(uint64_t fid){
     142       10893 :     return infoByFid[fid]->getIp() + ":" + std::to_string(infoByFid[fid]->getPort());
     143             : }
     144             : 
     145         442 : bool FelixClientContextHandler::isTcp(uint64_t fid) {
     146         442 :     return infoByFid[fid]->isTcp();
     147             : }
     148             : 
     149        3197 : std::vector<uint64_t> FelixClientContextHandler::getFidsToResubscribe(){
     150        3197 :     return mResubFids;
     151             : }
     152             : 
     153        4259 : std::vector<uint64_t> FelixClientContextHandler::getFidsBySocket(void* socket){
     154        4259 :     return socketContextMap[socket]->subFids;
     155             : }
     156             : 
     157             : 
     158             : /**
     159             :  * @brief Get vector of all subscribed FIDs
     160             :  *
     161             :  * This function returns a vector of all FIDs that are subscribed via a subscribe socket right now.
     162             :  * This list is used in the destructor to unsubscribe and close a connection properly before we delete the associated subscribe socket.
     163             :  */
     164         562 : std::vector<uint64_t> FelixClientContextHandler::getFidsToUnsubscribe(){
     165         562 :     std::vector<uint64_t> allFids;
     166        1159 :     std::for_each(socketContextMap.begin(), socketContextMap.end(), [&allFids](auto& ctx_it){std::copy(ctx_it.second->subFids.begin(), ctx_it.second->subFids.end(), std::back_inserter(allFids));});
     167         562 :     std::vector<uint64_t> fidsToUnsub(allFids.size());
     168        2843 :     auto end_it = std::copy_if(allFids.begin(), allFids.end(), fidsToUnsub.begin(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() == SUB || this->infoByFid[fid]->getSubscriptionState() == SUBING)&&( this->infoByFid[fid]->getConnectionType() == PUBSUB);});
     169         562 :     fidsToUnsub.resize(std::distance(fidsToUnsub.begin(), end_it));
     170         562 :     return fidsToUnsub;
     171         562 : }
     172             : 
     173             : 
     174             : /**
     175             :  * @brief Get vector of all send sockets
     176             :  *
     177             :  * This function returns a vector of pointers to all send sockets that are connected right now.
     178             :  * The function iterates over all sockets in the socketContextMap and adds the pointer of netio_send_socket (also in buffered case) to the vector.
     179             :  * This list is used in the destructor to properly shutdown the connection to the server side via netio_disconnect() before deleting the socket object.
     180             :  */
     181         423 : std::vector<netio_send_socket*> FelixClientContextHandler::getSendSockets(){
     182         423 :     std::vector<netio_send_socket*> sendSockets;
     183         876 :     for (auto& ctx_it : socketContextMap){
     184         453 :         netio_send_socket* socket = std::visit(
     185         453 :         overloaded{[](netio_subscribe_socket& s) {
     186             :                 return static_cast<netio_send_socket*>(NULL);
     187             :             },
     188             :             [](netio_unbuffered_subscribe_socket& s) {
     189             :                 return static_cast<netio_send_socket*>(NULL);
     190             :             },
     191             :             [](netio_send_socket& s) {
     192             :                 return &s;
     193             :             },
     194         141 :             [](netio_buffered_send_socket& s) {
     195         141 :                 return &(s.send_socket);
     196             :             },
     197             :             [](std::monostate& s) {
     198             :                 return static_cast<netio_send_socket*>(NULL);
     199             :             },
     200         453 :         }, *ctx_it.second->socket);
     201         453 :         if(socket){
     202         264 :             sendSockets.push_back(socket);
     203             :         }
     204             :     }
     205         423 :     return sendSockets;
     206           0 : }
     207             : 
     208             : /**
     209             :  * @brief Returns a map of void pointers to subscribe socket and the associated netio_socket_type
     210             :  *
     211             :  * Since we need to distinguish between buffered and unbuffered subscribe sockets when we want to clean them up, this function returns void pointer to
     212             :  * all subscribe sockets and their netio_socket_type stored in map.
     213             :  * This map/list is used in the destructor to properly shutdown the subscribe socket after all FIDs are unsubscribed.
     214             :  */
     215           0 : std::unordered_map<void*, netio_socket_type> FelixClientContextHandler::getSubSocketsToDelete(){
     216           0 :     std::unordered_map<void*, netio_socket_type> voidSubSocketsToDelete;
     217           0 :     for (auto& it :mSubSocketsToDelete){
     218           0 :         netio_socket_type type = std::visit(
     219           0 :             overloaded{[](netio_subscribe_socket& s) {
     220             :                 return BUFFERED;
     221             :             },
     222             :             [](netio_unbuffered_subscribe_socket& s) {
     223             :                 return UNBUFFERED;
     224             :             },
     225             :             [](netio_send_socket& s) {
     226             :                 return UNBUFFERED;
     227             :             },
     228             :             [](netio_buffered_send_socket& s) {
     229             :                 return BUFFERED;
     230             :             },
     231             :             [](std::monostate& s) {
     232             :                 return BUFFERED;
     233             :             },
     234           0 :         }, *it);
     235           0 :         voidSubSocketsToDelete[mGetVoidSocketPointer(*it)] = type;
     236             :     }
     237           0 :     return voidSubSocketsToDelete;
     238           0 : }
     239             : 
     240             : 
     241         238 : bool FelixClientContextHandler::isForRegister(uint64_t fid){
     242         238 :     return infoByFid[fid]->getForRegister();
     243             : }
     244             : 
     245             : /**
     246             :  * @brief Returns true is FID is subscribed
     247             :  *
     248             :  * @param fid: FID
     249             :  */
     250         479 : bool FelixClientContextHandler::isSubscribed(uint64_t fid){
     251         479 :     if (infoByFid.count(fid) > 0){
     252         479 :         return (infoByFid[fid]->getSubscriptionState() == SUB);
     253             :     } else {
     254             :         return false;
     255             :     }
     256             : }
     257             : 
     258             : /**
     259             :  * @brief Function blocks until all FIDs are subscribed or the timeout expired.
     260             :  *
     261             :  * The function can be used to wait for the subscription of one or more FIDs was successful or the timeot expired.
     262             :  * To achieve this, the function adds a multiWaitCtx object to all FID info objects, which has in internal counter that is increased for every
     263             :  * subscription that was succesful. The conditional variale "multiWaitCtx->allSubscribed" uses the std function wait_for to determine if the timeout expires
     264             :  * or the condition is met.
     265             :  * Note: The wait_for() function requires a unique_lock on a mutex. Since we do not realy need to synchronise something, it is sufficient to use a local mutex/lock
     266             :  * @param fids: vector fo FIDs tha are checked for their subscription state
     267             :  * @param timeoutMs:timeout in ms after which the funciton returns even when not all FIDs are subscribed.
     268             :  */
     269         420 : bool FelixClientContextHandler::isSubscribed(std::vector<uint64_t> fids, uint64_t timeoutMs){
     270         420 :     auto multiWaitCtx = std::make_unique<MultiFidsWaitContext>();
     271         420 :     multiWaitCtx->expectedSize = fids.size();
     272        1985 :     for(auto fid : fids){
     273        1565 :         if (infoByFid.count(fid) > 0){
     274        1565 :             if(infoByFid[fid]->getSubscriptionState() != SUB){
     275        1565 :                 infoByFid[fid]->setWaitContext(multiWaitCtx.get());
     276             :             }
     277           0 :             else{multiWaitCtx->count++;}
     278             :         }
     279             :     }
     280         420 :     clog_debug("Waiting for %d fids to subscribe. Count: %d", multiWaitCtx->expectedSize, multiWaitCtx->count);
     281         420 :     bool status = true;
     282         420 :     if(multiWaitCtx->count < multiWaitCtx->expectedSize ){
     283         417 :         auto timeExpiredMs = std::chrono::milliseconds(0);
     284         417 :         std::mutex m;
     285         417 :         std::unique_lock<std::mutex> lck(m);
     286         417 :         auto t0 = std::chrono::steady_clock::now();
     287        1256 :         while (multiWaitCtx->count != multiWaitCtx->expectedSize && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
     288         422 :             status = (multiWaitCtx->allSubscribed.wait_for(lck, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::milliseconds(timeoutMs) - timeExpiredMs)) == std::cv_status::no_timeout);
     289         422 :             timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
     290             :         }
     291         417 :     }
     292        1985 :     for (auto fid : fids){
     293        1565 :         infoByFid[fid]->clearWaitContext();
     294             :     }
     295         420 :     return status;
     296         847 : }
     297             : 
     298             : /**
     299             :  * @brief Checks if all FIDs are unsubscribed
     300             :  *
     301             :  * Since this function iterates over all sockets, we need to make sure that no socket is removed from the socketContextMap while we are are iterting.
     302             :  * Because the socketContextMap contains sockets of all types, a scoped_lock is used to lock the send_mtx and sub_mtx.
     303             :  */
     304        6170 : bool FelixClientContextHandler::areAllFidsUnsubscribed(){
     305        6170 :     std::scoped_lock lock(send_mtx, sub_mtx);
     306        6718 :     for (auto& ctx_it : socketContextMap){
     307        6434 :         clog_debug("socket 0x%p has state 0x%p", ctx_it.first, netio_socket_state_name(ctx_it.second->state.load()));
     308        6434 :         if(ctx_it.second->state != DISCONNECTED && getConnectionType(ctx_it.first) == PUBSUB){
     309        6170 :             return false;
     310             :         }
     311             :     }
     312             :     return true;
     313        6170 : }
     314             : 
     315             : 
     316             : /**
     317             :  * @brief Checks if all FIDs associated with a particular socket are unsubscribed
     318             :  *
     319             :  * This function checks if there are FIDs associated to the socket, that are not ualready nscubscribed or currently unsubscribing.
     320             :  * @param socket: void pointer to a netio_socket. (should be a subscribe_socket but will also work with send_socket)
     321             :  */
     322        2013 : bool FelixClientContextHandler::areAllFidsUnsubscribed(void* socket){
     323        2013 :     auto fids = getFidsBySocket(socket);
     324       90716 :     int i = std::count_if(fids.begin(), fids.end(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() != UNSUB && this->infoByFid[fid]->getSubscriptionState() != UNSUBING);});
     325        2013 :     clog_debug("Remaining subscritions: %d", i);
     326        2013 :     return i;
     327        2013 : }
     328             : 
     329             : 
     330             : /**
     331             :  * @brief Check if felix-client can subscribe to this FID
     332             :  *
     333             :  * FIDs are not eligible to subscribe if we don't have a felix_client_info object for this FID or if the FID was unsubscribed before.
     334             :  *
     335             :  * TODO:Maybe we also need to check what we want to do here if the FID is in the process of unsubscribing but not done yet
     336             :  *
     337             :  * @param fid: FID
     338             :  */
     339        2188 : bool FelixClientContextHandler::canSubscribe(uint64_t fid){
     340        2188 :     if (infoByFid.count(fid) == 0 || (infoByFid[fid]->getSubscriptionState() == UNSUB)) {
     341        2164 :         return true;
     342             :     } else {
     343             :         return false;
     344             :     }
     345             : }
     346             : 
     347             : 
     348             : /**
     349             :  * @brief Checks if all felix_client_info object for this FID already exists.
     350             :  *
     351             :  * @param fid: FID
     352             :  */
     353       17521 : bool FelixClientContextHandler::exists(uint64_t fid){
     354       17521 :     return infoByFid.count(fid);
     355             : }
     356             : 
     357             : /**
     358             :  * @brief checks if all socket for this void pointer still exists.
     359             :  *
     360             :  * @param socket: void pointer to socket of any type.
     361             :  */
     362       16686 : bool FelixClientContextHandler::exists(void* socket){
     363       16686 :     return socketContextMap.count(socket);
     364             : }
     365             : 
     366             : 
     367             : /**
     368             :  * @brief Function that waits for a socket associated to this FID to be connected or timeout
     369             :  *
     370             :  * This function retrives the socket associated to this FID and checks its connection state.
     371             :  * If it is currently connecting, it will wait on a condition variable that is notified in the on_connection_establiched() callback.
     372             :  * If hte connection is not established within the spcified timeout, the function returns fals for not connected.
     373             :  *
     374             :  * @param fid: FID
     375             :  * @param timeoutMs: timeout [ms]
     376             :  *
     377             :  */
     378         274 : bool FelixClientContextHandler::waitConnected(uint64_t fid, uint64_t timeoutMs){
     379         274 :     if (infoByFid.count(fid) > 0){
     380         274 :         SocketWrapper* sw = infoByFid[fid]->getSocket();
     381         274 :         return std::visit(
     382         274 :         overloaded{[](std::monostate& s) {
     383             :                 return false;
     384         274 :             },[this, timeoutMs, fid](auto& s){
     385         274 :                 if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTED){
     386             :                     return true;
     387         274 :                 } else if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==DISCONNECTED){
     388             :                     return false;
     389             :                 }else{
     390         274 :                     auto timeExpiredMs = std::chrono::milliseconds(0);
     391         274 :                     std::unique_lock<std::mutex> lck(socketContextMap[static_cast<void*>(&s)].get()->mMux);
     392         274 :                     clog_debug("Start waiting, fid: 0x%x with socket 0x%p", fid, socketContextMap[static_cast<void*>(&s)].get());
     393         274 :                     auto t0 = std::chrono::steady_clock::now();
     394         547 :                     while(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTING && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
     395         273 :                         socketContextMap[static_cast<void*>(&s)].get()->mConnectionWait.wait_for(lck, std::chrono::milliseconds(timeoutMs));
     396         273 :                         timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
     397             :                     }
     398         274 :                     return (socketContextMap[static_cast<void*>(&s)].get()->state.load() == CONNECTED);
     399         274 :                 }
     400             :             }
     401             :         }, *sw);
     402             :     } else {
     403             :         return false;
     404             :     }
     405             : }
     406             : 
     407        4263 : void FelixClientContextHandler::setSubscriptionTimes(uint64_t fid, std::string name, timespec ts){
     408        8526 :     return infoByFid[fid]->setSubscriptionTimes(name, ts);
     409             : }
     410             : 
     411             : /**
     412             :  * @brief Setting socket state
     413             :  *
     414             :  * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
     415             :  *
     416             :  * @param fid: FID
     417             :  * @param s: New socket state
     418             :  *
     419             :  */
     420           0 : void FelixClientContextHandler::setSocketState(uint64_t fid, netio_socket_state s){
     421           0 :     if (infoByFid.count(fid) > 0){
     422           0 :         SocketWrapper* sw = infoByFid[fid]->getSocket();
     423           0 :         clog_debug("For socket: 0x%p new state %d", mGetVoidSocketPointer(*sw), s);
     424           0 :         socketContextMap[mGetVoidSocketPointer(*sw)]->state = s;
     425           0 :         if (s == CONNECTED || s == DISCONNECTED){socketContextMap[mGetVoidSocketPointer(*sw)]->mConnectionWait.notify_all();}
     426             :     }
     427           0 : }
     428             : 
     429             : /**
     430             :  * @brief Setting socket state
     431             :  *
     432             :  * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
     433             :  *
     434             :  * @param socket: void pointer to a netio_socket
     435             :  * @param s: New socket state
     436             :  *
     437             :  */
     438         918 : void FelixClientContextHandler::setSocketState(void* socket, netio_socket_state s){
     439         918 :     socketContextMap[socket]->state = s;
     440         918 :     if (s == CONNECTED || s == DISCONNECTED){socketContextMap[socket]->mConnectionWait.notify_all();}
     441         918 : }
     442             : 
     443        8485 : void FelixClientContextHandler::setSubscriptionState(uint64_t fid, subscribe_state s){
     444        8485 :     infoByFid[fid]->setSubscriptionState(s);
     445        8485 : }
     446             : 
     447             : 
     448             : /**
     449             :  * @brief Adds FID to a set of FIDs that need to subscribe
     450             :  *
     451             :  * FIDs get either added to the list during the first subscription, if they are not subcribed yet or if the server side shut down the connection.
     452             :  * Felix-client will try to subscribe all FIDs in this list periodically until the subscription is succesful.
     453             :  *
     454             :  * @param fid: FID
     455             :  */
     456        2107 : void FelixClientContextHandler::addToFidsToResubscribe(uint64_t fid){
     457        2107 :     auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     458        2107 :     if (it == mResubFids.end()){
     459        2107 :         mResubFids.push_back(fid);
     460             :     }
     461        2107 : }
     462             : 
     463        2387 : void FelixClientContextHandler::removeFromFidsToResubscribe(uint64_t fid){
     464        2387 :     auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     465        2387 :     if (it != mResubFids.end()){
     466        2113 :         mResubFids.erase(it);
     467             :     }else{
     468         274 :         clog_debug("Fid not foud in mReSubFids");
     469             :     }
     470        2387 : }
     471             : 
     472             : /**
     473             :  * @brief Remove socket object
     474             :  *
     475             :  * This function removes sockets form the socketContextMaps and thereby destroying it.
     476             :  * Different cases need to be considered to take care of all references to this socket and to update the corresponding information.
     477             :  *
     478             :  * SubSocket objects must not be destroyed here, since they contain a send and listen socket that are not cleaned up yet.
     479             :  * Instead all references to the socket are removed and they object is put into a different list to destroy it later.
     480             :  *
     481             :  * @param socket: void pointer to a netio_socket
     482             :  * @param connection_refused: Status variable if the socket is removed because the connection was refused
     483             :  */
     484         139 : void FelixClientContextHandler::removeSocket(void* socket, bool connection_refused){
     485         139 :     auto socketFids = getFidsBySocket(socket);
     486         139 :     if (getConnectionType(socket) == PUBSUB ){
     487         140 :         for (auto& fid : socketFids)
     488             :         {
     489          49 :             if(!terminatingFelixClient.load()){ //necessary?
     490          49 :                 auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     491          49 :                 if (it == mResubFids.end()){
     492           4 :                     mResubFids.push_back(fid);
     493             :                 }
     494             :             }else{
     495           0 :                 clog_debug("Already terminating felix-client not adding fids to mResubFids");
     496             :             }
     497          49 :             infoByFid.at(fid)->removeSocket();
     498          49 :             infoByFid.at(fid)->setSubscriptionState(UNSUB);
     499             :         }
     500             : 
     501          91 :         if(!connection_refused){
     502          46 :             mSubSocketsToDelete.push_back(std::move(socketContextMap[socket]->socket));
     503             :         }
     504          91 :         std::unique_lock<std::mutex> lock(sub_mtx);
     505          91 :         socketContextMap.erase(socket);
     506          91 :     } else {
     507          48 :         {
     508          48 :             std::unique_lock<std::mutex> lck(socketContextMap[socket].get()->mMux);
     509          96 :             for (auto& fid : socketFids)
     510             :             {
     511          48 :                 if(!terminatingFelixClient.load()){
     512          48 :                     auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     513          48 :                     if (it == mResubFids.end()){
     514           2 :                         mResubFids.push_back(fid);
     515             :                     }
     516             :                 }else{
     517           0 :                     clog_debug("Already terminating felix-client not adding fids to mResubFids");
     518             :                 }
     519          48 :                 infoByFid.at(fid)->removeSocket();
     520             :             }
     521           0 :         }
     522          48 :         std::unique_lock<std::mutex> lock(send_mtx);
     523          48 :         socketContextMap.erase(socket);
     524          48 :     }
     525         139 : }
     526             : 
     527             : 
     528             : /**
     529             :  * @brief Update all information related to a subscription when unsubscribing
     530             :  *
     531             :  * This function erases the FID from the corresponding socket and the socket is removed from the fid_info.
     532             :  *
     533             :  * @param fid: FID
     534             :  */
     535        2013 : void FelixClientContextHandler::updateFidsWhenUnsub(uint64_t fid){
     536        2013 :     auto fids = &socketContextMap[mGetVoidSocketPointer(*infoByFid[fid]->getSocket())].get()->subFids;
     537        2013 :     auto it = std::find(fids->begin(), fids->end(), fid);
     538        2013 :     if (it != fids->end()){
     539        2013 :         fids->erase(it);
     540        2013 :         infoByFid[fid]->removeSocket();
     541             :     }else{
     542           0 :         clog_debug("Fid not foud in SubFids");
     543             :     }
     544        2013 : }
     545             : 
     546             : 
     547             : /**
     548             :  * @brief Updates the info from bus (usually before subscribing) or creates new felix_client_info object if it does not exist yet
     549             :  *
     550             :  * @param fid: FID
     551             :  * @param bus: pointer to FelixBus object
     552             :  * @param forRegister: determines if FID is used to get info from register (which assigns a different callback)
     553             :  */
     554       18511 : void FelixClientContextHandler::createOrUpdateInfo(uint64_t fid, felixbus::FelixBus* bus, bool forRegister){
     555       18511 :     if (infoByFid.count(fid)) {
     556       16119 :         if((fid & 0x8000) && getSocketState(fid) != DISCONNECTED){
     557             :             return;
     558             :         }
     559        2185 :         try {
     560        2185 :             mUpdateInfo(fid, bus);
     561           0 :         } catch(std::exception& e){
     562           0 :             clog_info("Error while updating info from bus: %s. FID: 0x%x", e.what(), fid);
     563           0 :             throw;
     564           0 :         }
     565             : 
     566             :     } else {
     567        2392 :         try {
     568        2392 :             mCreateInfo(fid, bus);
     569           9 :         } catch(std::exception& e){
     570           9 :             clog_info("Error while creating info from bus: %s. FID: 0x%x", e.what(), fid);
     571           9 :             throw FelixClientResourceNotAvailableException();
     572           9 :         }
     573             :     }
     574        4568 :     if(forRegister){
     575          17 :         infoByFid[fid]->setForRegister(forRegister);
     576             :     }
     577             : }
     578             : 
     579             : /**
     580             :  * @brief Creates new felix_client_info object
     581             :  *
     582             :  * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
     583             :  * Additionally time for the bus read is meassured and a warning is printed is the access took more than 1s.
     584             :  *
     585             :  * @param fid: FID
     586             :  * @param bus: pointer to FelixBus object
     587             :  */
     588        2392 : void FelixClientContextHandler::mCreateInfo(uint64_t fid, felixbus::FelixBus* bus){
     589        2392 :     std::error_code ec;
     590        2392 :     felixbus::FelixBusInfo bus_info;
     591        2392 :     {
     592        2392 :         struct timespec t0, t1;
     593        2392 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
     594        2392 :         std::lock_guard<std::mutex> lock(info_mtx);
     595        2392 :         bus_info = bus->get_info(fid, ec);
     596        2392 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
     597        2392 :         double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
     598        2392 :         if(bus_useconds > 1000000){
     599           0 :             clog_warn("Bus access took more than one second: %dus.", bus_useconds);
     600             :         }
     601           0 :     }
     602        2392 :     if (!ec) {
     603        2383 :         infoByFid[fid] = std::make_unique<FelixClientContext>();
     604        2383 :         if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
     605        2383 :         infoByFid[fid]->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize, 0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub, bus_info.raw_tcp);
     606             :     } else {
     607           9 :         throw FelixClientResourceNotAvailableException();//d::exception(ec.message());
     608             :     }
     609        2383 :     clog_debug("Connection: %d type: %d for FID: 0x%x", bus_info.pubsub, bus_info.unbuffered, fid);
     610        2383 :     clog_debug("IP: %s Port: %d", bus_info.ip.c_str(), bus_info.port);
     611        2392 : }
     612             : 
     613             : /**
     614             :  * @brief Updates felix_client_info object
     615             :  *
     616             :  * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
     617             :  * Additionally time for the bus read is meassured and a warning is printed is the access took more than 1s.
     618             :  *
     619             :  * @param fid: FID
     620             :  * @param bus: pointer to FelixBus object
     621             :  */
     622        2185 : void FelixClientContextHandler::mUpdateInfo(uint64_t fid, felixbus::FelixBus* bus){
     623        2185 :     std::error_code ec;
     624        2185 :     felixbus::FelixBusInfo bus_info;
     625        2185 :     {
     626        2185 :         struct timespec t0, t1;
     627        2185 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
     628        2185 :         std::lock_guard<std::mutex> lock(info_mtx);
     629        2185 :         bus_info = bus->get_info(fid, ec);
     630        2185 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
     631        2185 :         double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
     632        2185 :         if(bus_useconds > 1000000){
     633           0 :             clog_warn("Bus access took more than one second: %dus.", bus_useconds);
     634             :         }
     635           0 :     }
     636        2185 :     if (!ec) {
     637        2185 :         auto info = std::move(infoByFid[fid]);
     638        2185 :         if (info.get() == NULL){
     639           0 :             mCreateInfo(fid, bus);
     640           0 :             return;
     641             :         }
     642        2185 :         if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
     643        2185 :         info->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize,  0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub, bus_info.raw_tcp);
     644        2185 :         infoByFid[fid] = std::move(info);
     645        2185 :     } else {
     646           0 :         clog_error("Error while reading bus: %s", ec.message().c_str());
     647           0 :         throw FelixClientResourceNotAvailableException();
     648             :     }
     649        2185 : }
     650             : 
     651        1824 : void FelixClientContextHandler::setSocketToAllUnsubscribed(void* socket){
     652        1824 :     socketContextMap[socket]->closingConnection = true;
     653        1824 : }
     654             : 
     655        2484 : bool FelixClientContextHandler::addOrCreateSocket(uint64_t fid){ //returns true if a new socket was created and needs to be initialized
     656        2484 :     if(mAddSocket(fid)){return false;}
     657         550 :     else {mCreateSocket(fid);}
     658         550 :     return true;
     659             : }
     660             : 
     661             : /**
     662             :  * @brief Adds socket to felix_client_info
     663             :  *
     664             :  * Sockets are identfied by the endpoint they connect to. If a socket to the requested endpoint already exists, this socket is
     665             :  * added to the felix_client_info object of the particular FID. The stats is set to SUB automatically, since the subscription should
     666             :  * be fast and there is no exact feedback from the derver (felix-tohost) side when the subscription is actually processed.
     667             :  *
     668             :  * @param fid: FID
     669             :  */
     670        2484 : bool FelixClientContextHandler::mAddSocket(uint64_t fid){
     671        2484 :     std::string fid_address = getAddress(fid);
     672             :     //Socket already exists
     673        2555 :     for(auto&& ctx : socketContextMap){
     674        2005 :         if(ctx.second->address == fid_address && !ctx.second->closingConnection){
     675        1934 :             infoByFid[fid]->setSocket(ctx.second->socket.get());
     676        1934 :             infoByFid[fid]->setSubscriptionState(SUB);
     677        1934 :             ctx.second->subFids.push_back(fid);
     678        2484 :             return true;
     679             :         }
     680             :     }
     681             :     return false;
     682        2484 : }
     683             : 
     684             : /**
     685             :  * @brief Create new socket
     686             :  *
     687             :  * The type of socket is determined by the felix-bus and the information provided for a particular FID.
     688             :  * Afterwars the socket is added to the socketContextMap that holds all socket objects.
     689             :  *
     690             :  * @param fid: FID
     691             :  */
     692         550 : void FelixClientContextHandler::mCreateSocket(uint64_t fid){
     693         550 :     std::unique_ptr<SocketWrapper> sw_ptr;
     694         550 :     if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
     695         260 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_subscribe_socket{}});
     696         420 :     } else if (infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
     697         216 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_unbuffered_subscribe_socket{}});
     698         312 :     }else if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
     699         330 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_buffered_send_socket{}});
     700         147 :     } else if(infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
     701         294 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_send_socket{}});
     702             :     }else{
     703           0 :         return; //should not enter here
     704             :     }
     705         550 :     socketContextMap[static_cast<void*>(sw_ptr.get())] = std::make_unique<SocketContext>();
     706         550 :     auto* sctx = socketContextMap[static_cast<void*>(sw_ptr.get())].get();
     707         550 :     sctx->state = CONNECTING;
     708         550 :     sctx->closingConnection = false;
     709         550 :     sctx->subFids = std::vector<uint64_t>{fid};
     710         550 :     sctx->socket = std::move(sw_ptr);
     711         550 :     sctx->address = getAddress(fid);
     712         550 :     infoByFid[fid]->setSocket(sctx->socket.get());
     713         550 :     clog_debug("Create Socket. FID: 0x%x. Socket 0x%p should be 0x%p", fid, sctx->socket.get(), infoByFid[fid]->getSocket());
     714             :     return;
     715         550 : }
     716             : 
     717             : /**
     718             :  * @brief Returns a void pointer to all kinds of sockets
     719             :  *
     720             :  * @param wrapper: Pointer to SocketWrapper objects that contains requested socket
     721             :  */
     722       34940 : void* FelixClientContextHandler::mGetVoidSocketPointer(SocketWrapper& wrapper) {
     723       34940 :     return std::visit([](auto& containedValueRef) {
     724             :         return static_cast<void*>(&containedValueRef);
     725       34940 :     }, wrapper);
     726             : }
     727             : 
     728             : /**
     729             :  * @brief deletes subscribe sockets
     730             :  *
     731             :  * Subsocket objects must not be destroyed right away, when they are erased.
     732             :  * This function takes care about destroying the objects later, when it is save to do so.
     733             :  *
     734             :  * @param socket: Pointer to subscribe socket
     735             :  */
     736           0 : void FelixClientContextHandler::deleteSubSocket(void* socket){
     737           0 :     for(  std::vector<std::unique_ptr<SocketWrapper>> ::iterator iter = mSubSocketsToDelete.begin(); iter != mSubSocketsToDelete.end(); ++iter )
     738             :     {
     739           0 :         if( mGetVoidSocketPointer(*iter->get()) == socket )
     740             :         {
     741           0 :             mSubSocketsToDelete.erase(iter);
     742           0 :             break;
     743             :         }
     744             :     }
     745           0 : }
     746             : 
     747       14519 : void FelixClientContextHandler::pullSendTask(send_var_t& task){
     748       14519 :     mSendQueue->popTask(task);
     749       14519 : }
     750             : 
     751        4085 : void FelixClientContextHandler::pullSubTask(sub_var_t& task){
     752        4085 :     mSubQueue->popTask(task);
     753        4085 : }

Generated by: LCOV version 1.0