LCOV - code coverage report
Current view: top level - netio-next/src - pubsub.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 451 506 89.1 %
Date: 2025-08-12 04:15:35 Functions: 39 40 97.5 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <string.h>
       3             : #include <assert.h>
       4             : #include <netdb.h>
       5             : #include "log.h"
       6             : #include "netio/netio.h"
       7             : #include "netio/netio_tcp.h"
       8             : 
       9             : #if defined DEBUG || defined DEBUG_PUB
      10             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      11             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      13             : #else
      14             : #define log_dbg(...)
      15             : #define log_trc(...)
      16             : #endif
      17             : 
      18             : 
      19             : char*
      20         497 : netio_domain_name_lookup(const char* domain_name)
      21             : {
      22         497 :   if(!domain_name) return NULL;
      23         497 :   struct sockaddr_in sock_address;
      24         497 :   char* ip_address = (char*)malloc(sizeof(char) * 17);
      25         497 :   int is_ip_address = inet_pton(AF_INET, domain_name, &(sock_address.sin_addr));
      26         497 :   if(!is_ip_address)
      27             :   {
      28           4 :     struct hostent* host = gethostbyname(domain_name);
      29           4 :     if(host)
      30             :     {
      31           2 :       strcpy(ip_address, inet_ntoa(*(struct in_addr *)host->h_addr));
      32             :     }
      33             :     else
      34             :     {
      35           2 :       char* _domain_name = malloc(sizeof(char) * (strlen(domain_name) + 1));
      36           2 :       strcpy(_domain_name, domain_name);
      37           2 :       log_error("The host cannot be resolved. Domain name set to %s", _domain_name);
      38           2 :       free(ip_address);
      39           2 :       return _domain_name;
      40             :     }
      41             :   }
      42             :   else
      43             :   {
      44         493 :     strcpy(ip_address, domain_name);
      45             :   }
      46             : 
      47             :   return ip_address;
      48             : }
      49             : 
      50             : static int
      51       10129 : cmp_subscription(const void* a, const void *b)
      52             : {
      53       10129 :   struct netio_subscription* suba = (struct netio_subscription*)a;
      54       10129 :   struct netio_subscription* subb = (struct netio_subscription*)b;
      55             : 
      56       10129 :         if(suba->tag == subb->tag) {
      57             :                 return 0;
      58             :         }
      59           5 :         return suba->tag > subb->tag ? 1 : -1;
      60             : }
      61             : 
      62             : static int
      63          24 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
      64             : {
      65          24 :     if(table->num_subscriptions == table->size) {
      66           0 :       log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
      67           0 :       return 1;
      68             :     }
      69             : 
      70             :     // TODO need to keep the list sorted
      71          24 :     table->subscriptions[table->num_subscriptions].tag = tag;
      72          24 :     table->subscriptions[table->num_subscriptions].socket = socket;
      73          24 :     table->subscriptions[table->num_subscriptions].again = 0;
      74             : 
      75             :     log_dbg("Adding connection in subscription table, tag=%lu, socket=%p",
      76             :       table->subscriptions[table->num_subscriptions].tag,
      77             :       table->subscriptions[table->num_subscriptions].socket
      78          24 :     );
      79             : 
      80          24 :     table->num_subscriptions++;
      81          24 :     table->ts++;
      82          24 :     log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
      83          24 :     qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
      84          24 :     return 0;
      85             : }
      86             : 
      87             : 
      88             : /**
      89             :  * @brief Handle an unsubscription or a client disconnection.
      90             :  *
      91             :  * @param netio_subscription_table: the table of active subscriptions
      92             :  * @param socket:                   the unbuffered send socket used to send the data to the subscriber
      93             :  * @param tag:                      the tag for which an unsubscribe request has been received
      94             :  * @param closed_connection:        a flag to enable the removal of all the subscriptions associated to
      95             :  *                                  the send socket in response to a closed connection.
      96             :  */
      97             : static void
      98          23 : table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
      99             : {
     100          23 :   log_dbg("Total subscriptions: %lu", table->num_subscriptions);
     101          23 :   unsigned i=0;
     102          23 :   unsigned remaining_subscriptions_of_socket=0;
     103          48 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     104          25 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     105             :   }
     106             : 
     107             : 
     108          48 :   while(i<table->num_subscriptions) {
     109          25 :     if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
     110             :       log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u",
     111             :                 table->subscriptions[i].tag,
     112             :                 table->subscriptions[i].socket,
     113             :                 table->num_subscriptions-1,
     114          23 :                 i);
     115          23 :       table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
     116          23 :       table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
     117          23 :       table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
     118          23 :       table->num_subscriptions--;
     119          23 :       remaining_subscriptions_of_socket--;
     120          23 :       table->ts++;
     121             :     }
     122             :     else{
     123           2 :       i++;
     124          48 :       log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
     125             :     }
     126             :   }
     127          23 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     128          23 :   log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
     129          23 :   log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
     130          23 :   if(closed_connection==0 && remaining_subscriptions_of_socket==0){
     131          23 :     log_warn("Disconnecting endpoint with zero subscriptions");
     132          23 :     if (socket->send_socket.tcp_fi_mode == NETIO_MODE_TCP){
     133             :       // netio_disconnect(&socket->send_socket); // TODO prevents re-subscription
     134          21 :     } else if (socket->send_socket.tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     135          21 :       netio_disconnect(&socket->send_socket);
     136             :     }
     137             :   }
     138          23 : }
     139             : 
     140             : 
     141             : static int
     142          23 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
     143             : {
     144          23 :   log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
     145          23 :   unsigned remaining_subscriptions_of_socket=0;
     146          23 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     147           0 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     148             :   }
     149          23 :   log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
     150          23 :   return remaining_subscriptions_of_socket;
     151             : }
     152             : 
     153             : static void
     154      212184 : on_buffer_available(void* ptr)
     155             : {
     156      212184 :   log_trc("a buffer became available, calling callback");
     157      212184 :   struct netio_publish_socket* socket = (struct netio_publish_socket*)ptr;
     158      212184 :   if(socket->cb_buffer_available) {
     159      212184 :     socket->cb_buffer_available(socket);
     160             :   }
     161      212184 : }
     162             : 
     163             : 
     164             : static void
     165          24 : pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
     166             : {
     167          24 :         log_dbg("publish socket established connection to remote, can publish now");
     168          24 :         struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     169             :   //add deferred subscriptions to the table
     170          24 :   struct deferred_subscription* sub = socket->send_socket.deferred_subs;
     171          48 :   while(sub){
     172          24 :     int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
     173          24 :     if(pubsocket->cb_subscribe && ret == 0){
     174           6 :       pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
     175             :     }
     176          24 :     pop_subscription(&sub);
     177             :   }
     178             :   //user callback
     179          24 :   if(pubsocket->cb_connection_established) {
     180           6 :     pubsocket->cb_connection_established(pubsocket);
     181             :   }
     182          24 : }
     183             : 
     184             : 
     185             : static void
     186          23 : pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
     187             : {
     188          23 :   log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
     189          23 :   struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     190          23 :   if(pubsocket->cb_connection_closed) {
     191           5 :     pubsocket->cb_connection_closed(pubsocket);
     192             :   }
     193             :   //Only if the connection was closed without unsubscribing first.
     194          23 :   if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
     195           0 :     log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
     196           0 :     uint8_t connection_closed = 1;
     197           0 :     table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
     198             :   }
     199          23 : }
     200             : 
     201             : 
     202             : static struct netio_buffered_send_socket*
     203          24 : socket_list_add_or_lookup(struct netio_publish_socket* pubsocket,
     204             :                           struct netio_socket_list** list,
     205             :                           void* addr, size_t addrlen,
     206             :                           int port,
     207             :                           struct netio_context* ctx,
     208             :                           struct netio_buffered_socket_attr* attr)
     209             : {
     210          24 :   if(addrlen == 0) {
     211             :     return NULL;
     212             :   }
     213             : 
     214          24 :   struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);
     215             : 
     216          24 :   if ( entry == NULL ) {
     217          24 :     entry = add_socket_with_address(list, BSEND, addr, addrlen, port);
     218          24 :     struct netio_buffered_send_socket* bufsocket = entry->socket;
     219             : 
     220          24 :     if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
     221          22 :       netio_buffered_send_socket_init(bufsocket, ctx, attr);
     222          22 :       bufsocket->pub_socket = pubsocket;
     223          22 :       bufsocket->cb_connection_established = pubsocket_on_connection_established;
     224          22 :       bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
     225          22 :       netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
     226             :     }
     227           2 :     else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
     228           2 :       netio_buffered_send_tcp_socket_init(bufsocket, ctx, attr);
     229           2 :       bufsocket->pub_socket = pubsocket;
     230           2 :       bufsocket->cb_connection_established = pubsocket_on_connection_established;
     231           2 :       bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
     232           2 :       log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
     233           2 :       netio_connect_tcp(&bufsocket->send_socket, entry->addr, port);
     234             :     }
     235             :     else {
     236           0 :       log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
     237           0 :       remove_socket(list, bufsocket);
     238           0 :       return NULL;
     239             :     }
     240             : 
     241          24 :     bufsocket->signal_buffer_available.data = pubsocket;
     242          24 :     bufsocket->signal_buffer_available.cb = on_buffer_available;
     243             :   }
     244             : 
     245          24 :   struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
     246          24 :   return ss;
     247             : }
     248             : 
     249             : 
     250             : static void
     251          24 : subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
     252             : {
     253          24 :   struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx, &pubsocket->attr);
     254             : 
     255          24 :   if(socket->send_socket.recv_socket == NULL){
     256          24 :     socket->send_socket.recv_socket = recv_socket;
     257             :   }
     258             : 
     259          24 :   if (socket->send_socket.state == CONNECTED){
     260           0 :     table_add_subscription(&pubsocket->subscription_table, tag, socket);
     261           0 :     if(pubsocket->cb_subscribe) {
     262           0 :       pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
     263             :     }
     264             :   } else {
     265          24 :     push_back_subscription(&socket->send_socket.deferred_subs, tag);
     266             :   }
     267          24 : }
     268             : 
     269             : 
     270             : static void
     271          23 : unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
     272             : {
     273          23 :   struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
     274          23 :   if(list == NULL){return;}
     275             : 
     276          23 :   struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)list->socket;
     277          23 :   uint8_t connection_closed = 0;
     278          23 :   table_remove_subscription(&pubsocket->subscription_table, socket, tag, connection_closed);
     279          23 :   pubsocket->subscription_table.ts++;
     280             : 
     281          23 :   if(pubsocket->cb_unsubscribe) {
     282           1 :     pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
     283             :   }
     284             : 
     285             : }
     286             : 
     287             : 
     288             : static void
     289          24 : lsocket_on_connection_established(struct netio_recv_socket* socket)
     290             : {
     291          24 :   log_dbg("Buffered listen socket: on connection established");
     292          24 :   if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     293             :     //libfabric buffers posted in on_listen_socket_cm_event
     294           2 :     socket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
     295          66 :     for (int i = 0; i < 32; i++){
     296          64 :       socket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
     297          64 :       socket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
     298          64 :       socket->sub_msg_buffers[i]->data = malloc(socket->sub_msg_buffers[i]->size);
     299          64 :       netio_post_recv(socket, socket->sub_msg_buffers[i]);
     300             :     }
     301             :   }
     302          24 : }
     303             : 
     304             : 
     305             : static void
     306          47 : parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
     307             : {
     308          47 :     if (msg->action){
     309          24 :       log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     310          24 :       subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     311             :     }
     312             :     else{
     313          23 :       log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     314          23 :       unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     315             :     }
     316          47 : }
     317             : 
     318             : 
     319             : static void
     320          47 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     321             : {
     322          47 :     log_dbg("message received by recv socket %p", socket);
     323          47 :     struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;
     324          47 :     if(len != sizeof(struct netio_subscription_message)) {
     325           0 :       log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
     326           0 :       netio_post_recv(socket, buf);
     327           0 :       return;
     328             :     }
     329          47 :     parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
     330          47 :     netio_post_recv(socket, buf);
     331             : }
     332             : 
     333             : 
     334             : static int
     335        2380 : send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
     336             : {
     337        2380 :     if( action == NETIO_SUBSCRIBE ){
     338        1217 :       log_info("Sending subscription for tag 0x%lx", tag);
     339        1163 :     } else if ( action == NETIO_UNSUBSCRIBE ){
     340        1163 :       log_info("Sending unsubscription for tag 0x%lx", tag);
     341             :     } else {
     342           0 :       log_error("Invalid subscription action %d", action);
     343           0 :       return 0;
     344             :     }
     345        2380 :     int ret = 0;
     346        2380 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     347        1263 :       socket->msg.tag = tag;
     348        1263 :       socket->msg.action = action;
     349        1263 :       socket->buf.data = &socket->msg;
     350        1263 :       socket->buf.size = sizeof(struct netio_subscription_message);
     351        1263 :       ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
     352        1117 :     } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     353             :       // look for available buf
     354             :       size_t id = 0;
     355        2066 :       while (socket->bufs[id].to_send) {
     356         949 :         id++;
     357             :       }
     358        1117 :       log_info("tag id %d", id);
     359        1117 :       socket->msgs[id] = socket->msgs[0]; // initialize msg to default
     360        1117 :       socket->msgs[id].tag = tag;
     361        1117 :       socket->msgs[id].action = action;
     362        1117 :       socket->bufs[id].data = &socket->msgs[id];
     363        1117 :       socket->bufs[id].size = sizeof(struct netio_subscription_message);
     364        1117 :       socket->bufs[id].to_send = 1;
     365        1117 :       ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
     366             :     } else {
     367           0 :       log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
     368           0 :       ret = 1;
     369             :     }
     370        2380 :     log_info("send_subscription_message done");
     371        2380 :     return ret;
     372             : }
     373             : 
     374             : 
     375             : static void
     376         162 : subsocket_on_connection_established(struct netio_send_socket* socket)
     377             : {
     378         162 :     log_dbg("subsocket connection established");
     379         162 :     int ret;
     380         162 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     381             : 
     382         162 :     if(subscribe_socket->total_tags == 0){
     383           0 :       log_info("Closing send connection again because there is no tag to subscribe to.");
     384           0 :       netio_disconnect(socket);
     385             :     }
     386         162 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     387         105 :       subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
     388         105 :       if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
     389         105 :                           subscribe_socket->msg.addr,
     390             :                           &subscribe_socket->msg.addrlen)) != 0) {
     391           0 :         log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
     392           0 :         exit(1);
     393             :       }
     394         105 :       subscribe_socket->buf.data = &subscribe_socket->msg;
     395         105 :       subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
     396         105 :       netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
     397             :     }
     398          57 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
     399             :       // initialize msgs[0] with defaults
     400          57 :       struct sockaddr sock_addr;
     401          57 :       socklen_t addrlen=sizeof(sock_addr);
     402          57 :       getsockname(socket->eq_ev_ctx.fd, &sock_addr, &addrlen);
     403          57 :       getnameinfo(&sock_addr, addrlen,
     404          57 :                 subscribe_socket->msgs[0].addr,NETIO_MAX_ADDRLEN,
     405             :                 NULL, 0, NI_NUMERICHOST);
     406          57 :       addrlen=strlen(subscribe_socket->msgs[0].addr);
     407          57 :       subscribe_socket->msgs[0].addr[addrlen] = 0;
     408          57 :       subscribe_socket->msgs[0].addrlen = addrlen+1;
     409          57 :       subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
     410             :     }
     411             : 
     412             :     // send tags one by one
     413         514 :     while(subscribe_socket->total_tags > 0){
     414         352 :       size_t idx = subscribe_socket->total_tags - 1;
     415         352 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     416         352 :       log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
     417         352 :       ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     418         352 :       if (ret == NETIO_STATUS_OK){
     419         352 :         subscribe_socket->total_tags--;
     420             :       } else {
     421           0 :         log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
     422           0 :         break;
     423             :       }
     424             :     }
     425         162 : }
     426             : 
     427             : 
     428             : static void
     429          61 : subsocket_on_send_connection_closed(struct netio_send_socket* socket)
     430             : {
     431          61 :     log_dbg("subsocket_on_send_connection_closed callback");
     432          61 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     433          61 :     subscribe_socket->state = NONE;
     434          61 :     if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     435          61 :       handle_send_socket_shutdown(socket);
     436             :     }
     437           0 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     438           0 :       handle_tcp_send_socket_shutdown(socket);
     439             :     }
     440          61 : }
     441             : 
     442             : 
     443             : static void
     444          25 : subsocket_on_error_connection_refused(struct netio_send_socket* ssocket) {
     445          25 :     log_dbg("subsocket connection refused");
     446          25 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)ssocket->usr;
     447          25 :     subscribe_socket->state = NONE;
     448             : 
     449          25 :     handle_listen_socket_shutdown(&subscribe_socket->recv_socket.listen_socket);
     450          25 :     if(subscribe_socket->cb_error_connection_refused) {
     451          25 :       subscribe_socket->cb_error_connection_refused(subscribe_socket);
     452             :     }
     453          25 : }
     454             : 
     455             : 
     456             : static void
     457        2363 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     458             : {
     459        2363 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     460             :     //check for remaining tags from on_connection_established
     461        2363 :     while(subscribe_socket->total_tags > 0){
     462           0 :       size_t idx = subscribe_socket->total_tags - 1;
     463           0 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     464           0 :       int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     465           0 :       if (ret == NETIO_STATUS_OK){
     466           0 :         subscribe_socket->total_tags--;
     467             :       } else {
     468           0 :         log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
     469           0 :         break;
     470             :       }
     471             :     }
     472        2363 : }
     473             : 
     474             : 
     475             : static void
     476         163 : subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
     477             : {
     478         163 :                 log_dbg("connection to subscribe socket has been established");
     479         163 :                 struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     480         163 :                 if(socket->cb_connection_established) {
     481         161 :       socket->cb_connection_established(socket);
     482             :     }
     483         163 : }
     484             : 
     485             : 
     486             : static void
     487          85 : subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
     488             : {
     489          85 :     log_info("connection to subscribe socket has been closed");
     490          85 :     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     491          85 :     if(socket->cb_connection_closed) {
     492          85 :       socket->cb_connection_closed(socket);
     493             :     }
     494          85 : }
     495             : 
     496             : 
     497             : static void
     498  1662952403 : subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
     499             : {
     500  1662952403 :     log_trc("buffer received");
     501  1662952403 :     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     502             : 
     503  1662952403 :     if(len <= sizeof(netio_tag_t)) {
     504           0 :       log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
     505           0 :       return;
     506             :     }
     507             : 
     508  1662952403 :     netio_tag_t tag = *((netio_tag_t*)data);
     509  1662952403 :     if(socket->cb_msg_received) {
     510  1662952403 :       socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
     511             :     }
     512             : }
     513             : 
     514             : 
     515             : void
     516          41 : netio_subscription_table_init(struct netio_subscription_table* table)
     517             : {
     518          41 :     table->socket_list = NULL;
     519          41 :     table->subscriptions = (struct netio_subscription*)malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
     520          41 :     table->num_subscriptions = 0;
     521          41 :     table->size = NETIO_INITIAL_SUBSCRIPTIONS;
     522          41 :     table->ts = 0;
     523          41 : }
     524             : 
     525             : 
     526             : /*
     527             :  * Initializes a buffered publish socket.
     528             :  *
     529             :  * @param socket: The buffered publish socket to initialize
     530             :  * @param ctx: The NetIO context in which to initialize the socket
     531             :  * @param hostname: Hostname or IP address to bind to
     532             :  * @param port: Port to bind to
     533             :  * @param attr: Buffered connection settings to be used for the underlying connections
     534             :  *
     535             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     536             :  */
     537             : void
     538          21 : netio_publish_libfabric_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     539             : {
     540          21 :     memset(socket, 0, sizeof(*socket));
     541          21 :     socket->ctx = ctx;
     542          21 :     socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
     543          21 :     netio_subscription_table_init(&socket->subscription_table);
     544          21 :     netio_init_listen_socket(&socket->lsocket, ctx, NULL);
     545          21 :     if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     546           0 :       log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     547           0 :       attr->num_pages = NETIO_DOMAIN_MAX_MR;
     548             :     }
     549          21 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     550          21 :     socket->lsocket.usr = socket;
     551          21 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     552          21 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     553          21 :     socket->lsocket.recv_sub_msg = 1;
     554          21 :     char* _hostname = netio_domain_name_lookup(hostname);
     555          21 :     netio_listen(&socket->lsocket, (const char*)_hostname, port);
     556          21 :     free(_hostname);
     557          21 : }
     558             : 
     559             : 
     560             : /**
     561             :  * Initializes a buffered publish socket but with tcp instead of libfabric.
     562             :  *
     563             :  * @param socket: The buffered publish socket to initialize
     564             :  * @param ctx: The NetIO context in which to initialize the socket
     565             :  * @param hostname: Hostname or IP address to bind to
     566             :  * @param port: Port to bind to
     567             :  * @param attr: Buffered connection settings to be used for the underlying connections
     568             :  *
     569             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     570             :  */
     571             : void
     572           2 : netio_publish_tcp_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     573             : {
     574           2 :     memset(socket, 0, sizeof(*socket));
     575           2 :     socket->ctx = ctx;
     576           2 :     socket->tcp_fi_mode = NETIO_MODE_TCP;
     577           2 :     netio_subscription_table_init(&socket->subscription_table);
     578           2 :     netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
     579           2 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     580           2 :     socket->lsocket.usr = socket;
     581           2 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     582           2 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     583           2 :     socket->lsocket.recv_sub_msg = 1;
     584           2 :     char* _hostname = netio_domain_name_lookup(hostname);
     585           2 :     netio_listen_tcp(&socket->lsocket, (const char*)_hostname, port);
     586           2 :     free(_hostname);
     587           2 : }
     588             : 
     589             : 
     590             : /*
     591             :  * Initializes a buffered publish socket, dispatching to the TCP or libfabric variant based on hostname.
     592             :  *
     593             :  * @param socket: The buffered publish socket to initialize
     594             :  * @param ctx: The NetIO context in which to initialize the socket
     595             :  * @param hostname: Hostname or IP address to bind to
     596             :  * @param port: Port to bind to
     597             :  * @param attr: Buffered connection settings to be used for the underlying connections
     598             :  *
     599             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     600             :  */
     601             : void
     602          23 : netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     603             : {
     604          23 :     if (netio_tcp_mode(hostname)) {
     605           2 :         netio_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
     606             :     } else {
     607          21 :         netio_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
     608             :     }
     609          23 : }
     610             : 
     611             : /**
     612             :  * Initializes a buffered subscribe socket.
     613             :  *
     614             :  * @param socket: The buffered subscribe socket to initialize
     615             :  * @param ctx: The NetIO context in which to initialize the socket
     616             :  * @param attr: Buffered connection settings to be used for the underlying connections
     617             :  * @param hostname: Hostname or IP address of the local interface to bind to
     618             :  * @param remote_host: Hostname or IP of the remote publish socket
     619             :  * @param remote_port: Port of the remote publish socket
     620             :  *
     621             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     622             :  */
     623             : void
     624         130 : netio_subscribe_libfabric_socket_init(struct netio_subscribe_socket* socket,
     625             :                                       struct netio_context* ctx,
     626             :                                       struct netio_buffered_socket_attr* attr,
     627             :                                       const char* hostname,
     628             :                                       const char* remote_host,
     629             :                                       unsigned remote_port)
     630             : {
     631         130 :     memset(socket, 0, sizeof(*socket));
     632         130 :     socket->ctx = ctx;
     633         130 :     socket->state = NONE;
     634         130 :     socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
     635         130 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     636         130 :     netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
     637         130 :     socket->recv_socket.listen_socket.usr = socket;
     638         130 :     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     639         130 :     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     640         130 :     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     641             :     //set cb_buf_received meant only for pileup measurement to NULL
     642         130 :     socket->cb_buf_received = NULL;
     643         130 :     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     644         130 :     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     645         130 :     netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);
     646             : 
     647         130 :     socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
     648         130 :     socket->remote_port = remote_port;
     649         130 :     socket->total_tags = 0;
     650         130 :     free(lookedup_remote_hostname);
     651         130 :     free(lookedup_hostname);
     652         130 : }
     653             : 
     654             : 
     655             : /* Same as above except tcp instead of libfabric */
     656             : void
     657          57 : netio_subscribe_tcp_socket_init(struct netio_subscribe_socket* socket,
     658             :                             struct netio_context* ctx,
     659             :                             struct netio_buffered_socket_attr* attr,
     660             :                             const char* hostname,
     661             :                             const char* remote_host,
     662             :                             unsigned remote_port)
     663             : {
     664          57 :     log_info("subscribe_tcp from <%s> to <%s>",hostname,remote_host);
     665          57 :     memset(socket, 0, sizeof(*socket));
     666          57 :     socket->ctx = ctx;
     667          57 :     socket->state = NONE;
     668          57 :     socket->tcp_fi_mode=NETIO_MODE_TCP;
     669          57 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     670          57 :     log_dbg("Prepare recv socket (buffered_listen_tcp_socket)");
     671          57 :     netio_buffered_listen_tcp_socket_init(&socket->recv_socket, ctx, &socket->attr);
     672             : 
     673          57 :     socket->recv_socket.listen_socket.usr = socket;
     674          57 :     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     675          57 :     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     676          57 :     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     677             :     //set cb_buf_received meant only for pileup measurement to NULL
     678          57 :     socket->cb_buf_received = NULL;
     679          57 :     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     680          57 :     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     681          57 :     netio_buffered_listen_tcp(&socket->recv_socket, (const char*)lookedup_hostname, 0);
     682             : 
     683          57 :     socket->msg.port=socket->recv_socket.listen_socket.port;
     684             : 
     685          57 :     socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
     686          57 :     socket->remote_port = remote_port;
     687          57 :     socket->total_tags = 0;
     688          57 :     free(lookedup_remote_hostname);
     689          57 :     free(lookedup_hostname);
     690          57 : }
     691             : 
     692             : 
     693             : /**
     694             :  * Initializes a buffered subscribe socket, dispatching to the TCP or libfabric variant based on hostname.
     695             :  *
     696             :  * @param socket: The buffered subscribe socket to initialize
     697             :  * @param ctx: The NetIO context in which to initialize the socket
     698             :  * @param attr: Buffered connection settings to be used for the underlying connections
     699             :  * @param hostname: Hostname or IP address of the local interface to bind to
     700             :  * @param remote_host: Hostname or IP of the remote publish socket
     701             :  * @param remote_port: Port of the remote publish socket
     702             :  *
     703             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     704             :  */
     705             : void
     706         156 : netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
     707             :                             struct netio_context* ctx,
     708             :                             struct netio_buffered_socket_attr* attr,
     709             :                             const char* hostname,
     710             :                             const char* remote_host,
     711             :                             unsigned remote_port)
     712             : {
     713         156 :   if (netio_tcp_mode(hostname)) {
     714          26 :     netio_subscribe_tcp_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
     715             :   } else {
     716         130 :     netio_subscribe_libfabric_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
     717             :   }
     718         156 : }
     719             : 
     720             : /**
     721             :  * Initialize a netio_subscription_cache object.
     722             :  *
     723             :  * @param cache: The cache to be initialized
     724             :  */
     725             : void
     726    25165824 : netio_subscription_cache_init(struct netio_subscription_cache* cache)
     727             : {
     728    25165824 :   cache->ts = 0;
     729    25165824 :   cache->count = 0;
     730    25165824 :   cache->idx_start = 0;
     731    25165824 : }
     732             : 
     733             : 
     734             : static unsigned
     735       10144 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
     736             : {
     737       10144 :   struct netio_subscription key;
     738       10144 :   key.tag = tag;
     739       20288 :   struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
     740       10144 :                                                                        table->subscriptions,
     741             :                                                                        table->num_subscriptions,
     742             :                                                                        sizeof(struct netio_subscription),
     743             :                                                                        cmp_subscription);
     744       10144 :   if(ptr == NULL) {
     745             :     return 0;
     746             :   }
     747       10123 :   unsigned start_idx = ptr - table->subscriptions;
     748       10124 :         while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
     749             :                 start_idx--;
     750             :         }
     751             :   unsigned count = 0;
     752       20247 :   for(unsigned int i=start_idx; i<table->num_subscriptions; i++) {
     753       10125 :     if(table->subscriptions[i].tag == tag) {
     754       10124 :       count++;
     755             :     } else {
     756             :       break;
     757             :     }
     758             :   }
     759       10123 :   *start = start_idx;
     760       10123 :   return count;
     761             : }
     762             : 
     763             : static unsigned
     764   606002965 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
     765             : {
     766   606002965 :   if(cache->ts != table->ts) {
     767          42 :     cache->count = lookup_tag(table, tag, &cache->idx_start);
     768          42 :     cache->ts = table->ts;
     769             :   }
     770   606002965 :   *start = cache->idx_start;
     771   606002965 :   return cache->count;
     772             : }
     773             : 
     774             : /**
     775             :  * Publishes a message under a given tag
     776             :  *
     777             :  * @param socket:  The socket to publish on
     778             :  * @param iov:     I/O vector; iov[0] points to the tag under which to publish,
     779             :  *                 the entries after it carry the message data
     780             :  * @param len:     Number of entries in iov (including the tag entry)
     781             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     782             :  *                 resulted in NETIO_STATUS_AGAIN. Calling publish with
     783             :  *                 this flag will only send on connections where the
     784             :  *                 message was previously unpublished.
     785             :  * @param cache:   Optional user-supplied cache for the subscription table lookup.
     786             :  */
     787             : int
     788   606013410 : netio_buffered_publishi(struct netio_publish_socket* socket, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
     789             : {
     790   606013410 :   int ret = NETIO_STATUS_OK;
     791   606013410 :   netio_tag_t tag = *(netio_tag_t*)iov[0].iov_base;
     792   606013410 :   unsigned start_idx;
     793   606013410 :   unsigned num_subscriptions;
     794             : 
     795   606013410 :   if(cache) {
     796   606003308 :     num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
     797             :   } else {
     798       10102 :     num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
     799             :   }
     800   606013155 :   if(num_subscriptions == 0) return NETIO_STATUS_OK_NOSUB;
     801             : 
     802   120494182 :   for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
     803    62617941 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     804    62617941 :     if(subscription->tag == tag) {
     805             : 
     806             :       // skip connections that were already successful if we are in reentry mode
     807    62618081 :       if(flags & NETIO_REENTRY) {
     808      219219 :         if(subscription->again == 0) {
     809         116 :           continue;
     810             :         }
     811             :       }
     812             : 
     813    62617965 :       int result = netio_buffered_sendv(subscription->socket, iov, len);
     814    62617185 :       log_dbg("Sending iov on subscription->socket, result=%d",result);
     815             : 
     816    62617185 :       if(result == NETIO_STATUS_OK) {
     817    62398221 :         subscription->again = 0;
     818      218964 :       } else if(result == NETIO_STATUS_AGAIN) {
     819      218964 :         subscription->again = 1;
     820      218964 :         ret = NETIO_STATUS_AGAIN;
     821             :       }
     822           0 :       else if(result == NETIO_STATUS_TOO_BIG) {
     823           0 :         subscription->again = 0;
     824           0 :         ret = NETIO_STATUS_TOO_BIG;
     825             :       }
     826             :       else {
     827             :         return result; // some error occurred and we return immediately
     828             :       }
     829             :     }
     830             :   }
     831             :   return ret;
     832             : }
     833             : 
     834             : 
     835             : /**
     836             :  * Publishes a message under a given tag
     837             :  *
     838             :  * @param socket:  The socket to publish on
     839             :  * @param tag:     The tag under which to publish
     840             :  * @param data:    Message data
     841             :  * @param len:     Message size
     842             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     843             :  *                 resulted in NETIO_STATUS_AGAIN. Calling publish with
     844             :  *                 this flag will only send on connections where the
     845             :  *                 message was previously unpublished.
     846             :  * @param cache:   Optional user-supplied cache for the subscription table lookup.
     847             :  */
     848             : int
     849       10102 : netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
     850             : {
     851       10102 :   log_trc("netio_buffered_publish (size=%lu)", len);
     852             : 
     853       10102 :   struct iovec iov[2];
     854       10102 :   iov[0].iov_base = &tag;
     855       10102 :   iov[0].iov_len = sizeof(netio_tag_t);
     856       10102 :   iov[1].iov_base = data;
     857       10102 :   iov[1].iov_len = len;
     858             : 
     859       10102 :   return netio_buffered_publishi(socket, iov, 2, flags, cache);
     860             : }
     861             : 
     862             : 
     863             : int
     864   605990356 : netio_buffered_publishv(struct netio_publish_socket* socket, netio_tag_t tag, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
     865             : {
     866   605990356 :   struct iovec iovv[NETIO_MAX_IOV_LEN + 1];
     867             :   //NETIO_MAX_IOV_LEN is not a limitation, as all entries will be copied in one in netio_buffered_sendv
     868   605990356 :   iovv[0].iov_base = &tag;
     869   605990356 :   iovv[0].iov_len = sizeof(netio_tag_t);
     870   605990356 :   size_t size = (len > NETIO_MAX_IOV_LEN) ?  NETIO_MAX_IOV_LEN : len;
     871   605990356 :   memcpy(iovv + 1, iov, size * sizeof(struct iovec));
     872   605990356 :   return netio_buffered_publishi(socket, iovv, (size + 1), flags, cache);
     873             : }
     874             : 
     875             : 
     876             : /**
     877             :  * Flushes buffers on all connections of a given publish socket.
     878             :  *
     879             :  * @param socket The buffered publish socket
     880             :  * @param tag The message tag (currently ignored: every subscription is flushed)
     881             :  * @param cache An optional subscription cache object (currently unused)
     882             :  */
     883             : void
     884    14160353 : netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
     885             : {
     886    15211103 :   for(unsigned i=0; i<socket->subscription_table.num_subscriptions; i++) {
     887     1050750 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     888     1050750 :     netio_buffered_flush(subscription->socket);
     889             :   }
     890    14160353 : }
     891             : 
     892             : 
     893             : /**
     894             :  * Subscribe to a given message tag.
     895             :  *
     896             :  * For a given subscribe socket, `netio_subscribe` can be called multiple times.
     897             :  *
     898             :  * @param socket: The buffered subscribe socket.
     899             :  * @param tag: The subscription tag.
     900             :  */
     901             : int
     902        1242 : netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     903             : {
     904        1242 :     if(socket->state == NONE) {
     905             :       //A new send socket is created; its subsocket_on_connection_established callback
     906             :       //will trigger the actual subscriptions.
     907         187 :       log_dbg("Creating and connecting a new send_socket");
     908         187 :       if (socket->tcp_fi_mode  == NETIO_MODE_LIBFABRIC) {
     909         130 :         netio_init_send_socket(&socket->socket, socket->ctx);
     910         130 :         socket->socket.usr = socket;
     911         130 :         socket->socket.cb_connection_established = subsocket_on_connection_established;
     912         130 :         socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
     913         130 :         socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
     914         130 :         socket->socket.cb_send_completed = subsocket_on_send_completed;
     915         130 :         netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
     916          57 :       } else if (socket->tcp_fi_mode  == NETIO_MODE_TCP) {
     917          57 :          netio_init_send_tcp_socket(&socket->socket, socket->ctx);
     918          57 :          socket->socket.usr = socket;
     919          57 :          socket->socket.cb_connection_established = subsocket_on_connection_established;
     920          57 :          socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
     921          57 :          socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
     922          57 :          socket->socket.cb_send_completed = subsocket_on_send_completed;
     923          57 :          netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
     924             :       }
     925         187 :       socket->state = INITIALIZED;
     926             :     }
     927             : 
     928        4267 :     for(unsigned i=0; i<socket->total_tags; i++) {
     929        3025 :       if(socket->tags_to_subscribe[i] == tag) {
     930             :         return 0;
     931             :       }
     932             :     }
     933             : 
     934             :     //if send socket connected send message
     935             :     //otherwise on_connection_established will do it
     936        1242 :     if (socket->socket.state){
     937         865 :       log_info("Sending subscription message for tag 0x%lx", tag);
     938         865 :       int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
     939         865 :       return ret;
     940             :     } else {
     941         377 :       log_info("Queing subscription message for txg 0x%lx", tag);
     942         377 :       socket->tags_to_subscribe[socket->total_tags] = tag;
     943         377 :       socket->total_tags++;
     944         377 :       return 0;
     945             :     }
     946             : }
     947             : 
     948             : 
     949             : static int
     950           0 : remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     951             : {
     952           0 :   int found = 0;
     953           0 :   for(unsigned int i=0; i<socket->total_tags; ++i){
     954           0 :     if(socket->tags_to_subscribe[i] == tag){
     955           0 :       log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
     956           0 :       for(unsigned int j = i; j < socket->total_tags-1; ++j){
     957           0 :         socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
     958             :       }
     959           0 :       found = 1;
     960           0 :       socket->total_tags--;
     961           0 :       break;
     962             :     }
     963             :   }
     964           0 :   if(found == 0){
     965           0 :     log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
     966             :   }
     967           0 :   return NETIO_STATUS_OK;
     968             : }
     969             : 
     970             : 
     971             : /**
     972             :  * Unsubscribe from a given message tag.
     973             :  *
     974             :  * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
     975             :  *
     976             :  * @param socket: The subscribe socket.
     977             :  * @param tag: The tag to unsubscribe from.
     978             :  */
     979             : int
     980        1163 : netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     981             : {
     982        1163 :   int ret = NETIO_STATUS_OK;
     983        1163 :   if(socket->state == INITIALIZED) {
     984        1163 :     log_dbg("Subscribe socket initialised, can proceed with usubscription");
     985        1163 :     if (socket->socket.state) {
     986        1163 :       ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
     987        1163 :       log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
     988             :     } else {
     989           0 :       ret = remove_tag_to_subscribe(socket, tag);
     990             :     }
     991             :   } else {
     992        1163 :     log_dbg("The connection has been already closed.");
     993             :   }
     994        1163 :   return ret;
     995             : }
     996             : 
     997             : /**
     998             :  * Returns the minimum number of netio pages across buffered send sockets
     999             :  * associated to a publish socket
    1000             :  *
    1001             :  * @param socket: The publish socket.
    1002             :  */
    1003             : unsigned
    1004          83 : netio_pubsocket_get_minimum_pages(struct netio_publish_socket* socket)
    1005             : {
    1006          83 :   if (!socket) {
    1007             :     return 0;
    1008             :   }
    1009             : 
    1010          83 :   size_t pages = socket->attr.num_pages;
    1011          83 :   struct netio_socket_list* itr = socket->subscription_table.socket_list;
    1012             : 
    1013         122 :   while(itr != NULL){
    1014          39 :     struct netio_buffered_send_socket* buf_send_socket = (struct netio_buffered_send_socket*)itr->socket;
    1015          39 :     uint64_t socket_pages = buf_send_socket->buffers.available_buffers;
    1016          39 :     if(socket_pages < pages){
    1017             :       pages = socket_pages;
    1018             :     }
    1019          39 :     itr = itr->next;
    1020             :   }
    1021          83 :   return pages;
    1022             : }
    1023             : 

Generated by: LCOV version 1.0