LCOV - code coverage report
Current view: top level - felix-client/src - felix_client_context_handler.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 276 317 87.1 %
Date: 2025-06-10 03:23:28 Functions: 51 57 89.5 %

          Line data    Source code
       1             : #include "felix/felix_client_context_handler.hpp"
       2             : #include "felix/felix_client_exception.hpp"
       3             : 
       4             : #include "clog.h"
       5             : 
       6             : /**
       7             :  * @brief Get netio_socket_type by FID
       8             :  *
       9             :  * The information is stored in the info object or this FID and is read from the bus when the info object is created.
      10             :  * @param fid: FID
      11             :  */
      12        8252 : netio_socket_type FelixClientContextHandler::getType(uint64_t fid){
      13        8252 :     return infoByFid[fid]->getType();
      14             : }
      15             : 
      16             : /**
      17             :  * @brief Get netio_socket_type from socket
      18             :  *
      19             :  * This function returns the netio_socket_type of a void pointer to a socket.
      20             :  * If the socket does not exists, it returns BUFFERED since it has to have a valid return value.
      21             :  * @param socket: void pointer to a netio_socket
      22             :  */
      23           0 : netio_socket_type FelixClientContextHandler::getType(void* socket){
      24           0 :     return std::visit(
      25           0 :     overloaded{[](netio_subscribe_socket& s) {
      26             :             return BUFFERED;
      27             :         },
      28             :         [](netio_unbuffered_subscribe_socket& s) {
      29             :             return UNBUFFERED;
      30             :         },
      31             :         [](netio_send_socket& s) {
      32             :             return UNBUFFERED;
      33             :         },
      34             :         [](netio_buffered_send_socket& s) {
      35             :             return BUFFERED;
      36             :         },
      37             :         [](std::monostate& s) {
      38             :             return BUFFERED;
      39             :         },
      40           0 :     }, *socketContextMap[socket]->socket);
      41             : }
      42             : 
      43             : /**
      44             :  * @brief Get netio_socket_connection by FID
      45             :  *
      46             :  * The information is stored in the info object or this FID and is read from the bus when the info object is created.
      47             :  * A connection type can either be of type PUBSUB or SENDRECV.
      48             :  * @param fid: FID
      49             :  */
      50        2308 : netio_socket_connection FelixClientContextHandler::getConnectionType(uint64_t fid){
      51        2308 :     return infoByFid[fid]->getConnectionType();
      52             : }
      53             : 
      54             : /**
      55             :  * @brief Get netio_socket_connection from socket
      56             :  *
      57             :  * This function returns the netio_socket_connection of a void pointer to a socket.
      58             :  * A connection type can either be of type PUBSUB or SENDRECV.
      59             :  * If the socket does not exists, it returns SENDRECV since it has to have a valid return value.
      60             :  * @param socket: void pointer to a netio_socket
      61             :  */
      62        3756 : netio_socket_connection FelixClientContextHandler::getConnectionType(void* socket){
      63        3756 :     return std::visit(
      64        3756 :     overloaded{[](netio_subscribe_socket& s) {
      65             :             return PUBSUB;
      66             :         },
      67             :         [](netio_unbuffered_subscribe_socket& s) {
      68             :             return PUBSUB;
      69             :         },
      70             :         [](netio_send_socket& s) {
      71             :             return SENDRECV;
      72             :         },
      73             :         [](netio_buffered_send_socket& s) {
      74             :             return SENDRECV;
      75             :         },
      76             :         [](std::monostate& s) {
      77             :             return SENDRECV;
      78             :         },
      79        3756 :     }, *socketContextMap[socket]->socket);
      80             : }
      81             : 
      82             : /**
      83             :  * @brief Get netio_socket_state by FID
      84             :  *
      85             :  * Returns the state of a socket that is associated to this FID. If the FID has no associated socket the function returns DISCONNECTED.
      86             :  * We must portect this call with a mutex since send_sockets are removed from the socketContextMap when a connection is closed.
      87             :  * @param fid: FID
      88             :  */
      89       11472 : netio_socket_state FelixClientContextHandler::getSocketState(uint64_t fid){
      90       11472 :     std::unique_lock<std::mutex> lck(send_mtx);
      91       11472 :     SocketWrapper* sw = infoByFid[fid]->getSocket();
      92       11472 :     return std::visit(
      93       11472 :         overloaded{[](std::monostate& s) {
      94             :                 return DISCONNECTED;
      95       10668 :             },[this](auto& s){
      96       10668 :                return socketContextMap[static_cast<void*>(&s)]->state.load();
      97             :             }
      98       11472 :         }, *sw);
      99       11472 : }
     100             : 
     101             : /**
     102             :  * @brief Get subscribe_state of a FID
     103             :  *
     104             :  * Returns the ssubscribe_state of a FID.
     105             :  * @param fid: FID
     106             :  */
     107        6624 : subscribe_state FelixClientContextHandler::getSubscriptionState(uint64_t fid){
     108        6624 :     return infoByFid[fid]->getSubscriptionState();
     109             : }
     110             : 
     111             : /**
     112             :  * @brief Returns a map with timestamps to measure subscription times
     113             :  *
     114             :  * The first timestamp is taken in the subscribe function, called by the user, the second one when the subscribe is processed by the eventloop.
     115             :  * @param fid: FID
     116             :  */
     117        2177 : std::unordered_map<std::string, timespec> FelixClientContextHandler::getSubscriptionTimes(uint64_t fid){
     118        2177 :     return infoByFid[fid]->getSubscriptionTimes();
     119             : }
     120             : 
     121         553 : uint FelixClientContextHandler::getWatermark(uint64_t fid){
     122         553 :     return infoByFid[fid]->getWatermark();
     123             : }
     124             : 
     125        1025 : uint FelixClientContextHandler::getPagesize(uint64_t fid){
     126        1025 :     return infoByFid[fid]->getPagesize();
     127             : }
     128             : 
     129         575 : uint FelixClientContextHandler::getPages(uint64_t fid){
     130         575 :     return infoByFid[fid]->getPages();
     131             : }
     132             : 
     133        1968 : uint FelixClientContextHandler::getPort(uint64_t fid){
     134        1968 :     return infoByFid[fid]->getPort();
     135             : }
     136             : 
     137        1968 : std::string FelixClientContextHandler::getIp(uint64_t fid){
     138        1968 :     return infoByFid[fid]->getIp();
     139             : }
     140             : 
     141        5472 : std::string FelixClientContextHandler::getAddress(uint64_t fid){
     142       16416 :     return infoByFid[fid]->getIp() + ":" + std::to_string(infoByFid[fid]->getPort());
     143             : }
     144             : 
     145        3762 : std::vector<uint64_t> FelixClientContextHandler::getFidsToResubscribe(){
     146        3762 :     return mResubFids;
     147             : }
     148             : 
     149        6268 : std::vector<uint64_t> FelixClientContextHandler::getFidsBySocket(void* socket){
     150        6268 :     return socketContextMap[socket]->subFids;
     151             : }
     152             : 
     153             : 
     154             : /**
     155             :  * @brief Get vector of all subscribed FIDs
     156             :  *
     157             :  * This function returns a vector of all FIDs that are subscribed via a subscribe socket right now.
     158             :  * This list is used in the destructor to unsubscribe and close a connection properly before we delete the associated subscribe socket.
     159             :  */
     160         643 : std::vector<uint64_t> FelixClientContextHandler::getFidsToUnsubscribe(){
     161         643 :     std::vector<uint64_t> allFids;
     162        1352 :     std::for_each(socketContextMap.begin(), socketContextMap.end(), [&allFids](auto& ctx_it){std::copy(ctx_it.second->subFids.begin(), ctx_it.second->subFids.end(), std::back_inserter(allFids));});
     163        1286 :     std::vector<uint64_t> fidsToUnsub(allFids.size());
     164        3201 :     auto end_it = std::copy_if(allFids.begin(), allFids.end(), fidsToUnsub.begin(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() == SUB || this->infoByFid[fid]->getSubscriptionState() == SUBING)&&( this->infoByFid[fid]->getConnectionType() == PUBSUB);});
     165         643 :     fidsToUnsub.resize(std::distance(fidsToUnsub.begin(), end_it));
     166         643 :     return fidsToUnsub;
     167         643 : }
     168             : 
     169             : 
     170             : /**
     171             :  * @brief Get vector of all send sockets
     172             :  *
     173             :  * This function returns a vector of pointers to all send sockets that are connected right now.
     174             :  * The function iterates over all sockets in the socketContextMap and adds the pointer of netio_send_socket (also in buffered case) to the vector.
     175             :  * This list is used in the destructor to properly shutdown the connection to the server side via netio_disconnect() before deleting the socket object.
     176             :  */
     177         595 : std::vector<netio_send_socket*> FelixClientContextHandler::getSendSockets(){
     178         595 :     std::vector<netio_send_socket*> sendSockets;
     179        1256 :     for (auto& ctx_it : socketContextMap){
     180         661 :         netio_send_socket* socket = std::visit(
     181         661 :         overloaded{[](netio_subscribe_socket& s) {
     182             :                 return static_cast<netio_send_socket*>(NULL);
     183             :             },
     184             :             [](netio_unbuffered_subscribe_socket& s) {
     185             :                 return static_cast<netio_send_socket*>(NULL);
     186             :             },
     187             :             [](netio_send_socket& s) {
     188             :                 return &s;
     189             :             },
     190         267 :             [](netio_buffered_send_socket& s) {
     191         267 :                 return &(s.send_socket);
     192             :             },
     193             :             [](std::monostate& s) {
     194             :                 return static_cast<netio_send_socket*>(NULL);
     195             :             },
     196         661 :         }, *ctx_it.second->socket);
     197         661 :         if(socket){
     198         501 :             sendSockets.push_back(socket);
     199             :         }
     200             :     }
     201         595 :     return sendSockets;
     202           0 : }
     203             : 
     204             : /**
     205             :  * @brief Returns a map of void pointers to subscribe socket and the associated netio_socket_type
     206             :  *
     207             :  * Since we need to distinguish between buffered and unbuffered subscribe sockets when we want to clean them up, this function returns void pointer to
     208             :  * all subscribe sockets and their netio_socket_type stored in map.
     209             :  * This map/list is used in the destructor to properly shutdown the subscribe socket after all FIDs are unsubscribed.
     210             :  */
     211           0 : std::unordered_map<void*, netio_socket_type> FelixClientContextHandler::getSubSocketsToDelete(){
     212           0 :     std::unordered_map<void*, netio_socket_type> voidSubSocketsToDelete;
     213           0 :     for (auto& it :mSubSocketsToDelete){
     214           0 :         netio_socket_type type = std::visit(
     215           0 :             overloaded{[](netio_subscribe_socket& s) {
     216             :                 return BUFFERED;
     217             :             },
     218             :             [](netio_unbuffered_subscribe_socket& s) {
     219             :                 return UNBUFFERED;
     220             :             },
     221             :             [](netio_send_socket& s) {
     222             :                 return UNBUFFERED;
     223             :             },
     224             :             [](netio_buffered_send_socket& s) {
     225             :                 return BUFFERED;
     226             :             },
     227             :             [](std::monostate& s) {
     228             :                 return BUFFERED;
     229             :             },
     230           0 :         }, *it);
     231           0 :         voidSubSocketsToDelete[mGetVoidSocketPointer(*it)] = type;
     232             :     }
     233           0 :     return voidSubSocketsToDelete;
     234           0 : }
     235             : 
     236             : 
     237         351 : bool FelixClientContextHandler::isForRegister(uint64_t fid){
     238         351 :     return infoByFid[fid]->getForRegister();
     239             : }
     240             : 
     241             : /**
     242             :  * @brief Returns true is FID is subscribed
     243             :  *
     244             :  * @param fid: FID
     245             :  */
     246        1150 : bool FelixClientContextHandler::isSubscribed(uint64_t fid){
     247        1150 :     if (infoByFid.count(fid) > 0){
     248        1150 :         return (infoByFid[fid]->getSubscriptionState() == SUB);
     249             :     } else {
     250           0 :         return false;
     251             :     }
     252             : }
     253             : 
     254             : /**
     255             :  * @brief Function blocks until all FIDs are subscribed or the timeout expired.
     256             :  *
     257             :  * The function can be used to wait for the subscription of one or more FIDs was successful or the timeot expired.
     258             :  * To achieve this, the function adds a multiWaitCtx object to all FID info objects, which has in internal counter that is increased for every
     259             :  * subscription that was successful. The conditional variable "multiWaitCtx->allSubscribed" uses the std function wait_for to determine if the timeout expires
     260             :  * or the condition is met.
     261             :  * Note: The wait_for() function requires a unique_lock on a mutex. Since we do not need to synchronize something, it is sufficient to use a local mutex/lock
     262             :  * @param fids: vector fo FIDs tha are checked for their subscription state
     263             :  * @param timeoutMs:timeout in ms after which the function returns even when not all FIDs are subscribed.
     264             :  */
     265         436 : bool FelixClientContextHandler::isSubscribed(std::vector<uint64_t> fids, uint64_t timeoutMs){
     266         436 :     auto multiWaitCtx = std::make_unique<MultiFidsWaitContext>();
     267         436 :     multiWaitCtx->expectedSize = fids.size();
     268        2018 :     for(auto fid : fids){
     269        1582 :         if (infoByFid.count(fid) > 0){
     270        1582 :             if(infoByFid[fid]->getSubscriptionState() != SUB){
     271        1582 :                 infoByFid[fid]->setWaitContext(multiWaitCtx.get());
     272             :             }
     273           0 :             else{multiWaitCtx->count++;}
     274             :         }
     275             :     }
     276         436 :     clog_debug("Waiting for %d fids to subscribe. Count: %d", multiWaitCtx->expectedSize, multiWaitCtx->count);
     277         436 :     bool status = true;
     278         436 :     if(multiWaitCtx->count < multiWaitCtx->expectedSize ){
     279         432 :         auto timeExpiredMs = std::chrono::milliseconds(0);
     280         432 :         std::mutex m;
     281         432 :         std::unique_lock<std::mutex> lck(m);
     282         432 :         auto t0 = std::chrono::steady_clock::now();
     283        1308 :         while (multiWaitCtx->count != multiWaitCtx->expectedSize && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
     284         444 :             status = (multiWaitCtx->allSubscribed.wait_for(lck, std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::milliseconds(timeoutMs) - timeExpiredMs)) == std::cv_status::no_timeout);
     285         444 :             timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
     286             :         }
     287         432 :     }
     288        2018 :     for (auto fid : fids){
     289        1582 :         infoByFid[fid]->clearWaitContext();
     290             :     }
     291         436 :     return status;
     292         892 : }
     293             : 
     294             : /**
     295             :  * @brief Checks if all FIDs are unsubscribed
     296             :  *
     297             :  * Since this function iterates over all sockets, we need to make sure that no socket is removed from the socketContextMap while we are are iterating.
     298             :  * Because the socketContextMap contains sockets of all types, a scoped_lock is used to lock the send_mtx and sub_mtx.
     299             :  */
     300        2704 : bool FelixClientContextHandler::areAllFidsUnsubscribed(){
     301        2704 :     std::scoped_lock lock(send_mtx, sub_mtx);
     302        3383 :     for (auto& ctx_it : socketContextMap){
     303        2836 :         clog_debug("socket 0x%p has state 0x%p", ctx_it.first, netio_socket_state_name(ctx_it.second->state.load()));
     304        2836 :         if(ctx_it.second->state != DISCONNECTED && getConnectionType(ctx_it.first) == PUBSUB){
     305        2704 :             return false;
     306             :         }
     307             :     }
     308             :     return true;
     309        2704 : }
     310             : 
     311             : 
     312             : /**
     313             :  * @brief Checks if all FIDs associated with a particular socket are unsubscribed
     314             :  *
     315             :  * This function checks if there are FIDs associated to the socket, that are not ualready nscubscribed or currently unsubscribing.
     316             :  * @param socket: void pointer to a netio_socket. (should be a subscribe_socket but will also work with send_socket)
     317             :  */
     318        2167 : bool FelixClientContextHandler::areAllFidsUnsubscribed(void* socket){
     319        2167 :     auto fids = getFidsBySocket(socket);
     320       93042 :     int i = std::count_if(fids.begin(), fids.end(), [this](auto& fid){return (this->infoByFid[fid]->getSubscriptionState() != UNSUB && this->infoByFid[fid]->getSubscriptionState() != UNSUBING);});
     321        2167 :     clog_debug("Remaining subscritions: %d", i);
     322        2167 :     return i;
     323        2167 : }
     324             : 
     325             : 
     326             : /**
     327             :  * @brief Check if felix-client can subscribe to this FID
     328             :  *
     329             :  * FIDs are not eligible to subscribe if we don't have a felix_client_info object for this FID or if the FID was unsubscribed before.
     330             :  *
     331             :  * TODO:Maybe we also need to check what we want to do here if the FID is in the process of unsubscribing but not done yet
     332             :  *
     333             :  * @param fid: FID
     334             :  */
     335        2316 : bool FelixClientContextHandler::canSubscribe(uint64_t fid){
     336        2316 :     if (infoByFid.count(fid) == 0 || (infoByFid[fid]->getSubscriptionState() == UNSUB)) {
     337        2288 :         return true;
     338             :     } else {
     339             :         return false;
     340             :     }
     341             : }
     342             : 
     343             : 
     344             : /**
     345             :  * @brief Checks if all felix_client_info object for this FID already exists.
     346             :  *
     347             :  * @param fid: FID
     348             :  */
     349        5410 : bool FelixClientContextHandler::exists(uint64_t fid){
     350        5410 :     return infoByFid.count(fid);
     351             : }
     352             : 
     353             : /**
     354             :  * @brief checks if all socket for this void pointer still exists.
     355             :  *
     356             :  * @param socket: void pointer to socket of any type.
     357             :  */
     358        3824 : bool FelixClientContextHandler::exists(void* socket){
     359        3824 :     return socketContextMap.count(socket);
     360             : }
     361             : 
     362             : 
     363             : /**
     364             :  * @brief Function that waits for a socket associated to this FID to be connected or timeout
     365             :  *
     366             :  * This function retrives the socket associated to this FID and checks its connection state.
     367             :  * If it is currently connecting, it will wait on a condition variable that is notified in the on_connection_established() callback.
     368             :  * If hte connection is not established within the specified timeout, the function returns fails for not connected.
     369             :  *
     370             :  * @param fid: FID
     371             :  * @param timeoutMs: timeout [ms]
     372             :  *
     373             :  */
     374         529 : bool FelixClientContextHandler::waitConnected(uint64_t fid, uint64_t timeoutMs){
     375         529 :     if (infoByFid.count(fid) > 0){
     376         529 :         SocketWrapper* sw = infoByFid[fid]->getSocket();
     377         529 :         return std::visit(
     378         529 :         overloaded{[](std::monostate& s) {
     379             :                 return false;
     380         529 :             },[this, timeoutMs, fid](auto& s){
     381         529 :                 if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTED){
     382             :                     return true;
     383         529 :                 } else if(socketContextMap[static_cast<void*>(&s)].get()->state.load()==DISCONNECTED){
     384             :                     return false;
     385             :                 }else{
     386         529 :                     auto timeExpiredMs = std::chrono::milliseconds(0);
     387         529 :                     std::unique_lock<std::mutex> lck(socketContextMap[static_cast<void*>(&s)].get()->mMux);
     388         529 :                     clog_debug("Start waiting, fid: 0x%x with socket 0x%p", fid, socketContextMap[static_cast<void*>(&s)].get());
     389         529 :                     auto t0 = std::chrono::steady_clock::now();
     390        1058 :                     while(socketContextMap[static_cast<void*>(&s)].get()->state.load()==CONNECTING && timeExpiredMs < std::chrono::milliseconds(timeoutMs)){
     391         529 :                         socketContextMap[static_cast<void*>(&s)].get()->mConnectionWait.wait_for(lck, std::chrono::milliseconds(timeoutMs));
     392         529 :                         timeExpiredMs = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - t0);
     393             :                     }
     394         529 :                     return (socketContextMap[static_cast<void*>(&s)].get()->state.load() == CONNECTED);
     395         529 :                 }
     396             :             }
     397             :         }, *sw);
     398             :     } else {
     399           0 :         return false;
     400             :     }
     401             : }
     402             : 
     403        4443 : void FelixClientContextHandler::setSubscriptionTimes(uint64_t fid, std::string name, timespec ts){
     404        8886 :     return infoByFid[fid]->setSubscriptionTimes(name, ts);
     405             : }
     406             : 
     407             : /**
     408             :  * @brief Setting socket state
     409             :  *
     410             :  * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
     411             :  *
     412             :  * @param fid: FID
     413             :  * @param s: New socket state
     414             :  *
     415             :  */
     416           0 : void FelixClientContextHandler::setSocketState(uint64_t fid, netio_socket_state s){
     417           0 :     if (infoByFid.count(fid) > 0){
     418           0 :         SocketWrapper* sw = infoByFid[fid]->getSocket();
     419           0 :         clog_debug("For socket: 0x%p new state %d", mGetVoidSocketPointer(*sw), s);
     420           0 :         socketContextMap[mGetVoidSocketPointer(*sw)]->state = s;
     421           0 :         if (s == CONNECTED || s == DISCONNECTED){socketContextMap[mGetVoidSocketPointer(*sw)]->mConnectionWait.notify_all();}
     422             :     }
     423           0 : }
     424             : 
     425             : /**
     426             :  * @brief Setting socket state
     427             :  *
     428             :  * If the new state is CONNECTED or DISCONNECTED, the a condition variable is notified to inform the subscribing thread if the operation was successful or not
     429             :  *
     430             :  * @param socket: void pointer to a netio_socket
     431             :  * @param s: New socket state
     432             :  *
     433             :  */
     434        1693 : void FelixClientContextHandler::setSocketState(void* socket, netio_socket_state s){
     435        1693 :     socketContextMap[socket]->state = s;
     436        1693 :     if (s == CONNECTED || s == DISCONNECTED){socketContextMap[socket]->mConnectionWait.notify_all();}
     437        1693 : }
     438             : 
     439        9154 : void FelixClientContextHandler::setSubscriptionState(uint64_t fid, subscribe_state s){
     440        9154 :     infoByFid[fid]->setSubscriptionState(s);
     441        9154 : }
     442             : 
     443             : 
     444             : /**
     445             :  * @brief Adds FID to a set of FIDs that need to subscribe
     446             :  *
     447             :  * FIDs get either added to the list during the first subscription, if they are not subscribed yet or if the server side shut down the connection.
     448             :  * Felix-client will try to subscribe all FIDs in this list periodically until the subscription is successful.
     449             :  *
     450             :  * @param fid: FID
     451             :  */
     452        2169 : void FelixClientContextHandler::addToFidsToResubscribe(uint64_t fid){
     453        2169 :     auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     454        2169 :     if (it == mResubFids.end()){
     455        2169 :         mResubFids.push_back(fid);
     456             :     }
     457        2169 : }
     458             : 
     459        2700 : void FelixClientContextHandler::removeFromFidsToResubscribe(uint64_t fid){
     460        2700 :     auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     461        2700 :     if (it != mResubFids.end()){
     462        2181 :         mResubFids.erase(it);
     463             :     }else{
     464         519 :         clog_debug("Fid not foud in mReSubFids");
     465             :     }
     466        2700 : }
     467             : 
     468             : /**
     469             :  * @brief Remove socket object
     470             :  *
     471             :  * This function removes sockets form the socketContextMaps and thereby destroying it.
     472             :  * Different cases need to be considered to take care of all references to this socket and to update the corresponding information.
     473             :  *
     474             :  * SubSocket objects must not be destroyed here, since they contain a send and listen socket that are not cleaned up yet.
     475             :  * Instead all references to the socket are removed and they object is put into a different list to destroy it later.
     476             :  *
     477             :  * @param socket: void pointer to a netio_socket
     478             :  * @param connection_refused: Status variable if the socket is removed because the connection was refused
     479             :  */
     480         307 : void FelixClientContextHandler::removeSocket(void* socket, bool connection_refused){
     481         307 :     auto socketFids = getFidsBySocket(socket);
     482         307 :     if (getConnectionType(socket) == PUBSUB ){
     483         298 :         for (auto& fid : socketFids)
     484             :         {
     485         107 :             if(!terminatingFelixClient.load()){ //necessary?
     486         107 :                 auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     487         107 :                 if (it == mResubFids.end()){
     488          10 :                     mResubFids.push_back(fid);
     489             :                 }
     490             :             }else{
     491           0 :                 clog_debug("Already terminating felix-client not adding fids to mResubFids");
     492             :             }
     493         107 :             infoByFid.at(fid)->removeSocket();
     494         107 :             infoByFid.at(fid)->setSubscriptionState(UNSUB);
     495             :         }
     496             : 
     497         191 :         if(!connection_refused){
     498          94 :             mSubSocketsToDelete.push_back(std::move(socketContextMap[socket]->socket));
     499             :         }
     500         191 :         std::unique_lock<std::mutex> lock(sub_mtx);
     501         191 :         socketContextMap.erase(socket);
     502         191 :     } else {
     503         116 :         {
     504         116 :             std::unique_lock<std::mutex> lck(socketContextMap[socket].get()->mMux);
     505         232 :             for (auto& fid : socketFids)
     506             :             {
     507         116 :                 if(!terminatingFelixClient.load()){
     508         116 :                     auto it = std::find(mResubFids.begin(), mResubFids.end(), fid);
     509         116 :                     if (it == mResubFids.end()){
     510           6 :                         mResubFids.push_back(fid);
     511             :                     }
     512             :                 }else{
     513           0 :                     clog_debug("Already terminating felix-client not adding fids to mResubFids");
     514             :                 }
     515         116 :                 infoByFid.at(fid)->removeSocket();
     516             :             }
     517           0 :         }
     518         116 :         std::unique_lock<std::mutex> lock(send_mtx);
     519         116 :         socketContextMap.erase(socket);
     520         116 :     }
     521         307 : }
     522             : 
     523             : 
     524             : /**
     525             :  * @brief Update all information related to a subscription when unsubscribing
     526             :  *
     527             :  * This function erases the FID from the corresponding socket and the socket is removed from the fid_info.
     528             :  *
     529             :  * @param fid: FID
     530             :  */
     531        2167 : void FelixClientContextHandler::updateFidsWhenUnsub(uint64_t fid){
     532        2167 :     auto fids = &socketContextMap[mGetVoidSocketPointer(*infoByFid[fid]->getSocket())].get()->subFids;
     533        2167 :     auto it = std::find(fids->begin(), fids->end(), fid);
     534        2167 :     if (it != fids->end()){
     535        2167 :         fids->erase(it);
     536        2167 :         infoByFid[fid]->removeSocket();
     537             :     }else{
     538           0 :         clog_warn("Fid not foud in SubFids");
     539             :     }
     540        2167 : }
     541             : 
     542             : 
     543             : /**
     544             :  * @brief Updates the info from bus (usually before subscribing) or creates new felix_client_info object if it does not exist yet
     545             :  *
     546             :  * @param fid: FID
     547             :  * @param bus: pointer to FelixBus object
     548             :  * @param forRegister: determines if FID is used to get info from register (which assigns a different callback)
     549             :  */
     550        5405 : void FelixClientContextHandler::createOrUpdateInfo(uint64_t fid, felixbus::FelixBus* bus, bool forRegister){
     551        5405 :     if (infoByFid.count(fid)) {
     552        2701 :         if((fid & 0x8000) && getSocketState(fid) != DISCONNECTED){
     553             :             return;
     554             :         }
     555        2332 :         try {
     556        2332 :             mUpdateInfo(fid, bus);
     557           4 :         } catch(std::exception& e){
     558           4 :             clog_info("Error while updating info from bus: %s. FID: 0x%x", e.what(), fid);
     559           4 :             throw;
     560           4 :         }
     561             : 
     562             :     } else {
     563        2704 :         try {
     564        2704 :             mCreateInfo(fid, bus);
     565          12 :         } catch(std::exception& e){
     566          12 :             clog_info("Error while creating info from bus: %s. FID: 0x%x", e.what(), fid);
     567          12 :             throw FelixClientResourceNotAvailableException();
     568          12 :         }
     569             :     }
     570        5020 :     if(forRegister){
     571          34 :         infoByFid[fid]->setForRegister(forRegister);
     572             :     }
     573             : }
     574             : 
     575             : /**
     576             :  * @brief Creates new felix_client_info object
     577             :  *
     578             :  * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
     579             :  * Additionally time for the bus read is measured and a warning is printed is the access took more than 1s.
     580             :  *
     581             :  * @param fid: FID
     582             :  * @param bus: pointer to FelixBus object
     583             :  */
     584        2704 : void FelixClientContextHandler::mCreateInfo(uint64_t fid, felixbus::FelixBus* bus){
     585        2704 :     std::error_code ec;
     586        2704 :     felixbus::FelixBusInfo bus_info;
     587        2704 :     {
     588        2704 :         struct timespec t0, t1;
     589        2704 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
     590        2704 :         std::lock_guard<std::mutex> lock(info_mtx);
     591        2704 :         bus_info = bus->get_info(fid, ec);
     592        2704 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
     593        2704 :         double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
     594        2704 :         if(bus_useconds > 1000000){
     595           0 :             clog_warn("Bus access took more than one second: %dus.", bus_useconds);
     596             :         }
     597           0 :     }
     598        2704 :     if (!ec) {
     599        2692 :         infoByFid[fid] = std::make_unique<FelixClientContext>();
     600        2692 :         if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
     601        2692 :         infoByFid[fid]->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize, 0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub);
     602             : 
     603             :     } else {
     604          12 :         throw FelixClientResourceNotAvailableException();//d::exception(ec.message());
     605             :     }
     606        2692 :     clog_debug("Connection: %d type: %d for FID: 0x%x", bus_info.pubsub, bus_info.unbuffered, fid);
     607        2692 :     clog_debug("IP: %s Port: %d", bus_info.ip.c_str(), bus_info.port);
     608        2704 : }
     609             : 
     610             : /**
     611             :  * @brief Updates felix_client_info object
     612             :  *
     613             :  * The information is retrieved from the bus file. A mutex prevents concurrent access to the bus object.
     614             :  * Additionally time for the bus read is measured and a warning is printed is the access took more than 1s.
     615             :  *
     616             :  * @param fid: FID
     617             :  * @param bus: pointer to FelixBus object
     618             :  */
     619        2332 : void FelixClientContextHandler::mUpdateInfo(uint64_t fid, felixbus::FelixBus* bus){
     620        2332 :     std::error_code ec;
     621        2332 :     felixbus::FelixBusInfo bus_info;
     622        2332 :     {
     623        2332 :         struct timespec t0, t1;
     624        2332 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t0);
     625        2332 :         std::lock_guard<std::mutex> lock(info_mtx);
     626        2332 :         bus_info = bus->get_info(fid, ec);
     627        2332 :         clock_gettime(CLOCK_MONOTONIC_RAW, &t1);
     628        2332 :         double bus_useconds = (t1.tv_sec - t0.tv_sec)*1e6 + (t1.tv_nsec - t0.tv_nsec)*1e-3;
     629        2332 :         if(bus_useconds > 1000000){
     630           0 :             clog_warn("Bus access took more than one second: %dus.", bus_useconds);
     631             :         }
     632           0 :     }
     633        2332 :     if (!ec) {
     634        2328 :         auto info = std::move(infoByFid[fid]);
     635        2328 :         if (info.get() == NULL){
     636           0 :             mCreateInfo(fid, bus);
     637           0 :             return;
     638             :         }
     639        2328 :         if(bus_info.ip == ""){throw std::invalid_argument("Elink not in bus");}
     640        2328 :         info->setInfos(bus_info.ip, bus_info.port, bus_info.netio_pages, bus_info.netio_pagesize,  0.9 * bus_info.netio_pagesize, bus_info.unbuffered, bus_info.pubsub);
     641        2328 :         infoByFid[fid] = std::move(info);
     642        2328 :     } else {
     643           4 :         clog_error("Error while reading bus: %s", ec.message().c_str());
     644           4 :         throw FelixClientResourceNotAvailableException();
     645             :     }
     646        2332 : }
     647             : 
     648        1923 : void FelixClientContextHandler::setSocketToAllUnsubscribed(void* socket){
     649        1923 :     socketContextMap[socket]->closingConnection = true;
     650        1923 : }
     651             : 
     652        2915 : bool FelixClientContextHandler::addOrCreateSocket(uint64_t fid){ //returns true if a new socket was created and needs to be initialized
     653        2915 :     if(mAddSocket(fid)){return false;}
     654         968 :     else {mCreateSocket(fid);}
     655         968 :     return true;
     656             : }
     657             : 
     658             : /**
     659             :  * @brief Adds socket to felix_client_info
     660             :  *
     661             :  * Sockets are identified by the endpoint they connect to. If a socket to the requested endpoint already exists, this socket is
     662             :  * added to the felix_client_info object of the particular FID. The stats is set to SUB automatically, since the subscription should
     663             :  * be fast and there is no exact feedback from the derver (felix-tohost) side when the subscription is actually processed.
     664             :  *
     665             :  * @param fid: FID
     666             :  */
     667        2915 : bool FelixClientContextHandler::mAddSocket(uint64_t fid){
     668        2915 :     std::string fid_address = getAddress(fid);
     669             :     //Socket already exists
     670        3073 :     for(auto&& ctx : socketContextMap){
     671        2105 :         if(ctx.second->address == fid_address && !ctx.second->closingConnection){
     672        1947 :             infoByFid[fid]->setSocket(ctx.second->socket.get());
     673        1947 :             infoByFid[fid]->setSubscriptionState(SUB);
     674        1947 :             ctx.second->subFids.push_back(fid);
     675        2915 :             return true;
     676             :         }
     677             :     }
     678             :     return false;
     679        2915 : }
     680             : 
     681             : /**
     682             :  * @brief Create new socket
     683             :  *
     684             :  * The type of socket is determined by the felix-bus and the information provided for a particular FID.
     685             :  * Afterwars the socket is added to the socketContextMap that holds all socket objects.
     686             :  *
     687             :  * @param fid: FID
     688             :  */
     689         968 : void FelixClientContextHandler::mCreateSocket(uint64_t fid){
     690         968 :     std::unique_ptr<SocketWrapper> sw_ptr;
     691         968 :     if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
     692         218 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_subscribe_socket{}});
     693         750 :     } else if (infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == PUBSUB){
     694         133 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_unbuffered_subscribe_socket{}});
     695         617 :     }else if(infoByFid[fid]->getType() == BUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
     696         335 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_buffered_send_socket{}});
     697         282 :     } else if(infoByFid[fid]->getType() == UNBUFFERED && infoByFid[fid]->getConnectionType() == SENDRECV){
     698         282 :         sw_ptr = std::make_unique<SocketWrapper>(SocketWrapper{netio_send_socket{}});
     699             :     }else{
     700           0 :         return; //should not enter here
     701             :     }
     702         968 :     socketContextMap[static_cast<void*>(sw_ptr.get())] = std::make_unique<SocketContext>();
     703         968 :     auto* sctx = socketContextMap[static_cast<void*>(sw_ptr.get())].get();
     704         968 :     sctx->state = CONNECTING;
     705         968 :     sctx->closingConnection = false;
     706         968 :     sctx->subFids = std::vector<uint64_t>{fid};
     707         968 :     sctx->socket = std::move(sw_ptr);
     708         968 :     sctx->address = getAddress(fid);
     709         968 :     infoByFid[fid]->setSocket(sctx->socket.get());
     710         968 :     clog_debug("Create Socket. FID: 0x%x. Socket 0x%p should be 0x%p", fid, sctx->socket.get(), infoByFid[fid]->getSocket());
     711             :     return;
     712         968 : }
     713             : 
     714             : /**
     715             :  * @brief Returns a void pointer to all kinds of sockets
     716             :  *
     717             :  * @param wrapper: Pointer to SocketWrapper objects that contains requested socket
     718             :  */
     719        9041 : void* FelixClientContextHandler::mGetVoidSocketPointer(SocketWrapper& wrapper) {
     720        9041 :     return std::visit([](auto& containedValueRef) {
     721             :         return static_cast<void*>(&containedValueRef);
     722        9041 :     }, wrapper);
     723             : }
     724             : 
     725             : /**
     726             :  * @brief deletes subscribe sockets
     727             :  *
     728             :  * Subsocket objects must not be destroyed right away, when they are erased.
     729             :  * This function takes care about destroying the objects later, when it is save to do so.
     730             :  *
     731             :  * @param socket: Pointer to subscribe socket
     732             :  */
     733           0 : void FelixClientContextHandler::deleteSubSocket(void* socket){
     734           0 :     for(  std::vector<std::unique_ptr<SocketWrapper>> ::iterator iter = mSubSocketsToDelete.begin(); iter != mSubSocketsToDelete.end(); ++iter )
     735             :     {
     736           0 :         if( mGetVoidSocketPointer(*iter->get()) == socket )
     737             :         {
     738           0 :             mSubSocketsToDelete.erase(iter);
     739           0 :             break;
     740             :         }
     741             :     }
     742           0 : }
     743             : 
     744        1471 : void FelixClientContextHandler::pullSendTask(send_var_t& task){
     745        1471 :     mSendQueue->popTask(task);
     746        1471 : }
     747             : 
     748        4287 : void FelixClientContextHandler::pullSubTask(sub_var_t& task){
     749        4287 :     mSubQueue->popTask(task);
     750        4287 : }

Generated by: LCOV version 1.0