LCOV - code coverage report
Current view: top level - netio-next/src - buffered.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 176 196 89.8 %
Date: 2025-06-10 03:23:28 Functions: 16 16 100.0 %

          Line data    Source code
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "log.h"
#include "connection_event.h"
#include "netio/netio.h"
       7             : 
       8             : #if defined DEBUG || defined DEBUG_BUF
       9             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      10             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      11             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #else
      13             : #define log_dbg(...)
      14             : #define log_trc(...)
      15             : #endif
      16             : 
      17             : #define FATAL(msg, c) \
      18             : do { \
      19             :         log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
      20             :         exit(2); \
      21             : } while(0);
      22             : 
      23             : 
      24             : /* This type is used as a length-marker in buffers for encoded messages.
      25             :    Encoding of buffers is as follows: LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N */
      26             : typedef uint32_t msg_size_t;
      27             : 
      28             : 
      29             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      30             : 
      31             : 
      32             : static int
      33     1087084 : flush(struct netio_buffered_send_socket* socket)
      34             : {
      35     1087084 :   int ret = NETIO_STATUS_OK; 
      36     1087084 :   if(socket->current_buffer)
      37             :   {
      38     1087082 :     socket->current_buffer->size = socket->pos;
      39     1087082 :     int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
      40     1087082 :     if (send_status == NETIO_STATUS_AGAIN){
      41           0 :       socket->busy = 1;
      42           0 :       log_dbg("netio_send_buffer returned %d, trying again", ret);
      43           0 :       ret = NETIO_STATUS_AGAIN;
      44             :     } else {
      45     1087082 :       socket->busy = 0;
      46     1087082 :       socket->current_buffer = NULL;
      47     1087082 :       if(socket->timeout_ms != 0){
      48      286437 :         netio_timer_stop(&socket->flush_timer);
      49             :       }
      50             :     }
      51             :   } else { //there is no current buffer. disable busy if on
      52           2 :     socket->busy = 0;
      53             :   }
      54     1087084 :   return ret;
      55             : }
      56             : 
      57             : 
      58             : static void
      59          82 : flush_cb(void* ptr)
      60             : {
      61          82 :   struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)ptr;
      62          82 :   flush(socket);
      63          82 : }
      64             : 
      65             : 
      66             : static void
      67     1084072 : on_send_completed(struct netio_send_socket* socket, uint64_t key)
      68             : {
      69     1084072 :         struct netio_buffer* buf = (struct netio_buffer*)key;
      70     1084072 :   struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
      71     1084072 :   if(netio_bufferstack_push(&bs->buffers, buf)) {
      72           0 :     log_fatal("The buffer stack exceeded its limits.");
      73           0 :     exit(1);
      74             :   }
      75     1084072 :         if(bs->buffers.available_buffers == 1) {
      76      214525 :                 netio_signal_fire(&bs->signal_buffer_available);
      77             :         }
      78     1084072 : }
      79             : 
      80             : static void
      81         328 : on_connect(struct netio_send_socket* socket)
      82             : {
      83         328 :   struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
      84         328 :   netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
      85         328 :   if(bs->cb_connection_established) {
      86         328 :     bs->cb_connection_established(bs);
      87             :   }
      88         328 : }
      89             : 
      90             : static void
      91         310 : on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
      92             : {
      93         310 :     log_dbg("on_buf_send_socket_connection_closed callback");
      94         310 :     struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
      95         310 :     log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
      96         310 :     netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
      97         310 :     if(bs->timeout_ms != 0){
      98         304 :       log_dbg("removing flush timer fd %d from evloop %d",  bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
      99         304 :       netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
     100             :     }
     101         310 :     bs->current_buffer = NULL;
     102         310 :     handle_send_socket_shutdown(ssocket);
     103         310 :     netio_bufferstack_close(&bs->buffers, bs->num_pages);
     104             : 
     105         310 :     if(bs->pub_socket) { //only remove when send socket is part of a publish socket
     106          39 :       if(bs->cb_connection_closed) {
     107          39 :         bs->cb_connection_closed(bs);
     108             :       }
     109          39 :       struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
     110          39 :       remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
     111             :     } else {
     112         271 :       if(bs->cb_connection_closed) {
     113         271 :         bs->cb_connection_closed(bs);
     114             :       }
     115             :     }
     116         310 : }
     117             : 
     118             : static void
     119          70 : on_error_connection_refused(struct netio_send_socket* socket) {
     120          70 :   struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
     121          70 :   netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
     122          70 :   if(bs->timeout_ms != 0){
     123          64 :     netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
     124             :   }
     125          70 :   if(bs->cb_error_connection_refused) {
     126          70 :     bs->cb_error_connection_refused(bs);
     127             :   }
     128          70 : }
     129             : 
     130             : static void
     131    17250790 : on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     132             : {
     133    17250790 :     struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
     134    17250790 :     if(bs->lsocket->cb_msg_received) {
     135             :       size_t pos = 0;
     136  3423792159 :       while(pos < len) {
     137  3406541450 :         msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);
     138  3406541450 :         pos += sizeof(msg_size_t);
     139  3406541450 :         bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
     140  3406541369 :         pos += *s;
     141             :       }
     142             :     }
     143             : 
     144             :     //to study the L1ID pileup
     145    17250709 :     struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
     146    17250709 :     if(ssocket && ssocket->cb_buf_received) {
     147         600 :       ssocket->cb_buf_received(ssocket, buf, len);
     148             :     }
     149             : 
     150    17250707 :     netio_post_recv(socket, buf);
     151    17250735 : }
     152             : 
     153             : 
     154             : 
     155             : // API FUNCTIONS ///////////////////////////////////////////////////////////////
     156             : 
     157             : /**
     158             :  * Initializes a buffered listen socket.
     159             :  *
     160             :  * @param socket: The socket to initialize
     161             :  * @param ctx: The NetIO context object in which to initialize the socket
     162             :  * @param attr: Buffer attributes of the socket. Attributes need to match on
     163             :  *              the sending and receiving side of a socket
     164             :  */
     165             : void
     166         311 : netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
     167             :                                   struct netio_context* ctx,
     168             :                                   struct netio_buffered_socket_attr* attr)
     169             : {
     170         311 :           memset(socket, 0, sizeof(*socket));
     171         311 :     if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     172           0 :       log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     173           0 :       attr->num_pages = NETIO_DOMAIN_MAX_MR;
     174             :     } 
     175         311 :     socket->pagesize = attr->pagesize;
     176         311 :     socket->num_pages = attr->num_pages;
     177         311 :     netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
     178         311 : }
     179             : 
     180             : 
     181             : /**
     182             :  * Bind the listen socket to an interface and port number and bring the listen socket to 'listening' state.
     183             :  *
     184             :  * @param socket: The buffered listen socket
     185             :  * @param hostname: A hostname, typically an IP address, which identifies the interface on which to bind
     186             :  * @param port: The port name to listen on
     187             :  */
     188             : void
     189         311 : netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
     190             : {
     191         311 :   int ret;
     192         311 :   struct fi_info* hints;
     193         311 :   struct fi_eq_attr eq_attr;
     194         311 :   eq_attr.wait_obj = FI_WAIT_FD;
     195             : 
     196         311 :   hints = fi_allocinfo();
     197         311 :   hints->ep_attr->type  = FI_EP_MSG;
     198         311 :   hints->caps = FI_MSG;
     199         311 :   hints->mode = FI_LOCAL_MR;
     200         311 :   char port_addr[32];
     201         311 :   snprintf(port_addr, 32, "%u", port);
     202             : 
     203         311 :   log_dbg("listening on %s:%s", hostname, port_addr);
     204             : 
     205         311 :   if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
     206             :                        &socket->listen_socket.fi)))
     207             :   {
     208           0 :       FATAL("Buf-listen socket, fail to get interface info, error ", ret);
     209             :   }
     210             :   // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
     211             : 
     212         311 :   if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
     213             :   {
     214           0 :       FATAL("Buf-listen socket, cannot open fabric, error ", ret);
     215             :   }
     216             : 
     217         311 :   if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
     218             :   {
     219           0 :       FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
     220             :   }
     221             : 
     222         311 :   if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
     223             :   {
     224           0 :       FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
     225             :   }
     226             : 
     227         311 :   if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
     228             :   {
     229           0 :       FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
     230             :   }
     231             : 
     232         311 :   if((ret = fi_listen(socket->listen_socket.pep)))
     233             :   {
     234           0 :       FATAL("Buf-listen socket, cannot enable, error ", ret);
     235             :   }
     236             : 
     237         311 :   if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
     238             :   {
     239           0 :       FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
     240             :   }
     241             : 
     242         311 :   socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
     243         311 :   socket->listen_socket.eq_ev_ctx.data = socket;
     244         311 :   socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_cm_event;
     245         311 :   struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
     246         311 :   netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
     247         311 :   add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
     248         311 :   log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
     249         311 :   fi_freeinfo(hints);
     250         311 : }
     251             : 
     252             : /**
     253             :  * Initializes a buffered send socket.
     254             :  *
     255             :  * @param socket: The socket to initialize
     256             :  * @param ctx: The NetIO context object in which to initialize the socket
     257             :  * @param attr: Buffer attributes of the socket. Attributes need to match on
     258             :  *              the sending and receiving side of a socket
     259             :  */
     260             : void
     261         398 : netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
     262             : {
     263         398 :         memset(socket, 0, sizeof(*socket));
     264         398 :   netio_init_send_socket(&socket->send_socket, ctx);
     265         398 :   socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
     266         398 :         socket->send_socket.usr = socket;
     267         398 :   socket->send_socket.cb_send_completed = on_send_completed;
     268         398 :   socket->send_socket.cb_connection_established = on_connect;
     269         398 :   socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
     270         398 :   socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
     271         398 :   socket->current_buffer = NULL;
     272         398 :   socket->pub_socket = NULL;
     273         398 :   socket->pos = 0;
     274         398 :   socket->busy = 0;
     275         398 :   socket->watermark = attr->watermark;
     276         398 :   if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     277           0 :     log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     278           0 :     attr->num_pages = NETIO_DOMAIN_MAX_MR;
     279             :   }
     280         398 :   socket->num_pages = attr->num_pages;
     281         398 :   socket->buffersize = attr->pagesize;
     282         398 :         netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
     283         398 :         socket->signal_buffer_available.cb = NULL; //deactivated by default
     284         398 :   socket->timeout_ms = attr->timeout_ms;
     285         398 :   if(attr->timeout_ms != 0){
     286         382 :     netio_timer_init(&ctx->evloop, &socket->flush_timer);
     287         382 :     socket->flush_timer.cb = flush_cb;
     288         382 :     socket->flush_timer.data = socket;
     289             :   } else {
     290          16 :     socket->flush_timer.cb = NULL;
     291             :   }
     292         398 : }
     293             : 
     294             : 
     295             : /**
     296             :  * Connect a buffered send socket to a remote.
     297             :  *
     298             :  * @param socket: The buffered send socket
     299             :  * @param hostname: Hostname or IP address of the remote endpoint
     300             :  * @param port: Port number of the remote endpoint
     301             :  */
     302             : void
     303         355 : netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
     304             : {
     305         355 :   netio_connect(&socket->send_socket, hostname, port);
     306         355 : }
     307             : 
     308             : 
     309             : void
     310          43 : netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
     311             : {
     312          43 :         netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
     313          43 : }
     314             : 
     315             : 
     316             : /**
     317             :  * Send a message on a buffered connection.
     318             :  *
     319             :  * @param socket: The buffered send socket
     320             :  * @param data: Pointer to message
     321             :  * @param size: Size of the message
     322             :  *
     323             :  * @return
     324             :  *         - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
     325             :  *           Increase `pagesize` in the buffer attributes.
     326             :  *         - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
     327             :  *         - `NETIO_STATUS_OK` Message was successfully copied to internal buffers */
     328             : int
     329         462 : netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
     330             : {
     331         462 :         struct iovec iov;
     332         462 :         iov.iov_base = data;
     333         462 :         iov.iov_len = size;
     334         462 :         return netio_buffered_sendv(socket, &iov, 1);
     335             : }
     336             : 
     337             : 
     338             : /**
     339             :  * Send a message on a buffered connection.
     340             :  *
     341             :  * @param socket: The buffered send socket
     342             :  * @param iov: Pointer to a scatter/gather buffer
     343             :  * @param num: Number of elements in the scatter/gather buffer
     344             :  *
     345             :  * @return
     346             :  *         - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
     347             :  *           Increase `pagesize` in the buffer attributes.
     348             :  *         - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
     349             :  *         - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
     350             :  */
     351             : int
     352    87570933 : netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
     353             : {
     354             : 
     355    87570933 :   if(socket->busy){
     356           0 :     int ret = flush(socket);
     357           0 :     if (ret == NETIO_STATUS_AGAIN){
     358             :       return NETIO_STATUS_AGAIN;
     359             :     }
     360             :   }
     361             : 
     362    87570933 :         size_t total_size = 0;
     363   262712337 :         for(unsigned int i=0; i<num; i++) {
     364   175141404 :                 total_size += iov[i].iov_len;
     365             :         }
     366             : 
     367             :   //if current message is larger than the whole buffer
     368    87570933 :         if(total_size+sizeof(msg_size_t) > socket->buffersize) {
     369             :                 return NETIO_STATUS_TOO_BIG;
     370             :         }
     371             : 
     372    87570933 :         if(socket->current_buffer == NULL) {
     373     1515764 :                 if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
     374             :                         return NETIO_STATUS_AGAIN;
     375             :                 }
     376     1087083 :                 socket->pos = 0;
     377             :     //Enable flush timer
     378     1087083 :     if(socket->timeout_ms != 0 ){
     379      286438 :       netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
     380             :     }
     381             :         } else {
     382             :     //if current message is larger than remaining space
     383             :     //flush buffer and retry with a new one
     384    86055169 :     if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
     385           0 :       flush(socket);
     386           0 :       return NETIO_STATUS_AGAIN;
     387             :     }
     388             :   }
     389             : 
     390    87142252 :         *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
     391    87142252 :         socket->pos += sizeof(msg_size_t);
     392   261426294 :         for(unsigned int i=0; i<num; i++) {
     393   174284042 :                 memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
     394   174284042 :                 socket->pos += iov[i].iov_len;
     395             :         }
     396             : 
     397    87142252 :         if(socket->pos > socket->watermark) {
     398      285895 :                 flush(socket);
     399             :         }
     400             :         return NETIO_STATUS_OK;
     401             : }
     402             : 
     403             : void
     404         252 : netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
     405             : {
     406         252 :           memset(socket, 0, sizeof(*socket));
     407         252 :     socket->lsocket = lsocket;
     408         252 :     netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
     409         252 :     socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
     410         252 :     socket->recv_socket.usr = socket;
     411             : 
     412         252 :     socket->num_pages = socket->lsocket->num_pages;
     413         252 :     socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
     414       71082 :     for(unsigned int i=0; i<socket->num_pages; i++) {
     415       70830 :       socket->pages[i].data = malloc(socket->lsocket->pagesize);
     416       70830 :       socket->pages[i].size = socket->lsocket->pagesize;
     417             :     }
     418         252 :     socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
     419         252 : }
     420             : 
     421             : 
     422             : /**
     423             :  * Flushes the current buffer of the given buffered send socket.
     424             :  *
     425             :  * @param socket The buffered send socket
     426             :  */
     427             : void
     428      801107 : netio_buffered_flush(struct netio_buffered_send_socket* socket)
     429             : {
     430      801107 :                 flush(socket);
     431      801107 : }

Generated by: LCOV version 1.0