LCOV - code coverage report
Current view: top level - netio-next/src - pubsub.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 331 376 88.0 %
Date: 2025-06-10 03:23:28 Functions: 30 31 96.8 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <string.h>
       3             : #include <assert.h>
       4             : #include <netdb.h>
       5             : #include "log.h"
       6             : #include "netio/netio.h"
       7             : 
       8             : #if defined DEBUG || defined DEBUG_PUB
       9             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      10             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      11             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #else
      13             : #define log_dbg(...)
      14             : #define log_trc(...)
      15             : #endif
      16             : 
      17             : 
      18             : char*
      19         686 : netio_domain_name_lookup(const char* domain_name)
      20             : {
      21         686 :   if(!domain_name) return NULL;
      22         686 :   struct sockaddr_in sock_address;
      23         686 :   char* ip_address = (char*)malloc(sizeof(char) * 17);
      24         686 :   int is_ip_address = inet_pton(AF_INET, domain_name, &(sock_address.sin_addr));
      25         686 :   if(!is_ip_address)
      26             :   {
      27           8 :     struct hostent* host = gethostbyname(domain_name);
      28           8 :     if(host)
      29             :     {
      30           4 :       strcpy(ip_address, inet_ntoa(*(struct in_addr *)host->h_addr));
      31             :     }
      32             :     else
      33             :     {
      34           4 :       char* _domain_name = malloc(sizeof(char) * (strlen(domain_name) + 1));
      35           4 :       strcpy(_domain_name, domain_name);
      36           4 :       log_error("The host cannot be resolved. Domain name set to %s", _domain_name);
      37           4 :       free(ip_address);
      38           4 :       return _domain_name;
      39             :     }
      40             :   }
      41             :   else
      42             :   {
      43         678 :     strcpy(ip_address, domain_name);
      44             :   }
      45             : 
      46             :   return ip_address;
      47             : }
      48             : 
      49             : static int
      50       10144 : cmp_subscription(const void* a, const void *b)
      51             : {
      52       10144 :   struct netio_subscription* suba = (struct netio_subscription*)a;
      53       10144 :   struct netio_subscription* subb = (struct netio_subscription*)b;
      54             : 
      55       10144 :         if(suba->tag == subb->tag) {
      56             :                 return 0;
      57             :         }
      58           1 :         return suba->tag > subb->tag ? 1 : -1;
      59             : }
      60             : 
      61             : static int
      62          44 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
      63             : {
      64          44 :     if(table->num_subscriptions == table->size) {
      65           0 :       log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
      66           0 :       return 1;
      67             :     }
      68             : 
      69             :     // TODO need to keep the list sorted
      70          44 :     table->subscriptions[table->num_subscriptions].tag = tag;
      71          44 :     table->subscriptions[table->num_subscriptions].socket = socket;
      72          44 :     table->subscriptions[table->num_subscriptions].again = 0;
      73             : 
      74             :     log_dbg("Adding connection in subscription table, tag=%lu, socket=%p",
      75             :       table->subscriptions[table->num_subscriptions].tag,
      76             :       table->subscriptions[table->num_subscriptions].socket
      77          44 :     );
      78             : 
      79          44 :     table->num_subscriptions++;
      80          44 :     table->ts++;
      81          44 :     log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
      82          44 :     qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
      83          44 :     return 0;
      84             : }
      85             : 
      86             : 
      87             : /**
      88             :  * @brief Handle an unsubscription or a client disconnection.
      89             :  *
      90             :  * @param netio_subscription_table: the table of active subscriptions
      91             :  * @param socket:                   the unbuffered send socket used to send the data to the subscriber
      92             :  * @param tag:                      the tag for which an unsubscribe request has been received
      93             :  * @param closed_connection:        a flag to enable the removal of all the subscriptions associated to
      94             :  *                                  the send socket in response to a closed connection.
      95             :  */
      96             : static void
      97          40 : table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
      98             : {
      99          40 :   log_dbg("Total subscriptions: %lu", table->num_subscriptions);
     100          40 :   unsigned i=0;
     101          40 :   unsigned remaining_subscriptions_of_socket=0;
     102          83 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     103          43 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     104             :   }
     105             : 
     106             : 
     107          83 :   while(i<table->num_subscriptions) {
     108          43 :     if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
     109             :       log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u",
     110             :                 table->subscriptions[i].tag,
     111             :                 table->subscriptions[i].socket,
     112             :                 table->num_subscriptions-1,
     113          40 :                 i);
     114          40 :       table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
     115          40 :       table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
     116          40 :       table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
     117          40 :       table->num_subscriptions--;
     118          40 :       remaining_subscriptions_of_socket--;
     119          40 :       table->ts++;
     120             :     }
     121             :     else{
     122           3 :       i++;
     123          83 :       log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
     124             :     }
     125             :   }
     126          40 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     127          40 :   log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
     128          40 :   log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
     129          40 :   if(closed_connection==0 && remaining_subscriptions_of_socket==0){
     130          33 :     log_info("Disconnecting endpoint with zero subscriptions");
     131          33 :     netio_disconnect(&socket->send_socket);
     132             :   }
     133          40 : }
     134             : 
     135             : 
     136             : static int
     137          39 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
     138             : {
     139          39 :   log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
     140          39 :   unsigned remaining_subscriptions_of_socket=0;
     141          45 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     142           6 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     143             :   }
     144          39 :   log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
     145          39 :   return remaining_subscriptions_of_socket;
     146             : }
     147             : 
     148             : static void
     149      214519 : on_buffer_available(void* ptr)
     150             : {
     151      214519 :   log_trc("a buffer became available, calling callback");
     152      214519 :   struct netio_publish_socket* socket = (struct netio_publish_socket*)ptr;
     153      214519 :   if(socket->cb_buffer_available) {
     154      214519 :     socket->cb_buffer_available(socket);
     155             :   }
     156      214519 : }
     157             : 
     158             : 
     159             : static void
     160          43 : pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
     161             : {
     162          43 :         log_dbg("publish socket established connection to remote, can publish now");
     163          43 :         struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     164             :   //add deferred subscriptions to the table
     165          43 :   struct deferred_subscription* sub = socket->send_socket.deferred_subs;
     166          87 :   while(sub){
     167          44 :     int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
     168          44 :     if(pubsocket->cb_subscribe && ret == 0){
     169          44 :       pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
     170             :     }
     171          44 :     pop_subscription(&sub);
     172             :   }
     173             :   //user callback
     174          43 :   if(pubsocket->cb_connection_established) {
     175           8 :     pubsocket->cb_connection_established(pubsocket);
     176             :   }
     177          43 : }
     178             : 
     179             : 
     180             : static void
     181          39 : pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
     182             : {
     183          39 :   log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
     184          39 :   struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     185          39 :   if(pubsocket->cb_connection_closed) {
     186           6 :     pubsocket->cb_connection_closed(pubsocket);
     187             :   }
     188             :   //Only if the connection was closed without unsubscribing first.
     189          78 :   if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
     190           6 :     log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
     191           6 :     uint8_t connection_closed = 1;
     192           6 :     table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
     193             :   }
     194          39 : }
     195             : 
     196             : 
     197             : static struct netio_buffered_send_socket*
     198          44 : socket_list_add_or_lookup(struct netio_publish_socket* pubsocket, struct netio_socket_list** list, void* addr, size_t addrlen, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
     199             : {
     200          44 :   if(addrlen == 0) {
     201             :     return NULL;
     202             :   }
     203             : 
     204          44 :   struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen);
     205             : 
     206          44 :   if ( entry == NULL ) {
     207          43 :     entry = add_socket_with_address(list, BSEND, addr, addrlen);
     208          43 :     struct netio_buffered_send_socket* bufsocket = entry->socket;
     209          43 :     netio_buffered_send_socket_init(bufsocket, ctx, attr);
     210          43 :     bufsocket->pub_socket = pubsocket;
     211          43 :     bufsocket->cb_connection_established = pubsocket_on_connection_established;
     212          43 :     bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
     213             : 
     214          43 :     netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
     215          43 :     bufsocket->signal_buffer_available.data = pubsocket;
     216          43 :     bufsocket->signal_buffer_available.cb = on_buffer_available;
     217             :   }
     218          44 :   struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
     219          44 :   return ss;
     220             : }
     221             : 
     222             : 
     223             : static void
     224          44 : subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, netio_tag_t tag)
     225             : {
     226          44 :   struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, pubsocket->ctx, &pubsocket->attr);
     227             : 
     228          44 :   if(socket->send_socket.recv_socket == NULL){
     229          43 :     socket->send_socket.recv_socket = recv_socket;
     230             :   }
     231             : 
     232          44 :   if (socket->send_socket.state == CONNECTED){
     233           0 :     table_add_subscription(&pubsocket->subscription_table, tag, socket);
     234           0 :     if(pubsocket->cb_subscribe) {
     235           0 :       pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
     236             :     }
     237             :   } else {
     238          44 :     push_back_subscription(&socket->send_socket.deferred_subs, tag);
     239             :   }
     240          44 : }
     241             : 
     242             : 
     243             : static void
     244          34 : unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, netio_tag_t tag)
     245             : {
     246          34 :   struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen);
     247          34 :   if(list == NULL){return;}
     248             : 
     249          34 :   struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)list->socket;
     250          34 :   uint8_t connection_closed = 0;
     251          34 :   table_remove_subscription(&pubsocket->subscription_table, socket, tag, connection_closed);
     252          34 :   pubsocket->subscription_table.ts++;
     253             : 
     254          34 :   if(pubsocket->cb_unsubscribe) {
     255           2 :     pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
     256             :   }
     257             : 
     258             : }
     259             : 
     260             : 
     261             : static void
     262          43 : lsocket_on_connection_established(struct netio_recv_socket* socket)
     263             : {
     264          43 :   log_dbg("Buffered listen socket: on connection establsihed");
     265          43 : }
     266             : 
     267             : 
     268             : static void
     269          78 : parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
     270             : {
     271          78 :     if (msg->action){
     272          44 :       log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     273          44 :       subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->tag);
     274             :     }
     275             :     else{
     276          34 :       log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     277          34 :       unsubscribe(socket, msg->addr, msg->addrlen, msg->tag);
     278             :     }
     279          78 : }
     280             : 
     281             : 
     282             : static void
     283          78 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     284             : {
     285          78 :     log_dbg("message received by recv socket %p", socket);
     286          78 :     struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;
     287          78 :     if(len != sizeof(struct netio_subscription_message)) {
     288           0 :       log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
     289           0 :       netio_post_recv(socket, buf);
     290           0 :       return;
     291             :     }
     292          78 :     parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
     293          78 :     netio_post_recv(socket, buf);
     294             : }
     295             : 
     296             : 
     297             : static int
     298        2534 : send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
     299             : {
     300        2534 :     if( action == NETIO_SUBSCRIBE ){
     301        1296 :       log_info("Sending subscription for tag 0x%lx", tag);
     302        1238 :     } else if ( action == NETIO_UNSUBSCRIBE ){
     303        1238 :       log_info("Sending unsubscription for tag 0x%lx", tag);
     304             :     } else {
     305           0 :       log_error("Invalid subscription action %d", action);
     306           0 :       return 0;
     307             :     }
     308        2534 :     socket->msg.tag = tag;
     309        2534 :     socket->msg.action = action;
     310        2534 :     socket->buf.data = &socket->msg;
     311        2534 :     socket->buf.size = sizeof(struct netio_subscription_message);
     312        2534 :     int ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
     313        2534 :     return ret;
     314             : }
     315             : 
     316             : 
     317             : static void
     318         227 : subsocket_on_connection_established(struct netio_send_socket* socket)
     319             : {
     320         227 :     log_dbg("subsocket connection established");
     321         227 :     int ret = 0;
     322         227 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     323             : 
     324         227 :     if(subscribe_socket->total_tags == 0){
     325           0 :       log_info("Closing send connection again because there is no tag to subscribe to.");
     326           0 :       netio_disconnect(socket);
     327             :     }
     328             : 
     329         227 :     subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
     330         227 :     if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
     331         227 :                         subscribe_socket->msg.addr,
     332             :                         &subscribe_socket->msg.addrlen)) != 0) {
     333           0 :       log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
     334           0 :       exit(1);
     335             :     }
     336             : 
     337         227 :     subscribe_socket->buf.data = &subscribe_socket->msg;
     338         227 :     subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
     339         227 :     netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
     340             : 
     341             :     // send tags one by one
     342         809 :     while(subscribe_socket->total_tags > 0){
     343         582 :       size_t idx = subscribe_socket->total_tags - 1;
     344         582 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     345         582 :       log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
     346         582 :       ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     347         582 :       if (ret == NETIO_STATUS_OK){
     348         582 :         subscribe_socket->total_tags--;
     349             :       } else {
     350           0 :         log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
     351           0 :         break;
     352             :       }
     353             :     }
     354         227 : }
     355             : 
     356             : static void
     357         136 : subsocket_on_send_connection_closed(struct netio_send_socket* socket)
     358             : {
     359         136 :     log_dbg("subsocket_on_send_connection_closed callback");
     360         136 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     361         136 :     subscribe_socket->state = NONE;
     362         136 :     handle_send_socket_shutdown(socket);
     363         136 : }
     364             : 
     365             : 
     366             : static void
     367          59 : subsocket_on_error_connection_refused(struct netio_send_socket* ssocket) {
     368          59 :     log_dbg("subsocket connection refused");
     369          59 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)ssocket->usr;
     370          59 :     subscribe_socket->state = NONE;
     371             : 
     372          59 :     handle_listen_socket_shutdown(&subscribe_socket->recv_socket.listen_socket);
     373          59 :     if(subscribe_socket->cb_error_connection_refused) {
     374          59 :       subscribe_socket->cb_error_connection_refused(subscribe_socket);
     375             :     }
     376          59 : }
     377             : 
     378             : 
     379             : static void
     380        2504 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     381             : {
     382        2504 :     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     383             :     //check for remaining tags from on_connection_established
     384        2504 :     while(subscribe_socket->total_tags > 0){
     385           0 :       size_t idx = subscribe_socket->total_tags - 1;
     386           0 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     387           0 :       int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     388           0 :       if (ret == NETIO_STATUS_OK){
     389           0 :         subscribe_socket->total_tags--;
     390             :       } else {
     391           0 :         log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
     392           0 :         break;
     393             :       }
     394             :     }
     395        2504 : }
     396             : 
     397             : 
     398             : static void
     399         229 : subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
     400             : {
     401         229 :                 log_dbg("connection to subscribe socket has been established");
     402         229 :                 struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     403         229 :                 if(socket->cb_connection_established) {
     404         195 :       socket->cb_connection_established(socket);
     405             :     }
     406         229 : }
     407             : 
     408             : 
     409             : static void
     410         168 : subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
     411             : {
     412         168 :     log_info("connection to subscribe socket has been closed");
     413         168 :     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     414         168 :     if(socket->cb_connection_closed) {
     415         168 :       socket->cb_connection_closed(socket);
     416             :     }
     417         168 : }
     418             : 
     419             : 
     420             : static void
     421  3406697491 : subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
     422             : {
     423  3406697491 :     log_trc("buffer received");
     424  3406697491 :     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     425             : 
     426  3406697491 :     if(len <= sizeof(netio_tag_t)) {
     427           0 :       log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
     428           0 :       return;
     429             :     }
     430             : 
     431  3406697491 :     netio_tag_t tag = *((netio_tag_t*)data);
     432  3406697491 :     if(socket->cb_msg_received) {
     433  3406697491 :       socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
     434             :     }
     435             : }
     436             : 
     437             : 
     438             : void
     439         149 : netio_subscription_table_init(struct netio_subscription_table* table)
     440             : {
     441         149 :     table->socket_list = NULL;
     442         149 :     table->subscriptions = (struct netio_subscription*)malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
     443         149 :     table->num_subscriptions = 0;
     444         149 :     table->size = NETIO_INITIAL_SUBSCRIPTIONS;
     445         149 :     table->ts = 0;
     446         149 : }
     447             : 
     448             : 
     449             : /**
     450             :  * Initializes a buffered publish socket.
     451             :  *
     452             :  * @param socket: The buffered publish socket to initialize
     453             :  * @param ctx: The NetIO context in which to initialize the socket
     454             :  * @param hostname: Hostname or IP address to bind to
     455             :  * @param port: Port to bind to
     456             :  * @param attr: Buffered connection settings to be used for the underlying connections
     457             :  *
     458             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     459             :  */
     460             : void
     461         114 : netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
     462             : {
     463         114 :     memset(socket, 0, sizeof(*socket));
     464         114 :     socket->ctx = ctx;
     465         114 :     netio_subscription_table_init(&socket->subscription_table);
     466         114 :     netio_init_listen_socket(&socket->lsocket, ctx, NULL);
     467         114 :     if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     468           0 :       log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     469           0 :       attr->num_pages = NETIO_DOMAIN_MAX_MR;
     470             :     }
     471         114 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     472         114 :     socket->lsocket.usr = socket;
     473         114 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     474         114 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     475         114 :     socket->lsocket.recv_sub_msg = 1;
     476         114 :     char* _hostname = netio_domain_name_lookup(hostname);
     477         114 :     netio_listen(&socket->lsocket, (const char*)_hostname, port);
     478         114 :     free(_hostname);
     479         114 : }
     480             : 
     481             : 
     482             : /**
     483             :  * Initializes a buffered subscribe socket.
     484             :  *
     485             :  * @param socket: The buffered subscribe socket to initialize
     486             :  * @param ctx: The NetIO context in which to initialize the socket
     487             :  * @param attr: Buffered connection settings to be used for the underlying connections
     488             :  * @param hostname: Hostname or IP address of the local interface to bind to
     489             :  * @param remote_host: Hostname or IP of the remote publish socket
     490             :  * @param remote_port: Port of the remote publish socket
     491             :  *
     492             :  * @see `netio_buffered_send_socket_init` for a description of the connection parameters
     493             :  */
     494             : void
     495         286 : netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
     496             :                             struct netio_context* ctx,
     497             :                             struct netio_buffered_socket_attr* attr,
     498             :                             const char* hostname,
     499             :                             const char* remote_host,
     500             :                             unsigned remote_port)
     501             : {
     502         286 :     memset(socket, 0, sizeof(*socket));
     503         286 :     socket->ctx = ctx;
     504         286 :     socket->state = NONE;
     505         286 :     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
     506         286 :     netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
     507         286 :     socket->recv_socket.listen_socket.usr = socket;
     508         286 :     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     509         286 :     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     510         286 :     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     511             :     //set cb_buf_received meant only for pileup measurement to NULL
     512         286 :     socket->cb_buf_received = NULL;
     513         286 :     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     514         286 :     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     515         286 :     netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);
     516             : 
     517         286 :     socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
     518         286 :     socket->remote_port = remote_port;
     519         286 :     socket->total_tags = 0;
     520         286 :     free(lookedup_remote_hostname);
     521         286 :     free(lookedup_hostname);
     522         286 : }
     523             : 
     524             : 
     525             : /**
     526             :  * Initialize a netio_subscription_cache object.
     527             :  *
     528             :  * @param cache: The cache to be initialized
     529             :  */
     530             : void
     531        1081 : netio_subscription_cache_init(struct netio_subscription_cache* cache)
     532             : {
     533        1081 :   cache->ts = 0;
     534        1081 :   cache->count = 0;
     535        1081 :   cache->idx_start = 0;
     536        1081 : }
     537             : 
     538             : 
     539             : static unsigned
     540       10166 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
     541             : {
     542       10166 :   struct netio_subscription key;
     543       10166 :   key.tag = tag;
     544       20332 :   struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
     545       10166 :                                                                        table->subscriptions,
     546             :                                                                        table->num_subscriptions,
     547             :                                                                        sizeof(struct netio_subscription),
     548             :                                                                        cmp_subscription);
     549       10166 :   if(ptr == NULL) {
     550             :     return 0;
     551             :   }
     552       10141 :   unsigned start_idx = ptr - table->subscriptions;
     553       10143 :         while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
     554             :                 start_idx--;
     555             :         }
     556             :   unsigned count = 0;
     557       20284 :   for(unsigned int i=start_idx; i<table->num_subscriptions; i++) {
     558       10143 :     if(table->subscriptions[i].tag == tag) {
     559       10143 :       count++;
     560             :     } else {
     561             :       break;
     562             :     }
     563             :   }
     564       10141 :   *start = start_idx;
     565       10141 :   return count;
     566             : }
     567             : 
     568             : static unsigned
     569   838293491 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
     570             : {
     571   838293491 :   if(cache->ts != table->ts) {
     572          62 :     cache->count = lookup_tag(table, tag, &cache->idx_start);
     573          62 :     cache->ts = table->ts;
     574             :   }
     575   838293491 :   *start = cache->idx_start;
     576   838293491 :   return cache->count;
     577             : }
     578             : 
     579             : /**
     580             :  * Publishes a message under a given tag
     581             :  *
     582             :  * @param socket:  The socket to publish on
     583             :  * @param tag:     The tag under which to publish
     584             :  * @param data:    Message data
     585             :  * @param len:     Message size
     586             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     587             :  *                 resulted in NETIO_STATUS_AGAIN. Calling publish with
     588             :  *                 this flag will only send on connections where the
     589             :  *                 message was previously unpublished.
     590             :  * @param cache:   Optional user-supplied cache for the subscription table lookup.
     591             :  */
     592             : int
     593   838303595 : netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
     594             : {
     595   838303595 :   int ret = NETIO_STATUS_OK;
     596             : 
     597   838303595 :   log_trc("netio_buffered_publish (size=%lu)", len);
     598             : 
     599   838303595 :   struct iovec iov[2];
     600   838303595 :   iov[0].iov_base = &tag;
     601   838303595 :   iov[0].iov_len = sizeof(netio_tag_t);
     602   838303595 :   iov[1].iov_base = data;
     603   838303595 :   iov[1].iov_len = len;
     604             : 
     605   838303595 :   unsigned start_idx;
     606   838303595 :   unsigned num_subscriptions;
     607             : 
     608   838303595 :   if(cache) {
     609   838293553 :     num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
     610             :   } else {
     611       10104 :     num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
     612             :   }
     613   838303595 :   if(num_subscriptions == 0) return NETIO_STATUS_OK_NOSUB;
     614             : 
     615   165641420 :   for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
     616    87573513 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     617    87573513 :     if(subscription->tag == tag) {
     618             : 
     619             :       // skip connections that were already successful if we are in reentry mode
     620    87573513 :       if(flags & NETIO_REENTRY) {
     621      431703 :         if(subscription->again == 0) {
     622        3042 :           continue;
     623             :         }
     624             :       }
     625             : 
     626    87570471 :       int result = netio_buffered_sendv(subscription->socket, iov, 2);
     627             : 
     628    87570471 :       if(result == NETIO_STATUS_OK) {
     629    87141790 :         subscription->again = 0;
     630      428681 :       } else if(result == NETIO_STATUS_AGAIN) {
     631      428681 :         subscription->again = 1;
     632      428681 :         ret = NETIO_STATUS_AGAIN;
     633             :       }
     634           0 :       else if(result == NETIO_STATUS_TOO_BIG) {
     635           0 :         subscription->again = 0;
     636           0 :         ret = NETIO_STATUS_TOO_BIG;
     637             :       }
     638             :       else {
     639           0 :         return result; // some error occured and we return immediately
     640             :       }
     641             :     }
     642             :   }
     643             :   return ret;
     644             : }
     645             : 
     646             : 
     647             : /**
     648             :  * Flushes buffers on all connections of a given publish socket for a certain tag.
     649             :  *
     650             :  * @param socket The buffered publish socket
     651             :  * @param tag The message tag
     652             :  * @param cache An optional subscription cache object
     653             :  */
     654             : void
     655     6639790 : netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
     656             : {
     657     7440435 :   for(unsigned i=0; i<socket->subscription_table.num_subscriptions; i++) {
     658      800645 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     659      800645 :     netio_buffered_flush(subscription->socket);
     660             :   }
     661     6639790 : }
     662             : 
     663             : 
     664             : /**
     665             :  * Subscribe to a given message tag.
     666             :  *
     667             :  * For a given subscribe socket, `netio_subscribe` can be called multiple times.
     668             :  *
     669             :  * @param socket: The buffered subscribe socket.
     670             :  * @param tag: The subscription tag.
     671             :  */
     672             : int
     673        1355 : netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     674             : {
     675        1355 :     if(socket->state == NONE) {
     676         286 :       log_dbg("Creating and connecting a new send_socket");
     677         286 :       netio_init_send_socket(&socket->socket, socket->ctx);
     678         286 :       socket->socket.usr = socket;
     679         286 :       socket->socket.cb_connection_established = subsocket_on_connection_established;
     680         286 :       socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
     681         286 :       socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
     682         286 :       socket->socket.cb_send_completed = subsocket_on_send_completed;
     683         286 :       netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
     684         286 :       socket->state = INITIALIZED;
     685             :     }
     686             : 
     687        7290 :     for(unsigned i=0; i<socket->total_tags; i++) {
     688        5935 :       if(socket->tags_to_subscribe[i] == tag) {
     689             :         return 0;
     690             :       }
     691             :     }
     692             : 
     693             :     //if send socket connected send message
     694             :     //otherwise on_connection_established will do it
     695        1355 :     if (socket->socket.state){
     696         714 :       log_info("Sending subscription message for tag 0x%lx", tag);
     697         714 :       int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
     698         714 :       return ret;
     699             :     } else {
     700         641 :       log_info("Queing subscription message for txg 0x%lx", tag);
     701         641 :       socket->tags_to_subscribe[socket->total_tags] = tag;
     702         641 :       socket->total_tags++;
     703         641 :       return 0;
     704             :     }
     705             : }
     706             : 
     707             : 
     708             : static int
     709           0 : remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     710             : {
     711           0 :   int found = 0;
     712           0 :   for(unsigned int i=0; i<socket->total_tags; ++i){
     713           0 :     if(socket->tags_to_subscribe[i] == tag){
     714           0 :       log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
     715           0 :       for(unsigned int j = i; j < socket->total_tags-1; ++j){
     716           0 :         socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
     717             :       }
     718           0 :       found = 1;
     719           0 :       socket->total_tags--;
     720           0 :       break;
     721             :     }
     722             :   }
     723           0 :   if(found == 0){
     724           0 :     log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
     725             :   }
     726           0 :   return NETIO_STATUS_OK;
     727             : }
     728             : 
     729             : 
     730             : /**
     731             :  * Unsubscribe from a given message tag.
     732             :  *
     733             :  * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
     734             :  *
     735             :  * @param socket: The subscribe socket.
     736             :  * @param tag: The tag to unsubscribe from.
     737             :  */
     738             : int
     739        1238 : netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
     740             : {
     741        1238 :   int ret = NETIO_STATUS_OK;
     742        1238 :   if(socket->state == INITIALIZED) {
     743        1238 :     log_dbg("Subscribe socket initialised, can proceed with usubscription");
     744        1238 :     if (socket->socket.state) {
     745        1238 :       ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
     746        1238 :       log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
     747             :     } else {
     748           0 :       ret = remove_tag_to_subscribe(socket, tag);
     749             :     }
     750             :   } else {
     751        1238 :     log_dbg("The connection has been already closed.");
     752             :   }
     753        1238 :   return ret;
     754             : }

Generated by: LCOV version 1.0