LCOV - code coverage report
Current view: top level - netio-next/src - unbufpubsub.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 447 559 80.0 %
Date: 2025-08-12 04:15:35 Functions: 39 42 92.9 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <string.h>
       3             : #include <assert.h>
       4             : #include "log.h"
       5             : #include "netio/netio.h"
       6             : #include "netio/netio_tcp.h"
       7             : 
       8             : #include <sys/socket.h>
       9             : #include <netdb.h>
      10             : 
      11             : 
      12             : #define PUBLISH_SOCKET_MAX_COMPLETIONS (512)
      13             : 
      14             : #if defined DEBUG || defined DEBUG_UPUB
      15             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      16             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      17             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      18             : #else
      19             : #define log_dbg(...)
      20             : #define log_trc(...)
      21             : #endif
      22             : 
      23             : static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start);
      24             : 
      25             : static int
      26      769996 : cmp_subscription(const void* a, const void *b)
      27             : {
      28      769996 :   struct netio_subscription* suba = (struct netio_subscription*)a;
      29      769996 :   struct netio_subscription* subb = (struct netio_subscription*)b;
      30             : 
      31      769996 :   if(suba->tag == subb->tag) {
      32             :     return 0;
      33             :   }
      34          11 :   return suba->tag > subb->tag ? 1 : -1;
      35             : }
      36             : 
      37             : 
      38             : static int
      39          22 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_send_socket* socket)
      40             : {
      41          22 :   if(table->num_subscriptions == table->size) {
      42           0 :     log_error("Maximum number of subscriptions. New subscription for 0x%lx dropped.", tag);
      43           0 :     return 1;
      44             :   }
      45             : 
      46             :   //Check if the subscription is already in the list
      47          22 :   unsigned start;
      48          22 :   unsigned count = lookup_tag(table, tag, &start);
      49          25 :   for(unsigned i=0; i<count; i++) {
      50           3 :     if(table->subscriptions[start+i].socket == socket) {
      51             :       return 0;
      52             :     }
      53             :   }
      54             : 
      55          22 :   table->subscriptions[table->num_subscriptions].tag = tag;
      56          22 :   table->subscriptions[table->num_subscriptions].socket = socket;
      57          22 :   table->subscriptions[table->num_subscriptions].again = 0;
      58          22 :   table->num_subscriptions++;
      59          22 :   table->ts++;
      60          22 :   log_info("New entry in subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
      61          22 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
      62          22 :   return 0;
      63             : }
      64             : 
      65             : 
      66             : /**
      67             :  * @brief Handle an unsubscription or a client disconnection.
      68             :  *
      69             :  * @param pubsocket: netio_unbuffered_publish_socket to which all send sockets are associated.
      70             :  * @param socket: the netio_send_socket not needed anymore
      71             :  * @param tag:                      the tag for which an unsubscribe request has been received
      72             :  * @param closed_connection:        a flag to enable the removal of all the subscriptions associated to
      73             :  *                                  the send socket in response to a closed connection.
      74             :  */
      75             : static void
      76          16 : table_remove_subscription(struct netio_unbuffered_publish_socket* pubsocket, struct netio_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
      77             : {
      78          16 :   struct netio_subscription_table* table = &pubsocket->subscription_table;
      79          16 :   log_dbg("Total subscriptions: %lu", table->num_subscriptions);
      80             : 
      81          16 :   unsigned i=0;
      82          16 :   unsigned remaining_subscriptions_of_socket=0;
      83          34 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
      84          18 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
      85             :   }
      86             : 
      87          34 :   while(i<table->num_subscriptions) {
      88          18 :     log_dbg("Removing subscription tag  %lx table socket %p socket %p", table->subscriptions[i].tag, table->subscriptions[i].socket, socket);
      89          18 :     if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
      90             :       //increment the completion semaphore counter as if messages were sent to the disconnected client
      91             :       struct netio_completion_stack* cs = &pubsocket->completion_stack;
      92        8208 :       for(size_t j=0; j < cs->num_objects; ++j) {
      93        8192 :         if(cs->objects[j].header.tag == table->subscriptions[i].tag && cs->objects[j].key != 0xFFFFFFFFFFFFFFFF){
      94        2022 :           netio_semaphore_increment(&cs->objects[j].sem, 1);
      95             :         }
      96             :       }
      97             : 
      98          16 :       log_dbg("Available completion objects %lu / %lu",  cs->available_objects, cs->num_objects);
      99             :       log_dbg("Removing connection in subscription table, tag=%lu, socket=%p index %lu becomes %u",
     100             :                 table->subscriptions[i].tag,
     101             :                 table->subscriptions[i].socket,
     102             :                 table->num_subscriptions-1,
     103          16 :                 i);
     104          16 :       table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
     105          16 :       table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
     106          16 :       table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
     107          16 :       table->num_subscriptions--;
     108          16 :       remaining_subscriptions_of_socket--;
     109          16 :       table->ts++;
     110             :     }
     111             :     else{
     112           2 :       i++;
     113          34 :       log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
     114             :     }
     115             :   }
     116             : 
     117          16 :   log_dbg("At the end of table_remove_subscription available completion objects %lu / %lu",  pubsocket->completion_stack.available_objects, pubsocket->completion_stack.num_objects);
     118          16 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     119          16 :   log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
     120             : 
     121          16 :   log_info("Removing entry from unbuffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
     122             : 
     123          16 :   if(remaining_subscriptions_of_socket==0){
     124          16 :     log_info("No subscriptions remaining");
     125          16 :     if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     126             :       // netio_close_socket(&socket->ctx->evloop,socket,USEND); // TODO prevents re-subscription
     127          16 :     } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC && closed_connection==0){
     128          16 :       netio_disconnect(socket);
     129             :     }
     130             :   }
     131          16 : }
     132             : 
     133             : static int
     134          16 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_send_socket* socket)
     135             : {
     136          16 :   log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
     137          16 :   unsigned remaining_subscriptions_of_socket=0;
     138          16 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     139           0 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     140             :   }
     141          16 :   log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
     142          16 :   return remaining_subscriptions_of_socket;
     143             : }
     144             : 
     145             : static void
     146    10805972 : release_completion_object(struct netio_completion_object* completion)
     147             : {
     148    10805972 :   log_trc("releasing completion object");
     149    10805972 :   if(netio_completion_stack_push(&completion->socket->completion_stack, completion)) {
     150           0 :     log_error("Could not push completion object since stack was full. This should not happen.");
     151             :   }
     152    10805972 : }
     153             : 
     154             : static void
     155    10805972 : on_completion_trigger(void* c)
     156             : {
     157    10805972 :   struct netio_completion_object* completion = (struct netio_completion_object*)c;
     158    10805972 :   log_trc("calling cb_msg_published");
     159    10805972 :   if(completion->socket->cb_msg_published) {
     160    10805971 :     completion->socket->cb_msg_published(completion->socket, completion->key);
     161             :   }
     162    10805972 :   release_completion_object(completion);
     163    10805972 : }
     164             : 
     165           0 : void print_completion_objects(struct netio_unbuffered_publish_socket* socket){
     166           0 :   printf("Number of available completion objects: %zu \n", socket->completion_stack.available_objects);
     167           0 :   printf("===============================================\n");
     168           0 :   printf("CO# \t KEY \t \t TAG \n");
     169           0 :   printf("-----------------------------------------------\n");
     170           0 :   for(unsigned int i=0; i < socket->completion_stack.num_objects; ++i){
     171           0 :     uint32_t tag = (socket->completion_stack.key_array[i] >> 32);
     172           0 :     printf("%u \t 0x%lx \t %x \n", i, socket->completion_stack.key_array[i], tag);
     173             :   }
     174           0 :   printf("===============================================\n");
     175           0 :   printf("Subscription table: there are %zu elinks subscribed\n", socket->subscription_table.num_subscriptions);
     176           0 :   printf("FID \t \t \t SOCKET\n");
     177           0 :   printf("-----------------------------------------------\n");
     178           0 :   for (unsigned int i=0; i < socket->subscription_table.num_subscriptions; ++i){
     179           0 :     printf("0x%lx \t %p \n", socket->subscription_table.subscriptions[i].tag, socket->subscription_table.subscriptions[i].socket);
     180             :   }
     181           0 :   socket->completion_stack.printed = 1;
     182           0 : }
     183             : 
     184             : 
     185             : 
     186             : static struct netio_completion_object*
     187    18137380 : fetch_completion_object(struct netio_unbuffered_publish_socket* socket)
     188             : {
     189    18137380 :   struct netio_completion_object* completion;
     190    18137380 :   if(netio_completion_stack_pop(&socket->completion_stack, &completion)) {
     191             : #if defined DEBUG || defined DEBUG_UPUB
     192             :     if (socket->completion_stack.printed == 0){
     193             :       print_completion_objects(socket);
     194             :     }
     195             : #endif
     196             :     return NULL;
     197             :   }
     198             : 
     199    10805972 :   netio_semaphore_init(&completion->sem, 0);
     200    10805972 :   completion->sem.data = completion;
     201    10805972 :   completion->sem.cb = on_completion_trigger;
     202    10805972 :   completion->socket = socket;
     203    10805972 :   return completion;
     204             : }
     205             : 
     206             : static void
     207    11304316 : increment_completion_object(struct netio_unbuffered_publish_socket* pubsocket, uint64_t key)
     208             : {
     209    11304316 :   log_trc("incrementing completion object");
     210    11304316 :   struct netio_completion_object *completion = (struct netio_completion_object*)key;
     211    11304316 :   netio_semaphore_increment(&completion->sem, 1);
     212    11304316 :   log_trc("current: %d     expected: %d", completion->sem.current, completion->sem.threshold);
     213    11304316 : }
     214             : 
     215             : static void
     216          21 : pubsocket_on_connection_established(struct netio_send_socket* socket)
     217             : {
     218          21 :   log_dbg("publish socket established connection to remote, can publish now");
     219          21 :   struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
     220             :   //add deferred subscriptions to the table
     221          21 :   struct deferred_subscription* sub = socket->deferred_subs;
     222          43 :   while(sub){
     223          22 :     int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
     224          22 :     if(pubsocket->cb_subscribe && ret == 0){
     225           8 :       pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
     226             :     }
     227          22 :     pop_subscription(&sub);
     228             :   }
     229             :   //user callback
     230          21 :   if(pubsocket->cb_connection_established) {
     231           7 :     pubsocket->cb_connection_established(pubsocket);
     232             :   }
     233          21 : }
     234             : 
     235             : 
     236             : static void
     237          16 : pubsocket_on_connection_closed(struct netio_send_socket* socket)
     238             : {
     239          16 :   log_info("published socket: connection to remote was closed");
     240          16 :   struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
     241          16 :   if(pubsocket->cb_connection_closed) {
     242           2 :     pubsocket->cb_connection_closed(pubsocket);
     243             :   }
     244          16 :   if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
     245           0 :     uint8_t closed_connection = 1;
     246           0 :     table_remove_subscription(pubsocket, socket, 0, closed_connection);
     247             :   }
     248          16 :   handle_send_socket_shutdown(socket);
     249          16 :   remove_socket(&(pubsocket->subscription_table.socket_list), socket);
     250          16 : }
     251             : 
     252             : 
     253             : static void
     254         155 : on_unbuffered_send_connection_closed(struct netio_send_socket* socket)
     255             : {
     256         155 :     log_info("Send socket: connection to remote was closed");
     257         155 :     if(socket->unbuf_pub_socket != NULL) {
     258           0 :       pubsocket_on_connection_closed(socket);
     259             :     } else{
     260         155 :       handle_send_socket_shutdown(socket);
     261             :     }
     262         155 : }
     263             : 
     264             : static void
     265    11304316 : pubsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     266             : {
     267    11304316 :   struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
     268    11304316 :   increment_completion_object(pubsocket, key);
     269    11304316 :   log_trc("completion on pubsocket connection, key=%lu", key);
     270    11304316 : }
     271             : 
     272             : 
     273             : static struct netio_send_socket*
     274          22 : socket_list_add_or_lookup(struct netio_unbuffered_publish_socket* pubsocket,
     275             :                           struct netio_socket_list** list,
     276             :                           void* addr, size_t addrlen,
     277             :                           int port,
     278             :                           struct netio_context* ctx)
     279             : {
     280          22 :   if(addrlen == 0) {
     281           0 :     log_error("Invalid zero-byte address");
     282           0 :     return NULL;
     283             :   }
     284             : 
     285          22 :   struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);
     286             : 
     287          22 :   if ( entry == NULL ) {
     288          21 :     entry = add_socket_with_address(list, USEND, addr, addrlen, port);
     289          21 :     struct netio_send_socket* socket = (struct netio_send_socket*)entry->socket;
     290             : 
     291          21 :     if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
     292          21 :       struct netio_domain* domain = NULL;  //domain is shared among sockets,
     293          21 :       if (entry->next != NULL && entry->next->socket != NULL) {
     294           4 :         domain = ((struct netio_send_socket*)(entry->next->socket))->domain;
     295             :       }
     296          21 :       netio_init_send_socket(socket, ctx); //here we memset to zero and we loose domain
     297          21 :       socket->unbuf_pub_socket = pubsocket;
     298          21 :       socket->cb_connection_established = pubsocket_on_connection_established;
     299          21 :       socket->cb_connection_closed = pubsocket_on_connection_closed;
     300          21 :       socket->cb_send_completed = pubsocket_on_send_completed;
     301          21 :       netio_connect_rawaddr_domain(socket, entry->addr, entry->addrlen, domain);
     302          21 :     if ( domain == NULL ){  //check on local domain variable
     303          17 :       netio_register_send_buffer(socket, &pubsocket->buf, 0);
     304          17 :       netio_completion_stack_register_send_socket(&pubsocket->completion_stack, socket);
     305             :       }
     306             :     }
     307           0 :     else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
     308           0 :       netio_init_send_tcp_socket(socket, ctx);
     309           0 :       socket->unbuf_pub_socket = pubsocket;
     310           0 :       socket->cb_connection_established = pubsocket_on_connection_established;
     311           0 :       socket->cb_connection_closed = pubsocket_on_connection_closed;
     312           0 :       socket->cb_send_completed = pubsocket_on_send_completed;
     313           0 :       log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
     314           0 :       netio_connect_tcp(socket, entry->addr, port);
     315             :     }
     316             :     else {
     317           0 :       log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
     318           0 :       remove_socket(list, socket);
     319           0 :       return NULL;
     320             :     }
     321             :   }
     322             : 
     323          22 :   struct netio_send_socket* ss = (struct netio_send_socket*)entry->socket;
     324          22 :   return ss;
     325             : }
     326             : 
     327             : 
     328             : static void
     329          22 : subscribe(struct netio_unbuffered_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
     330             : {
     331          22 :   struct netio_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx);
     332          22 :   if(socket->recv_socket == NULL){
     333          21 :     socket->recv_socket =recv_socket;
     334             :   }
     335             : 
     336          22 :   if (socket->state == CONNECTED){
     337           0 :     table_add_subscription(&pubsocket->subscription_table, tag, socket);
     338           0 :     if(pubsocket->cb_subscribe) {
     339           0 :       pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
     340             :     }
     341             :   } else {
     342          22 :     push_back_subscription(&socket->deferred_subs, tag);
     343             :   }
     344          22 : }
     345             : 
     346             : static void
     347          16 : unsubscribe(struct netio_unbuffered_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
     348             : {
     349          16 :   struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
     350          16 :   if(list == NULL){return;}
     351             : 
     352          16 :   struct netio_send_socket* socket = (struct netio_send_socket*)list->socket;
     353          16 :   uint8_t connection_closed = 0;
     354          16 :   table_remove_subscription(pubsocket, socket, tag, connection_closed);
     355          16 :   pubsocket->subscription_table.ts++;
     356             : 
     357          16 :   if(pubsocket->cb_unsubscribe) {
     358           1 :     pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
     359             :   }
     360             : }
     361             : 
     362             : static void
     363          21 : lsocket_on_connection_established(struct netio_recv_socket* socket)
     364             : {
     365          21 :   log_dbg("Buffered listen socket: on connection established");
     366          21 :   if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     367           0 :     struct netio_buffer* buf = malloc(sizeof(struct netio_buffer));
     368           0 :     buf->size = sizeof(struct netio_subscription_message);
     369           0 :     buf->data = malloc(buf->size);
     370           0 :     netio_post_recv(socket, buf);
     371           0 :     socket->usr = buf;
     372             :   }
     373          21 : }
     374             : 
     375             : 
     376             : static void
     377          38 : parse_subscription_message(struct netio_unbuffered_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
     378             : {
     379          38 :   if (msg->action){
     380          22 :     log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     381          22 :     subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     382             :   }
     383             :   else{
     384          16 :     log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     385          16 :     unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     386             :   }
     387          38 : }
     388             : 
     389             : 
     390             : static void
     391          38 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     392             : {
     393          38 :     log_trc("message received");
     394          38 :     struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->lsocket->usr;
     395          38 :     if(len != sizeof(struct netio_subscription_message)) {
     396           0 :       log_error("Illegal subscription message size %lu", len);
     397           0 :       netio_post_recv(socket, buf);
     398           0 :       return;
     399             :     }
     400          38 :     parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
     401          38 :     netio_post_recv(socket, buf);
     402             : }
     403             : 
     404             : 
     405             : static int
     406        2255 : send_subscription_message(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, int action)
     407             : {
     408        2255 :     if( action == NETIO_SUBSCRIBE ){
     409        1157 :       log_info("Sending subscription for tag 0x%lx", tag);
     410        1098 :     } else if ( action == NETIO_UNSUBSCRIBE ){
     411        1098 :       log_info("Sending unsubscription for tag 0x%lx", tag);
     412             :     } else {
     413           0 :       log_error("Invalid subscription action %d", action);
     414           0 :       return 0;
     415             :     }
     416        2255 :     int ret = 0;
     417        2255 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     418        1170 :       socket->msg.tag = tag;
     419        1170 :       socket->msg.action = action;
     420        1170 :       socket->buf.data = &socket->msg;
     421        1170 :       socket->buf.size = sizeof(struct netio_subscription_message);
     422        1170 :       ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
     423        1085 :     } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     424             :       // look for available buf
     425             :       size_t id = 0;
     426        2122 :       while (socket->bufs[id].to_send) {
     427        1037 :         id++;
     428             :       }
     429        1085 :       log_dbg("tag id %d", id);
     430        1085 :       socket->msgs[id] = socket->msgs[0]; // initialize msg to default
     431        1085 :       socket->msgs[id].tag = tag;
     432        1085 :       socket->msgs[id].action = action;
     433        1085 :       socket->bufs[id].data = &socket->msgs[id];
     434        1085 :       socket->bufs[id].size = sizeof(struct netio_subscription_message);
     435        1085 :       socket->bufs[id].to_send = 1;
     436        1085 :       ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
     437             :     } else {
     438           0 :       log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
     439           0 :       ret = 1;
     440             :     }
     441             :     return ret;
     442             : }
     443             : 
     444             : /**
     445             :  * Initialize an unbuffered publish socket
     446             :  *
     447             :  * @param socket: An unbuffered publish socket
     448             :  * @param ctx: A netio context
     449             :  * @param hostname: Local hostname to bind to
     450             :  * @param port: Local port to bind to
     451             :  * @param buf: A registered send buffer
     452             :  */
     453             : void
     454          18 : netio_unbuffered_publish_libfabric_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
     455             : {
     456          18 :     log_dbg("netio_unbuffered_publish_libfabric_socket_init");
     457          18 :     socket->ctx = ctx;
     458          18 :     socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
     459          18 :     memcpy(&socket->buf, buf, sizeof(struct netio_buffer));
     460          18 :     netio_subscription_table_init(&socket->subscription_table);
     461          18 :     netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);
     462          18 :     netio_init_listen_socket(&socket->lsocket, ctx, NULL);
     463          18 :     socket->lsocket.usr = socket;
     464          18 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     465          18 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     466          18 :     socket->lsocket.recv_sub_msg = 1;
     467          18 :     netio_listen(&socket->lsocket, hostname, port);
     468             : 
     469          18 :     socket->buf_array[0] = &socket->completion_stack.buf;
     470         504 :     for(unsigned i=1; i<NETIO_MAX_IOV_LEN; i++) {
     471         486 :       socket->buf_array[i] = &socket->buf;
     472             :     }
     473          18 : }
     474             : 
     475             : /* tcp versio of above */
     476             : void
     477           0 : netio_unbuffered_publish_tcp_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
     478             : {
     479           0 :     log_dbg("INIT: netio_unbuffered_publish_tcp_socket_init");
     480           0 :     socket->ctx = ctx;
     481           0 :     socket->tcp_fi_mode = NETIO_MODE_TCP;
     482           0 :     memcpy(&socket->buf, buf, sizeof(struct netio_buffer));
     483           0 :     netio_subscription_table_init(&socket->subscription_table);
     484           0 :     netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);
     485           0 :     netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
     486           0 :     socket->lsocket.usr = socket;
     487           0 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     488           0 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     489           0 :     socket->lsocket.recv_sub_msg = 1;
     490           0 :     netio_listen_tcp(&socket->lsocket, hostname, port);
     491             : 
     492           0 :     socket->buf_array[0] = &socket->completion_stack.buf;
     493           0 :     for(unsigned i=1; i<NETIO_MAX_IOV_LEN; i++) {
     494           0 :       socket->buf_array[i] = &socket->buf;
     495             :     }
     496           0 : }
     497             : 
     498             : /**
     499             :  * Initialize an unbuffered publish socket
     500             :  *
     501             :  * @param socket: An unbuffered publish socket
     502             :  * @param ctx: A netio context
     503             :  * @param hostname: Local hostname to bind to
     504             :  * @param port: Local port to bind to
     505             :  * @param buf: A registered send buffer
     506             :  */
     507             : void
     508          18 : netio_unbuffered_publish_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
     509             : {
     510          18 :     if (netio_tcp_mode(hostname)) {
     511           0 :         netio_unbuffered_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, buf);
     512             :     } else {
     513          18 :         netio_unbuffered_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, buf);
     514             :     }
     515          18 : }
     516             : 
     517             : static unsigned
     518     1316889 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
     519             : {
     520     1316889 :   struct netio_subscription key;
     521     1316889 :   key.tag = tag;
     522     2633778 :   struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
     523     1316889 :                                                                        table->subscriptions,
     524             :                                                                        table->num_subscriptions,
     525             :                                                                        sizeof(struct netio_subscription),
     526             :                                                                        cmp_subscription);
     527             : 
     528     1316889 :   log_trc("found ptr=0x%p", ptr);
     529     1316889 :   if(ptr == NULL) {
     530             :     return 0;
     531             :   }
     532      769982 :   unsigned start_idx = ptr - table->subscriptions;
     533      770499 :         while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
     534             :                 start_idx--;
     535             :         }
     536             :         log_trc("start_idx=%d", start_idx);
     537             :   unsigned count = 0;
     538     1540481 :   for(unsigned i=start_idx; i<table->num_subscriptions; i++) {
     539      770503 :     if(table->subscriptions[i].tag == tag) {
     540      770499 :       count++;
     541             :     } else {
     542             :       break;
     543             :     }
     544             :   }
     545      769982 :   *start = start_idx;
     546      769982 :   return count;
     547             : }
     548             : 
     549             : static unsigned
     550   211120298 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
     551             : {
     552   211120298 :   if(cache->ts != table->ts) {
     553          34 :                 log_trc("subscription updating cache");
     554          34 :     cache->count = lookup_tag(table, tag, &cache->idx_start);
     555          34 :     cache->ts = table->ts;
     556             :   }
     557             : 
     558             : #if defined DEBUG || defined DEBUG_UBUF
     559             :     printf("Subscription table:");
     560             :           for(unsigned i=0; i<table->num_subscriptions; i++) {
     561             :                         printf("%u:%lu ", i, table->subscriptions[i].tag);
     562             :                 }
     563             :                 printf("\n");
     564             : #endif
     565             : 
     566   211120298 :   *start = cache->idx_start;
     567   211120298 :         log_trc("cache count=%d for tag=%lu", cache->count, tag);
     568   211120298 :   return cache->count;
     569             : }
     570             : 
     571             : /**
     572             :  * @brief Publishes a message on an unbuffered publish socket.
     573             :  *
     574             :  * @see netio_unbuffered_publishv_usr
     575             :  *
     576             :  * Same as netio_unbuffered_publishv_usr, but without the user header data field.
     577             :  */
     578             : int
     579     1316832 : netio_unbuffered_publishv(struct netio_unbuffered_publish_socket* socket,
     580             :                               netio_tag_t tag,
     581             :                               struct iovec* iov,
     582             :                               size_t count,
     583             :                               uint64_t* key,
     584             :                               int flags,
     585             :                               struct netio_subscription_cache* cache)
     586             : {
     587     1316832 :     return netio_unbuffered_publishv_usr(socket, tag, iov, count, key, flags, cache, 0, 0);
     588             : }
     589             : 
     590             : /**
     591             :  * @brief Publishes a message on an unbuffered publish socket.
     592             :  *
     593             :  * The message is given as a scatter/gather buffer (`struct iovec`). The caller
     594             :  * has to ensure the validity of the buffer until the transfer is complete.
     595             :  * A transfer is complete when the socket's `msg_published` callback has been
     596             :  * called. A key can be passed to the call to identify the publication.
     597             :  * The key will be passed in the `msg_published` callback.
     598             :  *
     599             :  * The `msg_published` callback will only be called if the message has been
     600             :  * sent successfully to all subscribed endpoints.
     601             :  *
     602             :  * The call may return `NETIO_STATUS_AGAIN` if one of the sockets connections
     603             :  * yields `NETIO_STATUS_AGAIN`. In this case it is the user's responsibility
     604             :  * to call `netio_unbuffered_publishv` again with the `NETIO_REENTRY` flag.
     605             :  *
     606             :  * @param socket:  The socket to publish on
     607             :  * @param tag:     The tag under which to publish
     608             :  * @param iov:     Message data iov
     609             :  * @param count:   IOV count
     610             :  * @param key:     Key that will be passed to the callback on successful publish of the
     611             :  *                 message. This is an input-output parameter. In case the function
     612             :  *                 returns NETIO_STATUS_PARTIAL, 'key' is used as storage to track the
     613             :  *                 completion data for the given tag. If netio_unbuffered_publishv is
     614             :  *                 called again with the NETIO_REENTRY flag, 'key' must remain
     615             :  *                 unchanged. In other words, for a given tag, 'key' is only set by the
     616             :  *                 user before the initial call to netio_unbuffered_publishv without
     617             :  *                 the NETIO_REENTRY flag.
     618             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     619             :  *                         resulted in NETIO_STATUS_AGAIN. Calling publish with
     620             :  *                         this flag will only send on connections where the
     621             :  *                         message was previously unpublished.
     622             :  * @param cache:   Optional user-supplied cache for the subsctiption table lookup.
     623             :  * @param usr:     Up to 8 byte of data that are transmitted as beginning of the message. This
     624             :  *                 allows the user to add a short header to a message without having to allocate
     625             :  *                 bufferspace for it.
     626             :  * @param usr_size: Size of the usr header field. Set to 0 if no header is required. The maximum
     627             :  *                  header size is 8.
     628             :  *
     629             :  * @returns NETIO_STATUS_OK       If the message was published successfully to all subscribed endpoints
     630             :  * @returns NETIO_STATUS_OK_NOSUB No ongoing subscriptions to publish the given message
     631             :  * @returns NETIO_STATUS_AGAIN    If not enough resources are available to proceed with the operation.
     632             :  *                                No data were sent to any endpoint. The user should try again with the exact same parameters.
     633             :  * @returns NETIO_STATUS_PARTIAL  The message was sent to some of the subscribed endpoints, but not all. The user should
     634             :  *                                try again, and additionally set the NETIO_REENTRY flag. Users must take care not to
     635             :  *                                overwrite the key parameter, which is used by the function call to track the operation
     636             :  *                                status.
     637             :  * @returns NETIO_ERROR_MAX_IOV_EXCEEDED
     638             :  *                                Too many iovec entries, try with less.
     639             :  */
     640             : int
     641   212437131 : netio_unbuffered_publishv_usr(struct netio_unbuffered_publish_socket* socket,
     642             :                               netio_tag_t tag,
     643             :                               struct iovec* iov,
     644             :                               size_t count,
     645             :                               uint64_t* key,
     646             :                               int flags,
     647             :                               struct netio_subscription_cache* cache,
     648             :                               uint64_t usr,
     649             :                               uint8_t usr_size)
     650             : {
     651   212437131 :   int ret = NETIO_STATUS_OK;
     652   212437131 :   if(count > NETIO_MAX_IOV_LEN-1) {
     653             :     return NETIO_ERROR_MAX_IOV_EXCEEDED;
     654             :   }
     655             : 
     656   212437131 :   log_trc("unbuffered publishv, key=0x%p, tag=%lu", key, tag);
     657             : 
     658   212437131 :   unsigned start_idx;
     659   212437131 :   unsigned num_subscriptions;
     660             : 
     661   212437131 :   if(cache) {
     662   211120298 :     num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
     663             :   } else {
     664     1316833 :     num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
     665             :   }
     666             : 
     667   212437131 :   log_trc("unbuffered publishv: num_subscriptions for tag %lu: %d, start_idx: %d", tag, num_subscriptions, start_idx);
     668   212437131 :   if(num_subscriptions == 0) {
     669   194299751 :     if(socket->cb_msg_published) {
     670   194299751 :       socket->cb_msg_published(socket, *key);
     671             :     }
     672   194299751 :     return NETIO_STATUS_OK_NOSUB;
     673             :   }
     674             : 
     675    18137380 :   int used_completion = 0;
     676    18137380 :   struct netio_completion_object* completion = NULL;
     677    18137380 :   if(flags & NETIO_REENTRY) {
     678           0 :     log_trc("unbuffered publishv: REENTRY - fetching completion from user");
     679           0 :     completion = (struct netio_completion_object*)(*key);
     680           0 :     used_completion = 1;
     681             :   } else {
     682    18137380 :     log_trc("unbuffered publishv: fetching completion object");
     683    18137380 :     completion = fetch_completion_object(socket);
     684    18137380 :     if(completion) {
     685    10805972 :       netio_semphore_set_threshold(&completion->sem, num_subscriptions);
     686    10805972 :       completion->key = *key;
     687    10805972 :       completion->header.tag = tag;
     688    10805972 :       completion->header.usr = usr;
     689    10805972 :       completion->usr_size = usr_size;
     690    10805972 :       socket->completion_stack.key_array[socket->completion_stack.available_objects] = *key;
     691    10805972 :       *key = (uint64_t)completion;
     692    10805972 :       used_completion = 0;
     693    10805972 :       log_trc("fetched completion: %lu for tag 0%lx", completion->key, tag);
     694             :     } else {
     695             :       log_trc("unbuffered publishv: no completion available -> AGAIN");
     696             :       // When no completion is available, we return NETIO_STATUS_AGAIN
     697             :       // The user is supposed to call the same call again (no need to keep track of completion object)
     698             :       return NETIO_STATUS_AGAIN;
     699             :     }
     700             :   }
     701             : 
     702             : 
     703    22112002 :   for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
     704    11306030 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     705    11306030 :     log_trc("subscription %d has tag %lu", i, subscription->tag);
     706    11306030 :     if(subscription->tag == tag) {
     707             :       // skip connections that were already successful if we are in reentry mode
     708    11306030 :       if(flags & NETIO_REENTRY) {
     709           0 :         if(subscription->again == 0) {
     710           0 :           continue;
     711             :         }
     712             :       }
     713             : 
     714    11306030 :       struct netio_send_socket* ssocket = (struct netio_send_socket*)(subscription->socket);
     715    11306030 :       log_trc("unbuffered publishv: send message on subscribed connection, iov-count=%lu, iov-len[0]=%lu", count, iov[0].iov_len);
     716    11306030 :       int result;
     717    11306030 :       struct iovec hdr_iov[NETIO_MAX_IOV_LEN];
     718    11306030 :       hdr_iov[0].iov_base = &completion->header;
     719    11306030 :       hdr_iov[0].iov_len = sizeof(netio_tag_t) + completion->usr_size;
     720    11306030 :       uint32_t total_size=hdr_iov[0].iov_len;
     721             : 
     722    54812069 :       for(unsigned j=0; j<count; j++) {
     723    43506039 :         hdr_iov[1+j].iov_base = iov[j].iov_base;
     724    43506039 :         hdr_iov[1+j].iov_len = iov[j].iov_len;
     725    43506039 :         total_size+=iov[j].iov_len;
     726             :       }
     727    11306030 :       log_trc("iov: [0]: %lu - compl: %lu", *((netio_tag_t*)hdr_iov[0].iov_base), completion->key);
     728    11306030 :       if (socket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
     729    11306030 :         result = netio_sendv_imm(ssocket, socket->buf_array, hdr_iov, count+1, (uint64_t)completion, 0);
     730             :       }
     731             :       else {
     732           0 :         result = netio_tcp_sendv_imm(ssocket, total_size,
     733             :                                      hdr_iov, count+1, (uint64_t)completion, 0);
     734             :       }
     735    11306030 :       log_trc("unbuffered publishv: result=%d", result);
     736             : 
     737    11306030 :       if(result == NETIO_STATUS_OK) {
     738    11306030 :         used_completion = 1;
     739    11306030 :         subscription->again = 0;
     740           0 :       } else if(result == NETIO_STATUS_AGAIN) {
     741           0 :         subscription->again = 1;
     742           0 :         used_completion = 1;
     743           0 :         ret = NETIO_STATUS_PARTIAL;
     744             :       } else {
     745           0 :         return result;
     746             :         // some error occured and we return immediately
     747             :         // TODO we should handle the error and unsubscribe the faulty remote
     748             :       }
     749             :     }
     750             :   }
     751             : 
     752    10805972 :   if(used_completion == 0) {
     753           0 :     netio_completion_stack_push(&socket->completion_stack, completion);
     754             :   }
     755             : 
     756             :   return ret;
     757             : }
     758             : 
     759             : 
     760             : static void
     761         100 : subscribe_socket_on_connection_established(struct netio_recv_socket* socket)
     762             : {
     763         100 :         struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
     764         100 :   if(subscribe_socket->cb_connection_established) {
     765          94 :     subscribe_socket->cb_connection_established(subscribe_socket);
     766             :   }
     767         100 : }
     768             : 
     769             : 
     770             : static void
     771          46 : subscribe_socket_on_connection_closed(struct netio_recv_socket* socket)
     772             : {
     773          46 :         struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
     774          46 :         log_info("subscribe socket: connection closed");
     775          46 :   if(subscribe_socket->cb_connection_closed)  {
     776          46 :     subscribe_socket->cb_connection_closed(subscribe_socket);
     777             :   }
     778          46 : }
     779             : 
     780             : 
     781             : static void
     782    29569709 : subscribe_socket_on_msg_received(struct netio_recv_socket* rsocket, struct netio_buffer* buf, void* data, size_t len)
     783             : {
     784    29569709 :     struct netio_unbuffered_subscribe_socket* socket = (struct netio_unbuffered_subscribe_socket*)rsocket->lsocket->usr;
     785             : 
     786    29569709 :     netio_tag_t tag;
     787    29569709 :     memcpy(&tag, data, sizeof(netio_tag_t));
     788    29569709 :     data = (char *)data + sizeof(netio_tag_t);
     789    29569709 :     len -= sizeof(netio_tag_t);
     790             : 
     791    29569709 :     log_trc("buffer received of length %lu for tag %lu", len, tag);
     792             : 
     793    29569709 :     if(socket->cb_msg_received) {
     794    29569709 :       socket->cb_msg_received(socket, tag, data, len);
     795             :     }
     796             : 
     797    29569700 :     netio_post_recv(rsocket, buf);
     798    29569700 : }
     799             : 
     800             : 
     801             : static void
     802          99 : subsocket_on_connection_established(struct netio_send_socket* socket)
     803             : {
     804          99 :     log_dbg("subsocket connection established");
     805          99 :     int ret;
     806          99 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     807             : 
     808          99 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     809          60 :       subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
     810          60 :       if((ret = fi_getname(&subscribe_socket->recv_socket.pep->fid,
     811          60 :                           subscribe_socket->msg.addr,
     812             :                           &subscribe_socket->msg.addrlen)) !=0) {
     813           0 :         log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
     814           0 :         exit(1);
     815             :       }
     816          60 :       subscribe_socket->buf.data = &subscribe_socket->msg;
     817          60 :       subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
     818          60 :       netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
     819             :     }
     820          39 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
     821             :       // initialize msgs[0] with defaults
     822          39 :       struct sockaddr sock_addr;
     823          39 :       socklen_t addrlen=sizeof(sock_addr);
     824          39 :       getsockname(socket->eq_ev_ctx.fd, &sock_addr, &addrlen);
     825          39 :       getnameinfo(&sock_addr, addrlen,
     826          39 :                 subscribe_socket->msgs[0].addr,NETIO_MAX_ADDRLEN,
     827             :                 NULL, 0, NI_NUMERICHOST);
     828          39 :       addrlen=strlen(subscribe_socket->msgs[0].addr);
     829          39 :       subscribe_socket->msgs[0].addr[addrlen] = 0;
     830          39 :       subscribe_socket->msgs[0].addrlen = addrlen+1;
     831          39 :       subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
     832             :     }
     833             : 
     834          99 :     if(subscribe_socket->total_tags == 0){
     835           0 :       log_info("Closing send connection again because there is no tag to subscribe to.");
     836           0 :       netio_disconnect(socket);
     837             :     }
     838             : 
     839          99 :     if(subscribe_socket->total_tags == 0){
     840           0 :       log_info("Closing send connection again because there is no tag to subscribe to.");
     841           0 :       netio_disconnect(socket);
     842             :     }
     843             : 
     844             :     // send tags one by one
     845         366 :     while(subscribe_socket->total_tags > 0){
     846         267 :       size_t idx = subscribe_socket->total_tags - 1;
     847         267 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     848         267 :       log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
     849         267 :       ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     850         267 :       if (ret == NETIO_STATUS_OK){
     851         267 :         subscribe_socket->total_tags--;
     852             :       } else {
     853           0 :         log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
     854           0 :         break;
     855             :       }
     856             :     }
     857          99 : }
     858             : 
     859             : 
     860             : static void
     861          41 : subsocket_on_connection_closed(struct netio_send_socket* socket)
     862             : {
     863          41 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     864          41 :     subscribe_socket->state = NONE;
     865          41 : }
     866             : 
     867             : 
     868             : static void
     869          26 : subsocket_on_error_connection_refused(struct netio_send_socket* socket) {
     870          26 :     log_dbg("subsocket connection refused");
     871          26 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     872          26 :     subscribe_socket->state = NONE;
     873          26 :     handle_listen_socket_shutdown(&subscribe_socket->recv_socket);
     874          26 :     if(subscribe_socket->cb_error_connection_refused) {
     875          26 :       subscribe_socket->cb_error_connection_refused(subscribe_socket);
     876             :     }
     877          26 : }
     878             : 
     879             : 
     880             : static void
     881        2246 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     882             : {
     883        2246 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     884             : 
     885             :     //check for remaining tags from on_connection_established
     886        2246 :     while(subscribe_socket->total_tags > 0){
     887           0 :       size_t idx = subscribe_socket->total_tags - 1;
     888           0 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     889           0 :       int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     890           0 :       if (ret == NETIO_STATUS_OK){
     891           0 :         subscribe_socket->total_tags--;
     892             :       } else {
     893           0 :         log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
     894           0 :         break;
     895             :       }
     896             :     }
     897        2246 : }
     898             : 
     899             : 
     900             : /**
     901             :  * Initialize an unbuffered subscribe socket
     902             :  *
     903             :  * @param socket: An unbuffered subscribe socket
     904             :  * @param ctx: A netio context
     905             :  * @param hostname: A local hostname or IP to bind to
     906             :  * @param remote_host: Hostname or IP of the remote publish socket
     907             :  * @param remote_port: Port of the remote publish socket
     908             :  * @param count: Size of the buffer array
     909             :  */
     910          86 : void netio_unbuffered_subscribe_libfabric_socket_init(struct netio_unbuffered_subscribe_socket* socket,
     911             :                                                       struct netio_context* ctx,
     912             :                                                       const char* hostname,
     913             :                                                       const char* remote_host,
     914             :                                                       unsigned remote_port,
     915             :                                                       size_t buffer_size,
     916             :                                                       size_t count)
     917             : {
     918          86 :         memset(socket, 0, sizeof(*socket));
     919          86 :         socket->ctx = ctx;
     920          86 :         socket->state = NONE;
     921          86 :   socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
     922          86 :   struct netio_unbuffered_socket_attr attr = {count, buffer_size};
     923          86 :         netio_init_listen_socket(&socket->recv_socket, ctx, &attr);
     924          86 :         socket->recv_socket.usr = socket;
     925          86 :         socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     926          86 :         socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     927          86 :   socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     928          86 :         netio_listen(&socket->recv_socket, hostname, 0);
     929             : 
     930          86 :         socket->remote_hostname = strdup(remote_host);
     931          86 :         socket->remote_port = remote_port;
     932             : 
     933          86 :         log_dbg("subscribe socket is listening");
     934             : 
     935          86 :         socket->total_tags = 0;
     936          86 : }
     937             : 
     938             : 
     939             : /* As above but with tcp instead of libfabric   */
     940          39 : void netio_unbuffered_subscribe_tcp_socket_init(struct netio_unbuffered_subscribe_socket* socket,
     941             :                                             struct netio_context* ctx,
     942             :                                             const char* hostname,
     943             :                                             const char* remote_host,
     944             :                                             unsigned remote_port,
     945             :                                             size_t buffer_size,
     946             :                                             size_t count)
     947             : {
     948          39 :   log_dbg("INIT: netio_unbuffered_subscribe_tcp_socket_init");
     949          39 :         memset(socket, 0, sizeof(*socket));
     950          39 :         socket->ctx = ctx;
     951          39 :         socket->state = NONE;
     952          39 :   socket->tcp_fi_mode = NETIO_MODE_TCP;
     953          39 :   struct netio_unbuffered_socket_attr attr = {count, buffer_size};
     954          39 :         netio_init_listen_tcp_socket(&socket->recv_socket, ctx, &attr);
     955          39 :         socket->recv_socket.usr = socket;
     956          39 :         socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     957          39 :         socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     958          39 :   socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     959          39 :         netio_listen_tcp(&socket->recv_socket, hostname, 0);
     960             : 
     961          39 :   log_dbg("socket=%p   recv_socket.port=%d\n",socket,socket->recv_socket.port);
     962          39 :   socket->msg.port=socket->recv_socket.port;
     963             : 
     964          39 :         socket->remote_hostname = strdup(remote_host);
     965          39 :         socket->remote_port = remote_port;
     966             : 
     967          39 :         socket->total_tags = 0;
     968          39 : }
     969             : 
     970             : /**
     971             :  * Initialize an unbuffered subscribe socket
     972             :  *
     973             :  * @param socket: An unbuffered subscribe socket
     974             :  * @param ctx: A netio context
     975             :  * @param hostname: A local hostname or IP to bind to
     976             :  * @param remote_host: Hostname or IP of the remote publish socket
     977             :  * @param remote_port: Port of the remote publish socket
     978             :  * @param buffers: Array of registered receive buffers
     979             :  * @param count: Size of the buffer array
     980             :  */
     981         125 : void netio_unbuffered_subscribe_socket_init(struct netio_unbuffered_subscribe_socket* socket,
     982             :                                             struct netio_context* ctx,
     983             :                                             const char* hostname,
     984             :                                             const char* remote_host,
     985             :                                             unsigned remote_port,
     986             :                                             //struct netio_buffer* buffers,
     987             :                                             size_t buffer_size,
     988             :                                             size_t count)
     989             : {
     990         125 :   if (netio_tcp_mode(hostname)) {
     991          39 :     netio_unbuffered_subscribe_tcp_socket_init(socket, ctx, netio_hostname(hostname), netio_hostname(remote_host), remote_port, buffer_size, count);
     992             :   } else {
     993          86 :     netio_unbuffered_subscribe_libfabric_socket_init(socket, ctx, netio_hostname(hostname), netio_hostname(remote_host), remote_port, buffer_size, count);
     994             :   }
     995         125 : }
     996             : 
     997             : /**
     998             :  * Initializes an unbuffered send socket.
     999             :  *
    1000             :  * @param socket: The socket to initialize
    1001             :  * @param ctx: The NetIO context object in which to initialize the socket
    1002             :  */
    1003             : void
    1004         137 : netio_unbuffered_send_socket_init(struct netio_send_socket* socket, struct netio_context* ctx)
    1005             : {
    1006         137 :   netio_init_send_socket(socket, ctx);
    1007         137 :   socket->cb_internal_connection_closed = on_unbuffered_send_connection_closed;
    1008         137 : }
    1009             : 
    1010             : 
    1011             : /**
    1012             :  * Subscribe an unbuffered subscribe socket to a given tag.
    1013             :  *
    1014             :  * @param socket: An unbuffered subscribe socket
    1015             :  * @param tag: A netio tag
    1016             :  */
    1017             : int
    1018        1183 : netio_unbuffered_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
    1019             : {
    1020        1183 :   if(socket->state == NONE) {
    1021         125 :     log_dbg("Creating and connecting a new socket");
    1022         125 :     if (socket->tcp_fi_mode  == NETIO_MODE_LIBFABRIC) {
    1023          86 :                   netio_init_send_socket(&socket->socket, socket->ctx);
    1024          86 :                   socket->socket.usr = socket;
    1025          86 :                   socket->socket.cb_connection_established = subsocket_on_connection_established;
    1026          86 :       socket->socket.cb_connection_closed = subsocket_on_connection_closed;
    1027          86 :                   socket->socket.cb_send_completed = subsocket_on_send_completed;
    1028          86 :       socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
    1029          86 :       socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
    1030          86 :                   netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
    1031             :     }
    1032          39 :     else if (socket->tcp_fi_mode  == NETIO_MODE_TCP) {
    1033          39 :       netio_init_send_tcp_socket(&socket->socket, socket->ctx);
    1034          39 :       socket->socket.usr = socket;
    1035          39 :       socket->socket.cb_connection_established = subsocket_on_connection_established;
    1036          39 :       socket->socket.cb_connection_closed = subsocket_on_connection_closed;
    1037          39 :       socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
    1038          39 :       socket->socket.cb_send_completed = subsocket_on_send_completed;
    1039          39 :       socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
    1040          39 :       netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
    1041             :     } else {
    1042           0 :       log_error("Unsupported connection type %d", socket->tcp_fi_mode);
    1043           0 :       return 1;
    1044             :     }
    1045         125 :                 socket->state = INITIALIZED;
    1046             :         }
    1047             : 
    1048        3760 :   for(unsigned i=0; i<socket->total_tags; i++) {
    1049        2577 :     if(socket->tags_to_subscribe[i] == tag) {
    1050             :       return 0;
    1051             :     }
    1052             :   }
    1053             : 
    1054             :   //if send socket connected send message
    1055             :   //otherwise on_connection_established will do it
    1056        1183 :   if (socket->socket.state){
    1057         890 :     log_info("Sending subscription message for tag 0x%lx", tag);
    1058         890 :     int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
    1059         890 :     return ret;
    1060             :   } else {
    1061         293 :     log_info("Queing subscription message for txg 0x%lx", tag);
    1062         293 :     socket->tags_to_subscribe[socket->total_tags] = tag;
    1063         293 :     socket->total_tags++;
    1064         293 :     return 0;
    1065             :   }
    1066             : 
    1067             :   return 0;
    1068             : }
    1069             : 
    1070             : 
    1071             : static int
    1072           0 : remove_tag_to_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
    1073             : {
    1074           0 :   int found = 0;
    1075           0 :   for(unsigned int i=0; i<socket->total_tags; ++i){
    1076           0 :     if(socket->tags_to_subscribe[i] == tag){
    1077           0 :       log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
    1078           0 :       for(unsigned int j = i; j < socket->total_tags-1; ++j){
    1079           0 :         socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
    1080             :       }
    1081           0 :       found = 1;
    1082           0 :       socket->total_tags--;
    1083           0 :       break;
    1084             :     }
    1085             :   }
    1086           0 :   if(found == 0){
    1087           0 :     log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
    1088             :   }
    1089           0 :   return NETIO_STATUS_OK;
    1090             : }
    1091             : 
    1092             : 
    1093             : /**
    1094             :  * Unsubscribe from a given message tag.
    1095             :  *
    1096             :  * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
    1097             :  *
    1098             :  * @param socket: The unbuffered subscribe socket.
    1099             :  * @param tag: The tag to unsubscribe from.
    1100             :  */
    1101             : int
    1102        1098 : netio_unbuffered_unsubscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
    1103             : {
    1104        1098 :   int ret = NETIO_STATUS_OK;
    1105        1098 :   if(socket->state == INITIALIZED) {
    1106        1098 :     log_dbg("Subscribe socket initialised, can proceed with unsubscription");
    1107        1098 :     if (socket->socket.state) {
    1108        1098 :       ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
    1109        1098 :       log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
    1110             :     } else {
    1111           0 :       ret = remove_tag_to_subscribe(socket, tag);
    1112             :     }
    1113             :   } else {
    1114        1098 :     log_dbg("The connection has been already closed.");
    1115             :   }
    1116        1098 :   return ret;
    1117             : }
    1118             : 
    1119             : 
    1120             : /**
    1121             :  * Returns the available number of completion objects of an unbuffered publish
    1122             :  * socket
    1123             :  *
    1124             :  * @param socket: The unbuffered publish socket.
    1125             :  */
    1126             : unsigned
    1127         104 : netio_pubsocket_get_available_co(struct netio_unbuffered_publish_socket* socket)
    1128             : {
    1129         104 :   if(socket != NULL){
    1130         104 :     return socket->completion_stack.available_objects;
    1131             :   } else {
    1132             :     return 0;
    1133             :   }
    1134             : }

Generated by: LCOV version 1.0