LCOV - code coverage report
Current view: top level - src - pubsub.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 359 450 79.8 %
Date: 2025-11-26 02:09:04 Functions: 33 40 82.5 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <string.h>
       3             : #include <assert.h>
       4             : #include <netdb.h>
       5             : #include "log.h"
       6             : #include "netio/netio.h"
       7             : #include "netio/netio_tcp.h"
       8             : 
       9             : #if defined DEBUG || defined DEBUG_PUB
      10             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      11             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      13             : #else
      14             : #define log_dbg(...)
      15             : #define log_trc(...)
      16             : #endif
      17             : 
      18             : 
      19             : char*
      20         157 : netio_domain_name_lookup(const char* domain_name)
      21             : {
      22         157 :   if(!domain_name) return NULL;
      23             :   struct sockaddr_in sock_address;
      24         157 :   char* ip_address = (char*)malloc(sizeof(char) * 17);
      25         157 :   int is_ip_address = inet_pton(AF_INET, domain_name, &(sock_address.sin_addr));
      26         157 :   if(!is_ip_address)
      27             :   {
      28           4 :     struct hostent* host = gethostbyname(domain_name);
      29           4 :     if(host)
      30             :     {
      31           2 :       strcpy(ip_address, inet_ntoa(*(struct in_addr *)host->h_addr));
      32             :     }
      33             :     else
      34             :     {
      35           2 :       char* _domain_name = malloc(sizeof(char) * (strlen(domain_name) + 1));
      36           2 :       strcpy(_domain_name, domain_name);
      37           2 :       log_error("The host cannot be resolved. Domain name set to %s", _domain_name);
      38           2 :       free(ip_address);
      39           2 :       return _domain_name;
      40             :     }
      41             :   }
      42             :   else
      43             :   {
      44         153 :     strcpy(ip_address, domain_name);
      45             :   }
      46             : 
      47             :   return ip_address;
      48             : }
      49             : 
      50             : static int
      51       10102 : cmp_subscription(const void* a, const void *b)
      52             : {
      53             :   struct netio_subscription* suba = (struct netio_subscription*)a;
      54             :   struct netio_subscription* subb = (struct netio_subscription*)b;
      55             : 
      56       10102 :         if(suba->tag == subb->tag) {
      57             :                 return 0;
      58             :         }
      59           0 :         return suba->tag > subb->tag ? 1 : -1;
      60             : }
      61             : 
      62             : static int
      63           6 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
      64             : {
      65           6 :     if(table->num_subscriptions == table->size) {
      66           0 :       log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
      67           0 :       return 1;
      68             :     }
      69             : 
      70             :     // TODO need to keep the list sorted
      71           6 :     table->subscriptions[table->num_subscriptions].tag = tag;
      72           6 :     table->subscriptions[table->num_subscriptions].socket = socket;
      73           6 :     table->subscriptions[table->num_subscriptions].again = 0;
      74             : 
      75             :     log_dbg("Adding connection in subscription table, tag=%lu, socket=%p",
      76             :       table->subscriptions[table->num_subscriptions].tag,
      77             :       table->subscriptions[table->num_subscriptions].socket
      78             :     );
      79             : 
      80           6 :     table->num_subscriptions++;
      81           6 :     table->ts++;
      82           6 :     log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
      83           6 :     qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
      84           6 :     return 0;
      85             : }
      86             : 
      87             : 
      88             : /**
      89             :  * @brief Handle an unsubscription or a client disconnection.
      90             :  *
      91             :  * @param netio_subscription_table: the table of active subscriptions
      92             :  * @param socket:                   the unbuffered send socket used to send the data to the subscriber
      93             :  * @param tag:                      the tag for which an unsubscribe request has been received
      94             :  * @param closed_connection:        a flag to enable the removal of all the subscriptions associated to
      95             :  *                                  the send socket in response to a closed connection.
      96             :  */
      97             : static void
      98           5 : table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
      99             : {
     100             :   log_dbg("Total subscriptions: %lu", table->num_subscriptions);
     101             :   unsigned i=0;
     102             :   unsigned remaining_subscriptions_of_socket=0;
     103          10 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     104           5 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     105             :   }
     106             : 
     107             : 
     108          10 :   while(i<table->num_subscriptions) {
     109           5 :     if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
     110             :       log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u",
     111             :                 table->subscriptions[i].tag,
     112             :                 table->subscriptions[i].socket,
     113             :                 table->num_subscriptions-1,
     114             :                 i);
     115           5 :       table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
     116           5 :       table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
     117           5 :       table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
     118           5 :       table->num_subscriptions--;
     119           5 :       remaining_subscriptions_of_socket--;
     120           5 :       table->ts++;
     121             :     }
     122             :     else{
     123           0 :       i++;
     124             :       log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
     125             :     }
     126             :   }
     127           5 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     128             :   log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
     129           5 :   log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
     130           5 :   if(closed_connection==0 && remaining_subscriptions_of_socket==0){
     131           5 :     log_warn("Disconnecting endpoint with zero subscriptions");
     132           5 :     if (socket->send_socket.tcp_fi_mode == NETIO_MODE_TCP){
     133             :       // netio_disconnect(&socket->send_socket); // TODO prevents re-subscription
     134           3 :     } else if (socket->send_socket.tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     135           3 :       netio_disconnect(&socket->send_socket);
     136             :     }
     137             :   }
     138           5 : }
     139             : 
     140             : 
     141             : static int
     142           5 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
     143             : {
     144             :   log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
     145             :   unsigned remaining_subscriptions_of_socket=0;
     146           5 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     147           0 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     148             :   }
     149             :   log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
     150           5 :   return remaining_subscriptions_of_socket;
     151             : }
     152             : 
     153             : static void
     154           0 : on_buffer_available(void* ptr)
     155             : {
     156             :   log_trc("a buffer became available, calling callback");
     157             :   struct netio_publish_socket* socket = (struct netio_publish_socket*)ptr;
     158           0 :   if(socket->cb_buffer_available) {
     159           0 :     socket->cb_buffer_available(socket);
     160             :   }
     161           0 : }
     162             : 
     163             : 
     164             : static void
     165           6 : pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
     166             : {
     167             :         log_dbg("publish socket established connection to remote, can publish now");
     168           6 :         struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     169             :   //add deferred subscriptions to the table
     170           6 :   struct deferred_subscription* sub = socket->send_socket.deferred_subs;
     171          12 :   while(sub){
     172           6 :     int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
     173           6 :     if(pubsocket->cb_subscribe && ret == 0){
     174           6 :       pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
     175             :     }
     176           6 :     pop_subscription(&sub);
     177             :   }
     178             :   //user callback
     179           6 :   if(pubsocket->cb_connection_established) {
     180           6 :     pubsocket->cb_connection_established(pubsocket);
     181             :   }
     182           6 : }
     183             : 
     184             : 
     185             : static void
     186           5 : pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
     187             : {
     188             :   log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
     189           5 :   struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     190           5 :   if(pubsocket->cb_connection_closed) {
     191           5 :     pubsocket->cb_connection_closed(pubsocket);
     192             :   }
     193             :   //Only if the connection was closed without unsubscribing first.
     194           5 :   if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
     195             :     log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
     196             :     uint8_t connection_closed = 1;
     197           0 :     table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
     198             :   }
     199           5 : }
     200             : 
     201             : 
     202             : static struct netio_buffered_send_socket*
     203           6 : socket_list_add_or_lookup(struct netio_publish_socket* pubsocket,
     204             :                           struct netio_socket_list** list,
     205             :                           void* addr, size_t addrlen,
     206             :                           int port,
     207             :                           struct netio_context* ctx,
     208             :                           struct netio_buffered_socket_attr* attr)
     209             : {
     210           6 :   if(addrlen == 0) {
     211             :     return NULL;
     212             :   }
     213             : 
     214           6 :   struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);
     215             : 
     216           6 :   if ( entry == NULL ) {
     217           6 :     entry = add_socket_with_address(list, BSEND, addr, addrlen, port);
     218           6 :     struct netio_buffered_send_socket* bufsocket = entry->socket;
     219             : 
     220           6 :     if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
     221           4 :       netio_buffered_send_socket_init(bufsocket, ctx, attr);
     222           4 :       bufsocket->pub_socket = pubsocket;
     223           4 :       bufsocket->cb_connection_established = pubsocket_on_connection_established;
     224           4 :       bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
     225           4 :       netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
     226             :     }
     227           2 :     else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
     228           2 :       netio_buffered_send_tcp_socket_init(bufsocket, ctx, attr);
     229           2 :       bufsocket->pub_socket = pubsocket;
     230           2 :       bufsocket->cb_connection_established = pubsocket_on_connection_established;
     231           2 :       bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
     232             :       log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
     233           2 :       netio_connect_tcp(&bufsocket->send_socket, entry->addr, port);
     234             :     }
     235             :     else {
     236           0 :       log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
     237           0 :       remove_socket(list, bufsocket);
     238           0 :       return NULL;
     239             :     }
     240             : 
     241           6 :     bufsocket->signal_buffer_available.data = pubsocket;
     242           6 :     bufsocket->signal_buffer_available.cb = on_buffer_available;
     243             :   }
     244             : 
     245           6 :   struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
     246           6 :   return ss;
     247             : }
     248             : 
     249             : 
     250             : static void
     251           6 : subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
     252             : {
     253           6 :   struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx, &pubsocket->attr);
     254             : 
     255           6 :   if(socket->send_socket.recv_socket == NULL){
     256           6 :     socket->send_socket.recv_socket = recv_socket;
     257             :   }
     258             : 
     259           6 :   if (socket->send_socket.state == CONNECTED){
     260           0 :     table_add_subscription(&pubsocket->subscription_table, tag, socket);
     261           0 :     if(pubsocket->cb_subscribe) {
     262           0 :       pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
     263             :     }
     264             :   } else {
     265           6 :     push_back_subscription(&socket->send_socket.deferred_subs, tag);
     266             :   }
     267           6 : }
     268             : 
     269             : 
     270             : static void
     271           5 : unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
     272             : {
     273           5 :   struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
     274           5 :   if(list == NULL){return;}
     275             : 
     276           5 :   struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)list->socket;
     277             :   uint8_t connection_closed = 0;
     278           5 :   table_remove_subscription(&pubsocket->subscription_table, socket, tag, connection_closed);
     279           5 :   pubsocket->subscription_table.ts++;
     280             : 
     281           5 :   if(pubsocket->cb_unsubscribe) {
     282           1 :     pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
     283             :   }
     284             : 
     285             : }
     286             : 
     287             : 
     288             : static void
     289           6 : lsocket_on_connection_established(struct netio_recv_socket* socket)
     290             : {
     291             :   log_dbg("Buffered listen socket: on connection established");
     292           6 :   if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     293             :     //libfabric buffers posted in on_listen_socket_cm_event
     294           2 :     socket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
     295          66 :     for (int i = 0; i < 32; i++){
     296          64 :       socket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
     297          64 :       socket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
     298          64 :       socket->sub_msg_buffers[i]->data = malloc(socket->sub_msg_buffers[i]->size);
     299          64 :       netio_post_recv(socket, socket->sub_msg_buffers[i]);
     300             :     }
     301             :   }
     302           6 : }
     303             : 
     304             : 
     305             : static void
     306          11 : parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
     307             : {
     308          11 :     if (msg->action){
     309             :       log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     310           6 :       subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     311             :     }
     312             :     else{
     313             :       log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     314           5 :       unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     315             :     }
     316          11 : }
     317             : 
     318             : 
     319             : static void
     320          11 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     321             : {
     322             :     log_dbg("message received by recv socket %p", socket);
     323          11 :     struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;
     324          11 :     if(len != sizeof(struct netio_subscription_message)) {
     325           0 :       log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
     326           0 :       netio_post_recv(socket, buf);
     327           0 :       return;
     328             :     }
     329          11 :     parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
     330          11 :     netio_post_recv(socket, buf);
     331             : }
     332             : 
     333             : 
     334             : static int
     335         287 : send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
     336             : {
     337         287 :     if( action == NETIO_SUBSCRIBE ){
     338         146 :       log_info("Sending subscription for tag 0x%lx", tag);
     339         141 :     } else if ( action == NETIO_UNSUBSCRIBE ){
     340         141 :       log_info("Sending unsubscription for tag 0x%lx", tag);
     341             :     } else {
     342           0 :       log_error("Invalid subscription action %d", action);
     343           0 :       return 0;
     344             :     }
     345             :     int ret = 0;
     346         287 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     347         145 :       socket->msg.tag = tag;
     348         145 :       socket->msg.action = action;
     349         145 :       socket->buf.data = &socket->msg;
     350         145 :       socket->buf.size = sizeof(struct netio_subscription_message);
     351         145 :       ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
     352         142 :     } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     353             :       // look for available buf
     354             :       size_t id = 0;
     355        1076 :       while (socket->bufs[id].to_send) {
     356         934 :         id++;
     357             :       }
     358         142 :       log_info("tag id %d", id);
     359         142 :       socket->msgs[id] = socket->msgs[0]; // initialize msg to default
     360         142 :       socket->msgs[id].tag = tag;
     361         142 :       socket->msgs[id].action = action;
     362         142 :       socket->bufs[id].data = &socket->msgs[id];
     363         142 :       socket->bufs[id].size = sizeof(struct netio_subscription_message);
     364         142 :       socket->bufs[id].to_send = 1;
     365         142 :       ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
     366             :     } else {
     367           0 :       log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
     368             :       ret = 1;
     369             :     }
     370         287 :     log_info("send_subscription_message done");
     371         287 :     return ret;
     372             : }
     373             : 
     374             : 
     375             : static void
     376          53 : subsocket_on_connection_established(struct netio_send_socket* socket)
     377             : {
     378             :     log_dbg("subsocket connection established");
     379             :     int ret;
     380          53 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     381             : 
     382          53 :     if(subscribe_socket->total_tags == 0){
     383           0 :       log_info("Closing send connection again because there is no tag to subscribe to.");
     384           0 :       netio_disconnect(socket);
     385             :     }
     386          53 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     387          28 :       subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
     388          28 :       if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
     389          28 :                           subscribe_socket->msg.addr,
     390             :                           &subscribe_socket->msg.addrlen)) != 0) {
     391           0 :         log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
     392           0 :         exit(1);
     393             :       }
     394          28 :       subscribe_socket->buf.data = &subscribe_socket->msg;
     395          28 :       subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
     396          28 :       netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
     397             :     }
     398          25 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
     399             :       // initialize msgs[0] with defaults
     400             :       struct sockaddr sock_addr;
     401          26 :       socklen_t addrlen=sizeof(sock_addr);
     402          26 :       getsockname(socket->eq_ev_ctx.fd, &sock_addr, &addrlen);
     403          26 :       getnameinfo(&sock_addr, addrlen,
     404          26 :                 subscribe_socket->msgs[0].addr,NETIO_MAX_ADDRLEN,
     405             :                 NULL, 0, NI_NUMERICHOST);
     406          26 :       addrlen=strlen(subscribe_socket->msgs[0].addr);
     407          26 :       subscribe_socket->msgs[0].addr[addrlen] = 0;
     408          26 :       subscribe_socket->msgs[0].addrlen = addrlen+1;
     409          26 :       subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
     410             :     }
     411             : 
     412             :     // send tags one by one
     413         135 :     while(subscribe_socket->total_tags > 0){
     414          81 :       size_t idx = subscribe_socket->total_tags - 1;
     415          81 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     416          81 :       log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
     417          82 :       ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     418          82 :       if (ret == NETIO_STATUS_OK){
     419          82 :         subscribe_socket->total_tags--;
     420             :       } else {
     421           0 :         log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
     422           0 :         break;
     423             :       }
     424             :     }
     425          54 : }
     426             : 
     427             : 
     428             : static void
     429           0 : subsocket_on_send_connection_closed(struct netio_send_socket* socket)
     430             : {
     431             :     log_dbg("subsocket_on_send_connection_closed callback");
     432           0 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     433           0 :     subscribe_socket->state = NONE;
     434           0 :     if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     435           0 :       handle_send_socket_shutdown(socket);
     436             :     }
     437           0 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     438           0 :       handle_tcp_send_socket_shutdown(socket);
     439             :     }
     440           0 : }
     441             : 
     442             : 
     443             : static void
     444           3 : subsocket_on_error_connection_refused(struct netio_send_socket* ssocket) {
     445             :     log_dbg("subsocket connection refused");
     446           3 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)ssocket->usr;
     447           3 :     subscribe_socket->state = NONE;
     448             : 
     449           3 :     handle_listen_socket_shutdown(&subscribe_socket->recv_socket.listen_socket);
     450           3 :     if(subscribe_socket->cb_error_connection_refused) {
     451           3 :       subscribe_socket->cb_error_connection_refused(subscribe_socket);
     452             :     }
     453           3 : }
     454             : 
     455             : 
     456             : static void
     457         287 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     458             : {
     459         287 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     460             :     //check for remaining tags from on_connection_established
     461         287 :     while(subscribe_socket->total_tags > 0){
     462           0 :       size_t idx = subscribe_socket->total_tags - 1;
     463           0 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     464           0 :       int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     465           0 :       if (ret == NETIO_STATUS_OK){
     466           0 :         subscribe_socket->total_tags--;
     467             :       } else {
     468           0 :         log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
     469           0 :         break;
     470             :       }
     471             :     }
     472         287 : }
     473             : 
     474             : 
     475             : static void
     476          55 : subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
     477             : {
     478             :                 log_dbg("connection to subscribe socket has been established");
     479          55 :                 struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     480          55 :                 if(socket->cb_connection_established) {
     481          53 :       socket->cb_connection_established(socket);
     482             :     }
     483          55 : }
     484             : 
     485             : 
     486             : static void
     487          25 : subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
     488             : {
     489          25 :     log_info("connection to subscribe socket has been closed");
     490          25 :     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     491          25 :     if(socket->cb_connection_closed) {
     492          25 :       socket->cb_connection_closed(socket);
     493             :     }
     494          25 : }
     495             : 
     496             : 
     497             : static void
     498  1714769824 : subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
     499             : {
     500             :     log_trc("buffer received");
     501  1714769824 :     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     502             : 
     503  1714769824 :     if(len <= sizeof(netio_tag_t)) {
     504           0 :       log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
     505           0 :       return;
     506             :     }
     507             : 
     508  1714769824 :     netio_tag_t tag = *((netio_tag_t*)data);
     509  1714769824 :     if(socket->cb_msg_received) {
     510  1714769824 :       socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
     511             :     }
     512             : }
     513             : 
     514             : 
     515             : void
     516          17 : netio_subscription_table_init(struct netio_subscription_table* table)
     517             : {
     518          17 :     table->socket_list = NULL;
     519          17 :     table->subscriptions = (struct netio_subscription*)malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
     520          17 :     table->num_subscriptions = 0;
     521          17 :     table->size = NETIO_INITIAL_SUBSCRIPTIONS;
     522          17 :     table->ts = 0;
     523          17 : }
     524             : 
     525             : 
     526             : /*
     527             :  * Initializes a buffered publish socket.
     528             :  *
     529             :  * @param socket: The buffered publish socket to initialize
     530             :  * @param ctx: The NetIO context in which to initialize the socket
     531             :  * @param hostname: Hostname or IP address to bind to
     532             :  * @param port: Port to bind to
     533             :  * @param attr: Buffered connection settings to be used for the underlying connections
     534             :  *
     535             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     536             :  */
     537             : void
     538           7 : netio_publish_libfabric_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     539             : {
     540           7 :     memset(socket, 0, sizeof(*socket));
     541           7 :     socket->ctx = ctx;
     542           7 :     socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
     543           7 :     netio_subscription_table_init(&socket->subscription_table);
     544           7 :     netio_init_listen_socket(&socket->lsocket, ctx, NULL);
     545           7 :     if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     546           0 :       log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     547           0 :       attr->num_pages = NETIO_DOMAIN_MAX_MR;
     548             :     }
     549           7 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     550           7 :     socket->lsocket.usr = socket;
     551           7 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     552           7 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     553           7 :     socket->lsocket.recv_sub_msg = 1;
     554           7 :     char* _hostname = netio_domain_name_lookup(hostname);
     555           7 :     netio_listen(&socket->lsocket, (const char*)_hostname, port);
     556           7 :     free(_hostname);
     557           7 : }
     558             : 
     559             : 
     560             : /**
     561             :  * Initializes a buffered publish socket but with tcp instead if libfabric.
     562             :  *
     563             :  * @param socket: The buffered publish socket to initialize
     564             :  * @param ctx: The NetIO context in which to initialize the socket
     565             :  * @param hostname: Hostname or IP address to bind to
     566             :  * @param port: Port to bind to
     567             :  * @param attr: Buffered connection settings to be used for the underlying connections
     568             :  *
     569             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     570             :  */
     571             : void
     572           2 : netio_publish_tcp_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     573             : {
     574           2 :     memset(socket, 0, sizeof(*socket));
     575           2 :     socket->ctx = ctx;
     576           2 :     socket->tcp_fi_mode = NETIO_MODE_TCP;
     577           2 :     netio_subscription_table_init(&socket->subscription_table);
     578           2 :     netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
     579           2 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     580           2 :     socket->lsocket.usr = socket;
     581           2 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     582           2 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     583           2 :     socket->lsocket.recv_sub_msg = 1;
     584           2 :     char* _hostname = netio_domain_name_lookup(hostname);
     585           2 :     netio_listen_tcp(&socket->lsocket, (const char*)_hostname, port);
     586           2 :     free(_hostname);
     587           2 : }
     588             : 
     589             : 
     590             : /*
     591             :  * Initializes a buffered publish socket.
     592             :  *
     593             :  * @param socket: The buffered publish socket to initialize
     594             :  * @param ctx: The NetIO context in which to initialize the socket
     595             :  * @param hostname: Hostname or IP address to bind to
     596             :  * @param port: Port to bind to
     597             :  * @param attr: Buffered connection settings to be used for the underlying connections
     598             :  *
     599             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     600             :  */
     601             : void
     602           9 : netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     603             : {
     604           9 :     if (netio_tcp_mode(hostname)) {
     605           2 :         netio_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
     606             :     } else {
     607           7 :         netio_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
     608             :     }
     609           9 : }
     610             : 
     611             : /**
     612             :  * Initializes a buffered subscribe socket.
     613             :  *
     614             :  * @param socket: The buffered subscribe socket to initialize
     615             :  * @param ctx: The NetIO context in which to initialize the socket
     616             :  * @param attr: Buffered connection settings to be used for the underlying connections
     617             :  * @param hostname: Hostname or IP address of the local interface to bind to
     618             :  * @param remote_host: Hostname or IP of the remote publish socket
     619             :  * @param remote_port: Port of the remote publish socket
     620             :  *
     621             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     622             :  */
     623             : void
     624          31 : netio_subscribe_libfabric_socket_init(struct netio_subscribe_socket* socket,
     625             :                                       struct netio_context* ctx,
     626             :                                       struct netio_buffered_socket_attr* attr,
     627             :                                       const char* hostname,
     628             :                                       const char* remote_host,
     629             :                                       unsigned remote_port)
     630             : {
     631          31 :     memset(socket, 0, sizeof(*socket));
     632          31 :     socket->ctx = ctx;
     633             :     socket->state = NONE;
     634          31 :     socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
     635          31 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     636          31 :     netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
     637          31 :     socket->recv_socket.listen_socket.usr = socket;
     638          31 :     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     639          31 :     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     640          31 :     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     641             :     //set cb_buf_received meant only for pileup measurement to NULL
     642          31 :     socket->cb_buf_received = NULL;
     643          31 :     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     644          31 :     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     645          31 :     netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);
     646             : 
     647          31 :     socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
     648          31 :     socket->remote_port = remote_port;
     649          31 :     socket->total_tags = 0;
     650          31 :     free(lookedup_remote_hostname);
     651          31 :     free(lookedup_hostname);
     652          31 : }
     653             : 
     654             : 
     655             : /* Same as above except tcp instead of libfabric */
     656             : void
     657          26 : netio_subscribe_tcp_socket_init(struct netio_subscribe_socket* socket,
     658             :                             struct netio_context* ctx,
     659             :                             struct netio_buffered_socket_attr* attr,
     660             :                             const char* hostname,
     661             :                             const char* remote_host,
     662             :                             unsigned remote_port)
     663             : {
     664          26 :     log_info("subscribe_tcp from <%s> to <%s>",hostname,remote_host);
     665          26 :     memset(socket, 0, sizeof(*socket));
     666          26 :     socket->ctx = ctx;
     667             :     socket->state = NONE;
     668          26 :     socket->tcp_fi_mode=NETIO_MODE_TCP;
     669          26 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     670             :     log_dbg("Prepare recv socket (buffered_listen_tcp_socket)");
     671          26 :     netio_buffered_listen_tcp_socket_init(&socket->recv_socket, ctx, &socket->attr);
     672             : 
     673          26 :     socket->recv_socket.listen_socket.usr = socket;
     674          26 :     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     675          26 :     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     676          26 :     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     677             :     //set cb_buf_received meant only for pileup measurement to NULL
     678          26 :     socket->cb_buf_received = NULL;
     679          26 :     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     680          26 :     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     681          26 :     netio_buffered_listen_tcp(&socket->recv_socket, (const char*)lookedup_hostname, 0);
     682             : 
     683          26 :     socket->msg.port=socket->recv_socket.listen_socket.port;
     684             : 
     685          26 :     socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
     686          26 :     socket->remote_port = remote_port;
     687          26 :     socket->total_tags = 0;
     688          26 :     free(lookedup_remote_hostname);
     689          26 :     free(lookedup_hostname);
     690          26 : }
     691             : 
     692             : 
     693             : /**
     694             :  * Initializes a buffered subscribe socket.
     695             :  *
     696             :  * @param socket: The buffered subscribe socket to initialize
     697             :  * @param ctx: The NetIO context in which to initialize the socket
     698             :  * @param attr: Buffered connection settings to be used for the underlying connections
     699             :  * @param hostname: Hostname or IP address of the local interface to bind to
     700             :  * @param remote_host: Hostname or IP of the remote publish socket
     701             :  * @param remote_port: Port of the remote publish socket
     702             :  *
     703             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     704             :  */
     705             : void
     706          57 : netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
     707             :                             struct netio_context* ctx,
     708             :                             struct netio_buffered_socket_attr* attr,
     709             :                             const char* hostname,
     710             :                             const char* remote_host,
     711             :                             unsigned remote_port)
     712             : {
     713          57 :   if (netio_tcp_mode(hostname)) {
     714          26 :     netio_subscribe_tcp_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
     715             :   } else {
     716          31 :     netio_subscribe_libfabric_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
     717             :   }
     718          57 : }
     719             : 
     720             : /**
     721             :  * Initialize a netio_subscription_cache object.
     722             :  *
     723             :  * @param cache: The cache to be initialized
     724             :  */
     725             : void
     726           0 : netio_subscription_cache_init(struct netio_subscription_cache* cache)
     727             : {
     728           0 :   cache->ts = 0;
     729           0 :   cache->count = 0;
     730           0 :   cache->idx_start = 0;
     731           0 : }
     732             : 
     733             : 
     734             : static unsigned
     735       10102 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
     736             : {
     737             :   struct netio_subscription key;
     738       10102 :   key.tag = tag;
     739       10102 :   struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
     740       10102 :                                                                        table->subscriptions,
     741             :                                                                        table->num_subscriptions,
     742             :                                                                        sizeof(struct netio_subscription),
     743             :                                                                        cmp_subscription);
     744       10102 :   if(ptr == NULL) {
     745             :     return 0;
     746             :   }
     747       10102 :   unsigned start_idx = ptr - table->subscriptions;
     748       10102 :         while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
     749             :                 start_idx--;
     750             :         }
     751             :   unsigned count = 0;
     752       20204 :   for(unsigned int i=start_idx; i<table->num_subscriptions; i++) {
     753       10102 :     if(table->subscriptions[i].tag == tag) {
     754       10102 :       count++;
     755             :     } else {
     756             :       break;
     757             :     }
     758             :   }
     759       10102 :   *start = start_idx;
     760       10102 :   return count;
     761             : }
     762             : 
     763             : static unsigned
     764           0 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
     765             : {
     766           0 :   if(cache->ts != table->ts) {
     767           0 :     cache->count = lookup_tag(table, tag, &cache->idx_start);
     768           0 :     cache->ts = table->ts;
     769             :   }
     770           0 :   *start = cache->idx_start;
     771           0 :   return cache->count;
     772             : }
     773             : 
     774             : /**
     775             :  * Publishes a message under a given tag
     776             :  *
     777             :  * @param socket:  The socket to publish on
     778             :  * @param tag:     The tag under which to publish
     779             :  * @param data:    Message data
     780             :  * @param len:     Message size
     781             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     782             :  *                 resulted in NETIO_STATUS_AGAIN. Calling publish with
     783             :  *                 this flag will only send on connections where the
     784             :  *                 message was previously unpublished.
     785             :  * @param cache:   Optional user-supplied cache for the subscription table lookup.
     786             :  */
     787             : int
     788       10102 : netio_buffered_publishi(struct netio_publish_socket* socket, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
     789             : {
     790             :   int ret = NETIO_STATUS_OK;
     791       10102 :   netio_tag_t tag = *(netio_tag_t*)iov[0].iov_base;
     792             :   unsigned start_idx;
     793             :   unsigned num_subscriptions;
     794             : 
     795       10102 :   if(cache) {
     796           0 :     num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
     797             :   } else {
     798       10102 :     num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
     799             :   }
     800       10102 :   if(num_subscriptions == 0) return NETIO_STATUS_OK_NOSUB;
     801             : 
     802       20204 :   for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
     803       10102 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     804       10102 :     if(subscription->tag == tag) {
     805             : 
     806             :       // skip connections that were already successful if we are in reentry mode
     807       10102 :       if(flags & NETIO_REENTRY) {
     808           0 :         if(subscription->again == 0) {
     809           0 :           continue;
     810             :         }
     811             :       }
     812             : 
     813       10102 :       int result = netio_buffered_sendv(subscription->socket, iov, len);
     814             :       log_dbg("Sending iov on subscription->socket, result=%d",result);
     815             : 
     816       10102 :       if(result == NETIO_STATUS_OK) {
     817       10102 :         subscription->again = 0;
     818           0 :       } else if(result == NETIO_STATUS_AGAIN) {
     819           0 :         subscription->again = 1;
     820             :         ret = NETIO_STATUS_AGAIN;
     821             :       }
     822           0 :       else if(result == NETIO_STATUS_TOO_BIG) {
     823           0 :         subscription->again = 0;
     824             :         ret = NETIO_STATUS_TOO_BIG;
     825             :       }
     826             :       else {
     827             :         return result; // some error occured and we return immediately
     828             :       }
     829             :     }
     830             :   }
     831             :   return ret;
     832             : }
     833             : 
     834             : 
     835             : /**
     836             :  * Publishes a message under a given tag
     837             :  *
     838             :  * @param socket:  The socket to publish on
     839             :  * @param tag:     The tag under which to publish
     840             :  * @param data:    Message data
     841             :  * @param len:     Message size
     842             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     843             :  *                 resulted in NETIO_STATUS_AGAIN. Calling publish with
     844             :  *                 this flag will only send on connections where the
     845             :  *                 message was previously unpublished.
     846             :  * @param cache:   Optional user-supplied cache for the subscription table lookup.
     847             :  */
     848             : int
     849       10102 : netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
     850             : {
     851             :   log_trc("netio_buffered_publish (size=%lu)", len);
     852             : 
     853             :   struct iovec iov[2];
     854       10102 :   iov[0].iov_base = &tag;
     855       10102 :   iov[0].iov_len = sizeof(netio_tag_t);
     856       10102 :   iov[1].iov_base = data;
     857       10102 :   iov[1].iov_len = len;
     858             : 
     859       10102 :   return netio_buffered_publishi(socket, iov, 2, flags, cache);
     860             : }
     861             : 
     862             : 
     863             : int
     864           0 : netio_buffered_publishv(struct netio_publish_socket* socket, netio_tag_t tag, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
     865             : {
     866             :   struct iovec iovv[NETIO_MAX_IOV_LEN + 1];
     867             :   //NETIO_MAX_IOV_LEN is not a limititation, as all entries will be copied in one in netio_buffered_sendv
     868           0 :   iovv[0].iov_base = &tag;
     869           0 :   iovv[0].iov_len = sizeof(netio_tag_t);
     870           0 :   size_t size = (len > NETIO_MAX_IOV_LEN) ?  NETIO_MAX_IOV_LEN : len;
     871           0 :   memcpy(iovv + 1, iov, size * sizeof(struct iovec));
     872           0 :   return netio_buffered_publishi(socket, iovv, (size + 1), flags, cache);
     873             : }
     874             : 
     875             : 
     876             : /**
     877             :  * Flushes buffers on all connections of a given publish socket for a certain tag.
     878             :  *
     879             :  * @param socket The buffered publish socket
     880             :  * @param tag The message tag
     881             :  * @param cache An optional subscription cache object
     882             :  */
     883             : void
     884           2 : netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
     885             : {
     886           4 :   for(unsigned i=0; i<socket->subscription_table.num_subscriptions; i++) {
     887           2 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     888           2 :     netio_buffered_flush(subscription->socket);
     889             :   }
     890           2 : }
     891             : 
     892             : 
     893             : /**
     894             :  * Subscribe to a given message tag.
     895             :  *
     896             :  * For a given subscribe socket, `netio_subscribe` can be called multiple times.
     897             :  *
     898             :  * @param socket: The buffered subscribe socket.
     899             :  * @param tag: The subscription tag.
     900             :  */
     901             : int
     902         149 : netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     903             : {
     904         149 :     if(socket->state == NONE) {
     905             :       //A new socket is created and the on subsocket_on_connection_established
     906             :       //will trigger the actual subscriptions.
     907             :       log_dbg("Creating and connecting a new send_socket");
     908          57 :       if (socket->tcp_fi_mode  == NETIO_MODE_LIBFABRIC) {
     909          31 :         netio_init_send_socket(&socket->socket, socket->ctx);
     910          31 :         socket->socket.usr = socket;
     911          31 :         socket->socket.cb_connection_established = subsocket_on_connection_established;
     912          31 :         socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
     913          31 :         socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
     914          31 :         socket->socket.cb_send_completed = subsocket_on_send_completed;
     915          31 :         netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
     916          26 :       } else if (socket->tcp_fi_mode  == NETIO_MODE_TCP) {
     917          26 :          netio_init_send_tcp_socket(&socket->socket, socket->ctx);
     918          26 :          socket->socket.usr = socket;
     919          26 :          socket->socket.cb_connection_established = subsocket_on_connection_established;
     920          26 :          socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
     921          26 :          socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
     922          26 :          socket->socket.cb_send_completed = subsocket_on_send_completed;
     923          26 :          netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
     924             :       }
     925          57 :       socket->state = INITIALIZED;
     926             :     }
     927             : 
     928         279 :     for(unsigned i=0; i<socket->total_tags; i++) {
     929         130 :       if(socket->tags_to_subscribe[i] == tag) {
     930             :         return 0;
     931             :       }
     932             :     }
     933             : 
     934             :     //if send socket connected send message
     935             :     //otherwise on_connection_established will do it
     936         149 :     if (socket->socket.state){
     937          64 :       log_info("Sending subscription message for tag 0x%lx", tag);
     938          64 :       int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
     939          64 :       return ret;
     940             :     } else {
     941          85 :       log_info("Queing subscription message for txg 0x%lx", tag);
     942          85 :       socket->tags_to_subscribe[socket->total_tags] = tag;
     943          85 :       socket->total_tags++;
     944          85 :       return 0;
     945             :     }
     946             : }
     947             : 
     948             : 
     949             : static int
     950           0 : remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     951             : {
     952             :   int found = 0;
     953           0 :   for(unsigned int i=0; i<socket->total_tags; ++i){
     954           0 :     if(socket->tags_to_subscribe[i] == tag){
     955           0 :       log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
     956           0 :       for(unsigned int j = i; j < socket->total_tags-1; ++j){
     957           0 :         socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
     958             :       }
     959             :       found = 1;
     960           0 :       socket->total_tags--;
     961             :       break;
     962             :     }
     963             :   }
     964             :   if(found == 0){
     965           0 :     log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
     966             :   }
     967           0 :   return NETIO_STATUS_OK;
     968             : }
     969             : 
     970             : 
     971             : /**
     972             :  * Unsubscribe from a given message tag.
     973             :  *
     974             :  * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
     975             :  *
     976             :  * @param socket: The subscribe socket.
     977             :  * @param tag: The tag to unsubscribe from.
     978             :  */
     979             : int
     980         141 : netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     981             : {
     982             :   int ret = NETIO_STATUS_OK;
     983         141 :   if(socket->state == INITIALIZED) {
     984             :     log_dbg("Subscribe socket initialised, can proceed with usubscription");
     985         141 :     if (socket->socket.state) {
     986         141 :       ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
     987             :       log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
     988             :     } else {
     989           0 :       ret = remove_tag_to_subscribe(socket, tag);
     990             :     }
     991             :   } else {
     992             :     log_dbg("The connection has been already closed.");
     993             :   }
     994         141 :   return ret;
     995             : }
     996             : 
     997             : /**
     998             :  * Returns the minimum number of netio pages across buffered send sockets
     999             :  * associated to a publish socket
    1000             :  *
    1001             :  * @param socket: The publish socket.
    1002             :  */
    1003             : unsigned
    1004           0 : netio_pubsocket_get_minimum_pages(struct netio_publish_socket* socket)
    1005             : {
    1006           0 :   if (!socket) {
    1007             :     return 0;
    1008             :   }
    1009             : 
    1010           0 :   size_t pages = socket->attr.num_pages;
    1011           0 :   struct netio_socket_list* itr = socket->subscription_table.socket_list;
    1012             : 
    1013           0 :   while(itr != NULL){
    1014           0 :     struct netio_buffered_send_socket* buf_send_socket = (struct netio_buffered_send_socket*)itr->socket;
    1015           0 :     uint64_t socket_pages = buf_send_socket->buffers.available_buffers;
    1016           0 :     if(socket_pages < pages){
    1017             :       pages = socket_pages;
    1018             :     }
    1019           0 :     itr = itr->next;
    1020             :   }
    1021           0 :   return pages;
    1022             : }
    1023             : 

Generated by: LCOV version 1.0