LCOV - code coverage report
Current view: top level - netio-next/src - unbufpubsub.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 376 444 84.7 %
Date: 2025-06-10 03:23:28 Functions: 32 34 94.1 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <string.h>
       3             : #include <assert.h>
       4             : #include "log.h"
       5             : #include "netio/netio.h"
       6             : 
       7             : #define PUBLISH_SOCKET_MAX_COMPLETIONS (512)
       8             : 
       9             : #if defined DEBUG || defined DEBUG_UPUB
      10             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      11             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      13             : #else
      14             : #define log_dbg(...)
      15             : #define log_trc(...)
      16             : #endif
      17             : 
      18             : static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start);
      19             : 
      20             : static int
      21     1477454 : cmp_subscription(const void* a, const void *b)
      22             : {
      23     1477454 :   struct netio_subscription* suba = (struct netio_subscription*)a;
      24     1477454 :   struct netio_subscription* subb = (struct netio_subscription*)b;
      25             : 
      26     1477454 :   if(suba->tag == subb->tag) {
      27             :     return 0;
      28             :   }
      29          16 :   return suba->tag > subb->tag ? 1 : -1;
      30             : }
      31             : 
      32             : 
      33             : static int
      34          34 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_send_socket* socket)
      35             : {
      36          34 :   if(table->num_subscriptions == table->size) {
      37           0 :     log_error("Maximum number of subscriptions. New subscription for 0x%lx dropped.", tag);
      38           0 :     return 1;
      39             :   }
      40             : 
      41             :   //Check if the subscription is already in the list
      42          34 :   unsigned start;
      43          34 :   unsigned count = lookup_tag(table, tag, &start);
      44          38 :   for(unsigned i=0; i<count; i++) {
      45           4 :     if(table->subscriptions[start+i].socket == socket) {
      46             :       return 0;
      47             :     }
      48             :   }
      49             : 
      50          34 :   table->subscriptions[table->num_subscriptions].tag = tag;
      51          34 :   table->subscriptions[table->num_subscriptions].socket = socket;
      52          34 :   table->subscriptions[table->num_subscriptions].again = 0;
      53          34 :   table->num_subscriptions++;
      54          34 :   table->ts++;
      55          34 :   log_info("New entry in subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
      56          34 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
      57          34 :   return 0;
      58             : }
      59             : 
      60             : 
      61             : /**
      62             :  * @brief Handle an unsubscription or a client disconnection.
      63             :  *
      64             :  * @param pubsocket: netio_unbuffered_publish_socket to which all send sockets are associated.
      65             :  * @param socket: the netio_send_socket not needed anymore
      66             :  * @param tag:                      the tag for which an unsubscribe request has been received
      67             :  * @param closed_connection:        a flag to enable the removal of all the subscriptions associated to
      68             :  *                                  the send socket in response to a closed connection.
      69             :  */
      70             : static void
      71          22 : table_remove_subscription(struct netio_unbuffered_publish_socket* pubsocket, struct netio_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
      72             : {
      73          22 :   struct netio_subscription_table* table = &pubsocket->subscription_table;
      74          22 :   log_dbg("Total subscriptions: %lu", table->num_subscriptions);
      75             : 
      76          22 :   unsigned i=0;
      77          22 :   unsigned remaining_subscriptions_of_socket=0;
      78          45 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
      79          23 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
      80             :   }
      81             : 
      82          45 :   while(i<table->num_subscriptions) {
      83          23 :     log_dbg("Removing subscription tag  %lx table socket %p socket %p", table->subscriptions[i].tag, table->subscriptions[i].socket, socket);
      84          23 :     if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
      85             :       //increment the completion semaphore counter as if messages were sent to the disconnected client
      86             :       struct netio_completion_stack* cs = &pubsocket->completion_stack;
      87       11286 :       for(size_t j=0; j < cs->num_objects; ++j) {
      88       11264 :         if(cs->objects[j].header.tag == table->subscriptions[i].tag && cs->objects[j].key != 0xFFFFFFFFFFFFFFFF){
      89        8523 :           netio_semaphore_increment(&cs->objects[j].sem, 1);
      90             :         }
      91             :       }
      92             : 
      93          22 :       log_dbg("Available completion objects %lu / %lu",  cs->available_objects, cs->num_objects);
      94             :       log_dbg("Removing connection in subscription table, tag=%lu, socket=%p index %lu becomes %u",
      95             :                 table->subscriptions[i].tag,
      96             :                 table->subscriptions[i].socket,
      97             :                 table->num_subscriptions-1,
      98          22 :                 i);
      99          22 :       table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
     100          22 :       table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
     101          22 :       table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
     102          22 :       table->num_subscriptions--;
     103          22 :       remaining_subscriptions_of_socket--;
     104          22 :       table->ts++;
     105             :     }
     106             :     else{
     107           1 :       i++;
     108          45 :       log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
     109             :     }
     110             :   }
     111             : 
     112          22 :   log_dbg("At the end of table_remove_subscription available completion objects %lu / %lu",  pubsocket->completion_stack.available_objects, pubsocket->completion_stack.num_objects);
     113          22 :   qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     114          22 :   log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
     115          22 :   log_info("Removing entry from unbuffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
     116             : 
     117          22 :   if(closed_connection==0 && remaining_subscriptions_of_socket==0){
     118          13 :     log_info("Disconnecting endpoint with zero subscriptions");
     119          13 :     netio_disconnect(socket);
     120             :   }
     121          22 : }
     122             : 
     123             : static int
     124          22 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_send_socket* socket)
     125             : {
     126          22 :   log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
     127          22 :   unsigned remaining_subscriptions_of_socket=0;
     128          31 :   for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
     129           9 :     if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
     130             :   }
     131          22 :   log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
     132          22 :   return remaining_subscriptions_of_socket;
     133             : }
     134             : 
     135             : static void
     136    36808994 : release_completion_object(struct netio_completion_object* completion)
     137             : {
     138    36808994 :   log_trc("releasing completion object");
     139    36808994 :   if(netio_completion_stack_push(&completion->socket->completion_stack, completion)) {
     140           0 :     log_error("Could not push completion object since stack was full. This should not happen.");
     141             :   }
     142    36808994 : }
     143             : 
     144             : static void
     145    36808994 : on_completion_trigger(void* c)
     146             : {
     147    36808994 :   struct netio_completion_object* completion = (struct netio_completion_object*)c;
     148    36808994 :   log_trc("calling cb_msg_published");
     149    36808994 :   if(completion->socket->cb_msg_published) {
     150    36808992 :     completion->socket->cb_msg_published(completion->socket, completion->key);
     151             :   }
     152    36808994 :   release_completion_object(completion);
     153    36808994 : }
     154             : 
     155           0 : void print_completion_objects(struct netio_unbuffered_publish_socket* socket){
     156           0 :   printf("Number of available completion objects: %zu \n", socket->completion_stack.available_objects);
     157           0 :   printf("===============================================\n");
     158           0 :   printf("CO# \t KEY \t \t TAG \n");
     159           0 :   printf("-----------------------------------------------\n");
     160           0 :   for(unsigned int i=0; i < socket->completion_stack.num_objects; ++i){
     161           0 :     uint32_t tag = (socket->completion_stack.key_array[i] >> 32);
     162           0 :     printf("%u \t 0x%lx \t %x \n", i, socket->completion_stack.key_array[i], tag);
     163             :   }
     164           0 :   printf("===============================================\n");
     165           0 :   printf("Subscription table: there are %zu elinks subscribed\n", socket->subscription_table.num_subscriptions);
     166           0 :   printf("FID \t \t \t SOCKET\n");
     167           0 :   printf("-----------------------------------------------\n");
     168           0 :   for (unsigned int i=0; i < socket->subscription_table.num_subscriptions; ++i){
     169           0 :     printf("0x%lx \t %p \n", socket->subscription_table.subscriptions[i].tag, socket->subscription_table.subscriptions[i].socket);
     170             :   }
     171           0 :   socket->completion_stack.printed = 1;
     172           0 : }
     173             : 
     174             : 
     175             : 
     176             : static struct netio_completion_object*
     177    38747186 : fetch_completion_object(struct netio_unbuffered_publish_socket* socket)
     178             : {
     179    38747186 :   struct netio_completion_object* completion;
     180    38747186 :   if(netio_completion_stack_pop(&socket->completion_stack, &completion)) {
     181             : #if defined DEBUG || defined DEBUG_UPUB
     182             :     if (socket->completion_stack.printed == 0){
     183             :       print_completion_objects(socket);
     184             :     }
     185             : #endif
     186             :     return NULL;
     187             :   }
     188             : 
     189    36808994 :   netio_semaphore_init(&completion->sem, 0);
     190    36808994 :   completion->sem.data = completion;
     191    36808994 :   completion->sem.cb = on_completion_trigger;
     192    36808994 :   completion->socket = socket;
     193    36808994 :   return completion;
     194             : }
     195             : 
     196             : static void
     197    36805246 : increment_completion_object(struct netio_unbuffered_publish_socket* pubsocket, uint64_t key)
     198             : {
     199    36805246 :   log_trc("incrementing completion object");
     200    36805246 :   struct netio_completion_object *completion = (struct netio_completion_object*)key;
     201    36805246 :   netio_semaphore_increment(&completion->sem, 1);
     202    36805246 :   log_trc("current: %d     expected: %d", completion->sem.current, completion->sem.threshold);
     203             : }
     204             : 
     205             : static void
     206          32 : pubsocket_on_connection_established(struct netio_send_socket* socket)
     207             : {
     208          32 :   log_dbg("publish socket established connection to remote, can publish now");
     209          32 :   struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
     210             :   //add deferred subscriptions to the table
     211          32 :   struct deferred_subscription* sub = socket->deferred_subs;
     212          66 :   while(sub){
     213          34 :     int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
     214          34 :     if(pubsocket->cb_subscribe && ret == 0){
     215          34 :       pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
     216             :     }
     217          34 :     pop_subscription(&sub);
     218             :   }
     219             :   //user callback
     220          32 :   if(pubsocket->cb_connection_established) {
     221          32 :     pubsocket->cb_connection_established(pubsocket);
     222             :   }
     223          32 : }
     224             : 
     225             : 
     226             : static void
     227          22 : pubsocket_on_connection_closed(struct netio_send_socket* socket)
     228             : {
     229          22 :   log_info("published socket: connection to remote was closed");
     230          22 :   struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
     231          22 :   if(pubsocket->cb_connection_closed) {
     232          22 :     pubsocket->cb_connection_closed(pubsocket);
     233             :   }
     234          44 :   if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
     235           9 :     uint8_t closed_connection = 1;
     236           9 :     table_remove_subscription(pubsocket, socket, 0, closed_connection);
     237             :   }
     238          22 :   handle_send_socket_shutdown(socket);
     239          22 :   remove_socket(&(pubsocket->subscription_table.socket_list), socket);
     240          22 : }
     241             : 
     242             : 
     243             : static void
     244         318 : on_unbuffered_send_connection_closed(struct netio_send_socket* socket)
     245             : {
     246         318 :     log_info("Send socket: connection to remote was closed");
     247         318 :     if(socket->unbuf_pub_socket != NULL) {
     248           0 :       pubsocket_on_connection_closed(socket);
     249             :     } else{
     250         318 :       handle_send_socket_shutdown(socket);
     251             :     }
     252         318 : }
     253             : 
     254             : static void
     255    36805246 : pubsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     256             : {
     257    36805246 :   struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
     258    36805246 :   increment_completion_object(pubsocket, key);
     259    36805246 :   log_trc("completion on pubsocket connection, key=%lu", key);
     260    36805246 : }
     261             : 
     262             : 
     263             : static struct netio_send_socket*
     264          34 : socket_list_add_or_lookup(struct netio_unbuffered_publish_socket* pubsocket, struct netio_socket_list** list, void* addr, size_t addrlen, struct netio_context* ctx)
     265             : {
     266          34 :   if(addrlen == 0) {
     267           0 :     log_error("Invalid zero-byte address");
     268           0 :     return NULL;
     269             :   }
     270             : 
     271          34 :   struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen);
     272             : 
     273          34 :   if ( entry == NULL ) {
     274          32 :     log_dbg("socket_list_add_or_lookup, adding new entry");
     275          32 :     entry = add_socket_with_address(list, USEND, addr, addrlen);
     276          32 :     struct netio_domain* domain = NULL;  //domain is shared among sockets,
     277          32 :     if (entry->next != NULL && entry->next->socket != NULL) {  //because we push to head of list
     278           5 :       domain = ((struct netio_send_socket*)(entry->next->socket))->domain;
     279             :     }
     280          32 :     struct netio_send_socket* socket = (struct netio_send_socket*)entry->socket;
     281          32 :     netio_init_send_socket(socket, ctx); //here we memset to zero and we loose domain
     282          32 :     socket->unbuf_pub_socket = pubsocket;
     283          32 :     socket->cb_connection_established = pubsocket_on_connection_established;
     284          32 :     socket->cb_connection_closed = pubsocket_on_connection_closed;
     285          32 :     socket->cb_send_completed = pubsocket_on_send_completed;
     286          32 :     netio_connect_rawaddr_domain(socket, entry->addr, entry->addrlen, domain);
     287          32 :     if ( domain == NULL ){  //check on local domain variable
     288          27 :       netio_register_send_buffer(socket, &pubsocket->buf, 0);
     289          27 :       netio_completion_stack_register_send_socket(&pubsocket->completion_stack, socket);  
     290             :     }
     291             :   }
     292          34 :   struct netio_send_socket* ss = (struct netio_send_socket*)entry->socket;
     293          34 :   return ss;
     294             : }
     295             : 
     296             : 
     297             : static void
     298          34 : subscribe(struct netio_unbuffered_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, netio_tag_t tag)
     299             : {
     300          34 :   struct netio_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, pubsocket->ctx);
     301          34 :   if(socket->recv_socket == NULL){
     302          32 :     socket->recv_socket =recv_socket;
     303             :   }
     304             : 
     305          34 :   if (socket->state == CONNECTED){
     306           0 :     table_add_subscription(&pubsocket->subscription_table, tag, socket);
     307           0 :     if(pubsocket->cb_subscribe) {
     308           0 :       pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
     309             :     }
     310             :   } else {
     311          34 :     push_back_subscription(&socket->deferred_subs, tag);
     312             :   }
     313          34 : }
     314             : 
     315             : static void
     316          13 : unsubscribe(struct netio_unbuffered_publish_socket* pubsocket, void* addr, size_t addrlen, netio_tag_t tag)
     317             : {
     318          13 :   struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen);
     319          13 :   if(list == NULL){return;}
     320             : 
     321          13 :   struct netio_send_socket* socket = (struct netio_send_socket*)list->socket;
     322          13 :   uint8_t connection_closed = 0;
     323          13 :   table_remove_subscription(pubsocket, socket, tag, connection_closed);
     324          13 :   pubsocket->subscription_table.ts++;
     325             : 
     326          13 :   if(pubsocket->cb_unsubscribe) {
     327           2 :     pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
     328             :   }
     329             : }
     330             : 
     331             : static void
     332          32 : lsocket_on_connection_established(struct netio_recv_socket* socket)
     333             : {
     334          32 :   log_dbg("Buffered listen socket: on connection establsihed");
     335          32 : }
     336             : 
     337             : 
     338             : static void
     339          47 : parse_subscription_message(struct netio_unbuffered_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
     340             : {
     341          47 :   if (msg->action){
     342          34 :     log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     343          34 :     subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->tag);
     344             :   }
     345             :   else{
     346          13 :     log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
     347          13 :     unsubscribe(socket, msg->addr, msg->addrlen, msg->tag);
     348             :   }
     349          47 : }
     350             : 
     351             : 
     352             : static void
     353          47 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     354             : {
     355          47 :     log_trc("message received");
     356          47 :     struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->lsocket->usr;
     357          47 :     if(len != sizeof(struct netio_subscription_message)) {
     358           0 :       log_error("Illegal subscription message size %lu", len);
     359           0 :       netio_post_recv(socket, buf);
     360           0 :       return;
     361             :     }
     362          47 :     parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
     363          47 :     netio_post_recv(socket, buf);
     364             : }
     365             : 
     366             : 
     367             : static int
     368        2243 : send_subscription_message(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, int action)
     369             : {
     370        2243 :     if( action == NETIO_SUBSCRIBE ){
     371        1134 :       log_info("Sending subscription for tag 0x%lx", tag);
     372        1109 :     } else if ( action == NETIO_UNSUBSCRIBE ){
     373        1109 :       log_info("Sending unsubscription for tag 0x%lx", tag);
     374             :     } else {
     375           0 :       log_error("Invalid subscription action %d", action);
     376           0 :       return 0;
     377             :     }
     378        2243 :     socket->msg.tag = tag;
     379        2243 :     socket->msg.action = action;
     380        2243 :     socket->buf.data = &socket->msg;
     381        2243 :     socket->buf.size = sizeof(struct netio_subscription_message);
     382        2243 :     int ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
     383        2243 :     return ret;
     384             : }
     385             : 
     386             : /**
     387             :  * Initialize an unbuffered publish socket
     388             :  *
     389             :  * @param socket: An unbuffered publish socket
     390             :  * @param ctx: A netio context
     391             :  * @param hostname: Local hostname to bind to
     392             :  * @param port: Local port to bind to
     393             :  * @param buf: A registered send buffer
     394             :  */
     395             : void
     396          35 : netio_unbuffered_publish_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
     397             : {
     398          35 :     log_dbg("netio_unbuffered_publish_socket_init");
     399          35 :     socket->ctx = ctx;
     400          35 :     memcpy(&socket->buf, buf, sizeof(struct netio_buffer));
     401          35 :     netio_subscription_table_init(&socket->subscription_table);
     402          35 :     netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);
     403          35 :     netio_init_listen_socket(&socket->lsocket, ctx, NULL);
     404          35 :     socket->lsocket.usr = socket;
     405          35 :     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     406          35 :     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     407          35 :     socket->lsocket.recv_sub_msg = 1;
     408          35 :     netio_listen(&socket->lsocket, hostname, port);
     409             : 
     410          35 :     socket->buf_array[0] = &socket->completion_stack.buf;
     411         980 :     for(unsigned i=1; i<NETIO_MAX_IOV_LEN; i++) {
     412         945 :       socket->buf_array[i] = &socket->buf;
     413             :     }
     414          35 : }
     415             : 
     416             : 
     417             : 
     418             : static unsigned
     419     2501662 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
     420             : {
     421     2501662 :   struct netio_subscription key;
     422     2501662 :   key.tag = tag;
     423     5003324 :   struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
     424     2501662 :                                                                        table->subscriptions,
     425             :                                                                        table->num_subscriptions,
     426             :                                                                        sizeof(struct netio_subscription),
     427             :                                                                        cmp_subscription);
     428             : 
     429     2501662 :   log_trc("found ptr=0x%p", ptr);
     430     2501662 :   if(ptr == NULL) {
     431             :     return 0;
     432             :   }
     433     1477434 :   unsigned start_idx = ptr - table->subscriptions;
     434     1478466 :         while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
     435             :                 start_idx--;
     436             :         }
     437             :         log_trc("start_idx=%d", start_idx);
     438             :   unsigned count = 0;
     439     2955900 :   for(unsigned i=start_idx; i<table->num_subscriptions; i++) {
     440     1478473 :     if(table->subscriptions[i].tag == tag) {
     441     1478466 :       count++;
     442             :     } else {
     443             :       break;
     444             :     }
     445             :   }
     446     1477434 :   *start = start_idx;
     447     1477434 :   return count;
     448             : }
     449             : 
     450             : static unsigned
     451    81567839 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
     452             : {
     453    81567839 :   if(cache->ts != table->ts) {
     454          42 :                 log_trc("subscription updating cache");
     455          42 :     cache->count = lookup_tag(table, tag, &cache->idx_start);
     456          42 :     cache->ts = table->ts;
     457             :   }
     458             : 
     459             : #if defined DEBUG || defined DEBUG_UBUF
     460             :           for(unsigned i=0; i<table->num_subscriptions; i++) {
     461             :                         printf("%u:%lu ", i, table->subscriptions[i].tag);
     462             :                 }
     463             :                 printf("\n");
     464             : #endif
     465             : 
     466    81567839 :   *start = cache->idx_start;
     467    81567839 :         log_trc("cache count=%d for tag=%lu", cache->count, tag);
     468    81567839 :   return cache->count;
     469             : }
     470             : 
     471             : /**
     472             :  * @brief Publishes a message on an unbuffered publish socket.
     473             :  *
     474             :  * @see netio_unbuffered_publishv_usr
     475             :  *
     476             :  * Same as netio_unbuffered_publishv_usr, but without the user header data field.
     477             :  */
     478             : int
     479     2501584 : netio_unbuffered_publishv(struct netio_unbuffered_publish_socket* socket,
     480             :                               netio_tag_t tag,
     481             :                               struct iovec* iov,
     482             :                               size_t count,
     483             :                               uint64_t* key,
     484             :                               int flags,
     485             :                               struct netio_subscription_cache* cache)
     486             : {
     487     2501584 :     return netio_unbuffered_publishv_usr(socket, tag, iov, count, key, flags, cache, 0, 0);
     488             : }
     489             : 
     490             : /**
     491             :  * @brief Publishes a message on an unbuffered publish socket.
     492             :  *
     493             :  * The message is given as a scatter/gather buffer (`struct iovec`). The caller
     494             :  * has to ensure the validity of the buffer until the transfer is complete.
     495             :  * A transfer is complete when the socket's `msg_published` callback has been
     496             :  * called. A key can be passed to the call to identify the publication.
     497             :  * The key will be passed in the `msg_published` callback.
     498             :  *
     499             :  * The `msg_published` callback will only be called if the message has been
     500             :  * sent successfully to all subscribed endpoints.
     501             :  *
     502             :  * The call may return `NETIO_STATUS_AGAIN` if one of the sockets connections
     503             :  * yields `NETIO_STATUS_AGAIN`. In this case it is the user's responsibility
     504             :  * to call `netio_unbuffered_publishv` again with the `NETIO_REENTRY` flag.
     505             :  *
     506             :  * @param socket:  The socket to publish on
     507             :  * @param tag:     The tag under which to publish
     508             :  * @param iov:     Message data iov
     509             :  * @param count:   IOV count
     510             :  * @param key:     Key that will be passed to the callback on successful publish of the
     511             :  *                 message. This is an input-output parameter. In case the function
     512             :  *                 returns NETIO_STATUS_PARTIAL, 'key' is used as storage to track the
     513             :  *                 completion data for the given tag. If netio_unbuffered_publishv is
     514             :  *                 called again with the NETIO_REENTRY flag, 'key' must remain
     515             :  *                 unchanged. In other words, for a given tag, 'key' is only set by the
     516             :  *                 user before the initial call to netio_unbuffered_publishv without
     517             :  *                 the NETIO_REENTRY flag.
     518             :  * @param flags:   NETIO_REENTRY  publishing of this message was attempted before and
     519             :  *                         resulted in NETIO_STATUS_AGAIN. Calling publish with
     520             :  *                         this flag will only send on connections where the
     521             :  *                         message was previously unpublished.
     522             :  * @param cache:   Optional user-supplied cache for the subsctiption table lookup.
     523             :  * @param usr:     Up to 8 byte of data that are transmitted as beginning of the message. This
     524             :  *                 allows the user to add a short header to a message without having to allocate
     525             :  *                 bufferspace for it.
     526             :  * @param usr_size: Size of the usr header field. Set to 0 if no header is required. The maximum
     527             :  *                  header size is 8.
     528             :  *
     529             :  * @returns NETIO_STATUS_OK       If the message was published successfully to all subscribed endpoints
     530             :  * @returns NETIO_STATUS_OK_NOSUB No ongoing subscriptions to publish the given message
     531             :  * @returns NETIO_STATUS_AGAIN    If not enough resources are available to proceed with the operation.
     532             :  *                                No data were sent to any endpoint. The user should try again with the exact same parameters.
     533             :  * @returns NETIO_STATUS_PARTIAL  The message was sent to some of the subscribed endpoints, but not all. The user should
     534             :  *                                try again, and additionally set the NETIO_REENTRY flag. Users must take care not to
     535             :  *                                overwrite the key parameter, which is used by the function call to track the operation
     536             :  *                                status.
     537             :  * @returns NETIO_ERROR_MAX_IOV_EXCEEDED
     538             :  *                                Too many iovec entries, try with less.
     539             :  */
     540             : int
     541    84069425 : netio_unbuffered_publishv_usr(struct netio_unbuffered_publish_socket* socket,
     542             :                               netio_tag_t tag,
     543             :                               struct iovec* iov,
     544             :                               size_t count,
     545             :                               uint64_t* key,
     546             :                               int flags,
     547             :                               struct netio_subscription_cache* cache,
     548             :                               uint64_t usr,
     549             :                               uint8_t usr_size)
     550             : {
     551    84069425 :   int ret = NETIO_STATUS_OK;
     552    84069425 :   if(count > NETIO_MAX_IOV_LEN-1) {
     553             :     return NETIO_ERROR_MAX_IOV_EXCEEDED;
     554             :   }
     555             : 
     556    84069425 :   log_trc("unbuffered publishv, key=0x%p, tag=%lu", key, tag);
     557             : 
     558    84069425 :   unsigned start_idx;
     559    84069425 :   unsigned num_subscriptions;
     560             : 
     561    84069425 :   if(cache) {
     562    81567881 :     num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
     563             :   } else {
     564     2501586 :     num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
     565             :   }
     566             : 
     567    84069425 :   log_trc("unbuffered publishv: num_subscriptions for tag %lu: %d, start_idx: %d", tag, num_subscriptions, start_idx);
     568    84069425 :   if(num_subscriptions == 0) {
     569    45322239 :     if(socket->cb_msg_published) {
     570    45322239 :       socket->cb_msg_published(socket, *key);
     571             :     }
     572    45322239 :     return NETIO_STATUS_OK_NOSUB;
     573             :   }
     574             : 
     575    38747186 :   int used_completion = 0;
     576    38747186 :   struct netio_completion_object* completion = NULL;
     577    38747186 :   if(flags & NETIO_REENTRY) {
     578           0 :     log_trc("unbuffered publishv: REENTRY - fetching completion from user");
     579           0 :     completion = (struct netio_completion_object*)(*key);
     580           0 :     used_completion = 1;
     581             :   } else {
     582    38747186 :     log_trc("unbuffered publishv: fetching completion object");
     583    38747186 :     completion = fetch_completion_object(socket);
     584    38747186 :     if(completion) {
     585    36808994 :       netio_semphore_set_threshold(&completion->sem, num_subscriptions);
     586    36808994 :       completion->key = *key;
     587    36808994 :       completion->header.tag = tag;
     588    36808994 :       completion->header.usr = usr;
     589    36808994 :       completion->usr_size = usr_size;
     590    36808994 :       socket->completion_stack.key_array[socket->completion_stack.available_objects] = *key;
     591    36808994 :       *key = (uint64_t)completion;
     592    36808994 :       used_completion = 0;
     593    36808994 :       log_trc("fetched completion: %lu for tag 0%lx", completion->key, tag);
     594             :     } else {
     595             :       log_trc("unbuffered publishv: no completion available -> AGAIN");
     596             :       // When no completion is available, we return NETIO_STATUS_AGAIN
     597             :       // The user is supposed to call the same call again (no need to keep track of completion object)
     598             :       return NETIO_STATUS_AGAIN;
     599             :     }
     600             :   }
     601             : 
     602             : 
     603    73619018 :   for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
     604    36810024 :     struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
     605    36810024 :     log_trc("subscription %d has tag %lu", i, subscription->tag);
     606    36810024 :     if(subscription->tag == tag) {
     607             :       // skip connections that were already successful if we are in reentry mode
     608    36810024 :       if(flags & NETIO_REENTRY) {
     609           0 :         if(subscription->again == 0) {
     610           0 :           continue;
     611             :         }
     612             :       }
     613             : 
     614    36810024 :       struct netio_send_socket* ssocket = (struct netio_send_socket*)(subscription->socket);
     615    36810024 :       log_trc("unbuffered publishv: send message on subscribed connection, iov-count=%lu, iov-len[0]=%lu", count, iov[0].iov_len);
     616    36810024 :       int result;
     617    36810024 :       struct iovec hdr_iov[NETIO_MAX_IOV_LEN];
     618    36810024 :       hdr_iov[0].iov_base = &completion->header;
     619    36810024 :       hdr_iov[0].iov_len = sizeof(netio_tag_t) + completion->usr_size;
     620   195350990 :       for(unsigned j=0; j<count; j++) {
     621   158540966 :         hdr_iov[1+j].iov_base = iov[j].iov_base;
     622   158540966 :         hdr_iov[1+j].iov_len = iov[j].iov_len;
     623             :       }
     624    36810024 :       log_trc("iov: [0]: %lu - compl: %lu", *((netio_tag_t*)hdr_iov[0].iov_base), completion->key);
     625    36810024 :       result = netio_sendv_imm(ssocket, socket->buf_array, hdr_iov, count+1, (uint64_t)completion, 0);
     626    36810024 :       log_trc("unbuffered publishv: result=%d", result);
     627             : 
     628    36810024 :       if(result == NETIO_STATUS_OK) {
     629    36810024 :         used_completion = 1;
     630    36810024 :         subscription->again = 0;
     631           0 :       } else if(result == NETIO_STATUS_AGAIN) {
     632           0 :         subscription->again = 1;
     633           0 :         used_completion = 1;
     634           0 :         ret = NETIO_STATUS_PARTIAL;
     635             :       } else {
     636           0 :         return result;
     637             :         // some error occured and we return immediately
     638             :         // TODO we should handle the error and unsubscribe the faulty remote
     639             :       }
     640             :     }
     641             :   }
     642             : 
     643    36808994 :   if(used_completion == 0) {
     644           0 :     netio_completion_stack_push(&socket->completion_stack, completion);
     645             :   }
     646             : 
     647             :   return ret;
     648             : }
     649             : 
     650             : 
     651             : static void
     652         116 : subscribe_socket_on_connection_established(struct netio_recv_socket* socket)
     653             : {
     654         116 :         struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
     655         116 :   if(subscribe_socket->cb_connection_established) {
     656          95 :     subscribe_socket->cb_connection_established(subscribe_socket);
     657             :   }
     658         116 : }
     659             : 
     660             : static void
     661          90 : subscribe_socket_on_connection_closed(struct netio_recv_socket* socket)
     662             : {
     663          90 :         struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
     664          90 :         log_info("subscribe socket: connection closed");
     665          90 :   if(subscribe_socket->cb_connection_closed)  {
     666          90 :     subscribe_socket->cb_connection_closed(subscribe_socket);
     667             :   }
     668          90 : }
     669             : 
     670             : static void
     671    48681059 : subscribe_socket_on_msg_received(struct netio_recv_socket* rsocket, struct netio_buffer* buf, void* data, size_t len)
     672             : {
     673    48681059 :     struct netio_unbuffered_subscribe_socket* socket = (struct netio_unbuffered_subscribe_socket*)rsocket->lsocket->usr;
     674             : 
     675    48681059 :     netio_tag_t tag;
     676    48681059 :     memcpy(&tag, data, sizeof(netio_tag_t));
     677    48681059 :     data = (char *)data + sizeof(netio_tag_t);
     678    48681059 :     len -= sizeof(netio_tag_t);
     679             : 
     680    48681059 :     log_trc("buffer received of length %lu for tag %lu", len, tag);
     681             : 
     682    48681059 :     if(socket->cb_msg_received) {
     683    48681059 :       socket->cb_msg_received(socket, tag, data, len);
     684             :     }
     685             : 
     686    48681043 :     netio_post_recv(rsocket, buf);
     687    48681043 : }
     688             : 
     689             : 
     690             : static void
     691         114 : subsocket_on_connection_established(struct netio_send_socket* socket)
     692             : {
     693         114 :     log_dbg("subsocket connection established");
     694         114 :     int ret = 0;
     695         114 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     696             : 
     697         114 :     subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
     698         114 :     if((ret = fi_getname(&subscribe_socket->recv_socket.pep->fid,
     699         114 :                         subscribe_socket->msg.addr,
     700             :                         &subscribe_socket->msg.addrlen)) !=0) {
     701           0 :       log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
     702           0 :       exit(1);
     703             :     }
     704         114 :     subscribe_socket->buf.data = &subscribe_socket->msg;
     705         114 :     subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
     706         114 :     netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
     707             : 
     708         114 :     if(subscribe_socket->total_tags == 0){
     709           0 :       log_info("Closing send connection again because there is no tag to subscribe to.");
     710           0 :       netio_disconnect(socket);
     711             :     }
     712             : 
     713             :     // send tags one by one
     714         552 :     while(subscribe_socket->total_tags > 0){
     715         438 :       size_t idx = subscribe_socket->total_tags - 1;
     716         438 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     717         438 :       log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
     718         438 :       ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     719         438 :       if (ret == NETIO_STATUS_OK){
     720         438 :         subscribe_socket->total_tags--;
     721             :       } else {
     722           0 :         log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
     723           0 :         break;
     724             :       }
     725             :     }
     726         114 : }
     727             : 
     728             : 
     729             : static void
     730          82 : subsocket_on_connection_closed(struct netio_send_socket* socket)
     731             : {
     732          82 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     733          82 :     subscribe_socket->state = NONE;
     734          82 : }
     735             : 
     736             : static void
     737          50 : subsocket_on_error_connection_refused(struct netio_send_socket* socket) {
     738          50 :     log_dbg("subsocket connection refused");
     739          50 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     740          50 :     subscribe_socket->state = NONE;
     741          50 :     handle_listen_socket_shutdown(&subscribe_socket->recv_socket);
     742          50 :     if(subscribe_socket->cb_error_connection_refused) {
     743          50 :       subscribe_socket->cb_error_connection_refused(subscribe_socket);
     744             :     }
     745          50 : }
     746             : 
     747             : 
     748             : static void
     749        2236 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
     750             : {
     751        2236 :     struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
     752             : 
     753             :     //check for remaining tags from on_connection_established
     754        2236 :     while(subscribe_socket->total_tags > 0){
     755           0 :       size_t idx = subscribe_socket->total_tags - 1;
     756           0 :       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
     757           0 :       int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
     758           0 :       if (ret == NETIO_STATUS_OK){
     759           0 :         subscribe_socket->total_tags--;
     760             :       } else {
     761           0 :         log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
     762           0 :         break;
     763             :       }
     764             :     }
     765        2236 : }
     766             : 
     767             : 
     768             : /**
     769             :  * Initialize an unbuffered subscribe socket
     770             :  *
     771             :  * @param socket: An unbuffered subscribe socket
     772             :  * @param ctx: A netio context
     773             :  * @param hostname: A local hostname or IP to bind to
     774             :  * @param remote_host: Hostname or IP of the remote publish socket
     775             :  * @param remote_port: Port of the remote publish socket
     776             :  * @param count: Size of the buffer array
     777             :  */
     778         164 : void netio_unbuffered_subscribe_socket_init(struct netio_unbuffered_subscribe_socket* socket,
     779             :                                             struct netio_context* ctx,
     780             :                                             const char* hostname,
     781             :                                             const char* remote_host,
     782             :                                             unsigned remote_port,
     783             :                                             size_t buffer_size,
     784             :                                             size_t count)
     785             : {
     786         164 :         memset(socket, 0, sizeof(*socket));
     787         164 :         socket->ctx = ctx;
     788         164 :         socket->state = NONE;
     789         164 :   struct netio_unbuffered_socket_attr attr = {count, buffer_size};
     790         164 :         netio_init_listen_socket(&socket->recv_socket, ctx, &attr);
     791         164 :         socket->recv_socket.usr = socket;
     792         164 :         socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     793         164 :         socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     794         164 :   socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     795         164 :         netio_listen(&socket->recv_socket, hostname, 0);
     796             : 
     797         164 :         socket->remote_hostname = strdup(remote_host);
     798         164 :         socket->remote_port = remote_port;
     799             : 
     800         164 :         log_dbg("subscribe socket is listening");
     801             : 
     802         164 :         socket->total_tags = 0;
     803         164 : }
     804             : 
     805             : 
     806             : 
     807             : /**
     808             :  * Initializes an unbuffered send socket.
     809             :  *
     810             :  * @param socket: The socket to initialize
     811             :  * @param ctx: The NetIO context object in which to initialize the socket
     812             :  */
     813             : void
     814         282 : netio_unbuffered_send_socket_init(struct netio_send_socket* socket, struct netio_context* ctx)
     815             : {
     816         282 :   netio_init_send_socket(socket, ctx);
     817         282 :   socket->cb_internal_connection_closed = on_unbuffered_send_connection_closed;
     818         282 : }
     819             : 
     820             : 
     821             : 
     822             : /**
     823             :  * Subscribe an unbuffered subscribe socket to a given tag.
     824             :  *
     825             :  * @param socket: An unbuffered subscribe socket
     826             :  * @param tag: A netio tag
     827             :  */
     828             : int
     829        1184 : netio_unbuffered_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
     830             : {
     831        1184 :         if(socket->state == NONE) {
     832         164 :     log_dbg("Creating and connecting a new socket");
     833         164 :                 netio_init_send_socket(&socket->socket, socket->ctx);
     834         164 :                 socket->socket.usr = socket;
     835         164 :                 socket->socket.cb_connection_established = subsocket_on_connection_established;
     836         164 :     socket->socket.cb_connection_closed = subsocket_on_connection_closed;
     837         164 :                 socket->socket.cb_send_completed = subsocket_on_send_completed;
     838         164 :     socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
     839         164 :     socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
     840         164 :                 netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
     841         164 :                 socket->state = INITIALIZED;
     842             :         }
     843             : 
     844        6944 :   for(unsigned i=0; i<socket->total_tags; i++) {
     845        5760 :     if(socket->tags_to_subscribe[i] == tag) {
     846             :       return 0;
     847             :     }
     848             :   }
     849             : 
     850             :   //if send socket connected send message
     851             :   //otherwise on_connection_established will do it
     852        1184 :   if (socket->socket.state){
     853         696 :     log_info("Sending subscription message for tag 0x%lx", tag);
     854         696 :     int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
     855         696 :     return ret;
     856             :   } else {
     857         488 :     log_info("Queing subscription message for txg 0x%lx", tag);
     858         488 :     socket->tags_to_subscribe[socket->total_tags] = tag;
     859         488 :     socket->total_tags++;
     860         488 :     return 0;
     861             :   }
     862             : 
     863             :   return 0;
     864             : }
     865             : 
     866             : 
     867             : static int
     868           0 : remove_tag_to_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
     869             : {
     870           0 :   int found = 0;
     871           0 :   for(unsigned int i=0; i<socket->total_tags; ++i){
     872           0 :     if(socket->tags_to_subscribe[i] == tag){
     873           0 :       log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
     874           0 :       for(unsigned int j = i; j < socket->total_tags-1; ++j){
     875           0 :         socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
     876             :       }
     877           0 :       found = 1;
     878           0 :       socket->total_tags--;
     879           0 :       break;
     880             :     }
     881             :   }
     882           0 :   if(found == 0){
     883           0 :     log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
     884             :   }
     885           0 :   return NETIO_STATUS_OK;
     886             : }
     887             : 
     888             : 
     889             : /**
     890             :  * Unsubscribe from a given message tag.
     891             :  *
     892             :  * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
     893             :  *
     894             :  * @param socket: The unbuffered subscribe socket.
     895             :  * @param tag: The tag to unsubscribe from.
     896             :  */
     897             : int
     898        1109 : netio_unbuffered_unsubscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
     899             : {
     900        1109 :   int ret = NETIO_STATUS_OK;
     901        1109 :   if(socket->state == INITIALIZED) {
     902        1109 :     log_dbg("Subscribe socket initialised, can proceed with usubscription");
     903        1109 :     if (socket->socket.state) {
     904        1109 :       ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
     905        1109 :       log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
     906             :     } else {
     907           0 :       ret = remove_tag_to_subscribe(socket, tag);
     908             :     }
     909             :   } else {
     910        1109 :     log_dbg("The connection has been already closed.");
     911             :   }
     912        1109 :   return ret;
     913             : }

Generated by: LCOV version 1.0