LCOV - code coverage report
Current view: top level - netio-next/src - buffered.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 245 264 92.8 %
Date: 2025-08-12 04:15:35 Functions: 22 22 100.0 %

          Line data    Source code
       1             : #include <string.h>
       2             : #include <stdio.h>
       3             : #include <unistd.h>
       4             : #include "log.h"
       5             : #include "connection_event.h"
       6             : #include "netio/netio.h"
       7             : #include "netio/netio_tcp.h"
       8             : 
       9             : #if defined DEBUG || defined DEBUG_BUF
      10             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      11             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      13             : #else
      14             : #define log_dbg(...)
      15             : #define log_trc(...)
      16             : #endif
      17             : 
      18             : #define FATAL(msg, c) \
      19             : do { \
      20             :         log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
      21             :         exit(2); \
      22             : } while(0);
      23             : 
      24             : 
      25             : /* This type is used as a length-marker in buffers for encoded messages.
      26             :    Encoding of buffers is as follows: LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N */
      27             : typedef uint32_t msg_size_t;
      28             : 
      29             : 
      30             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      31             : 
      32             : 
      33             : static int
      34     1120641 : flush(struct netio_buffered_send_socket* socket)
      35             : {
      36     1120641 :   int ret = NETIO_STATUS_OK;
      37     1120641 :   if(socket->current_buffer)
      38             :   {
      39     1116749 :     socket->current_buffer->size = socket->pos;
      40     1116749 :     int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
      41     1116748 :     if (send_status == NETIO_STATUS_AGAIN){
      42           0 :       socket->busy = 1;
      43           0 :       log_dbg("netio_send_buffer returned %d, trying again", ret);
      44           0 :       ret = NETIO_STATUS_AGAIN;
      45             :     } else {
      46     1116748 :       socket->busy = 0;
      47     1116748 :       socket->current_buffer = NULL;
      48     1116748 :       if(socket->timeout_ms != 0){
      49       65999 :         netio_timer_stop(&socket->flush_timer);
      50             :       }
      51             :     }
      52             :   } else { //there is no current buffer. disable busy if on
      53        3892 :     socket->busy = 0;
      54             :   }
      55     1120640 :   return ret;
      56             : }
      57             : 
      58             : 
      59             : static void
      60          32 : flush_cb(void* ptr)
      61             : {
      62          32 :   struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)ptr;
      63          32 :   flush(socket);
      64          32 : }
      65             : 
      66             : 
      67             : static void
      68     1116103 : on_send_completed(struct netio_send_socket* socket, uint64_t key)
      69             : {
      70     1116103 :         struct netio_buffer* buf = (struct netio_buffer*)key;
      71     1116103 :   struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
      72     1116103 :   if(netio_bufferstack_push(&bs->buffers, buf)) {
      73           0 :     log_fatal("The buffer stack exceeded its limits.");
      74           0 :     exit(1);
      75             :   }
      76     1116103 :         if(bs->buffers.available_buffers == 1) {
      77      212274 :                 netio_signal_fire(&bs->signal_buffer_available);
      78             :         }
      79     1116103 : }
      80             : 
      81             : static void
      82         171 : on_connect(struct netio_send_socket* socket)
      83             : {
      84         171 :   struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
      85         171 :   netio_bufferstack_send_init(&bs->buffers, socket, bs->num_pages, bs->buffersize, 0);
      86         171 :   if(bs->cb_connection_established) {
      87         171 :     bs->cb_connection_established(bs);
      88             :   }
      89         171 : }
      90             : 
      91             : static void
      92         167 : on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
      93             : {
      94         167 :     log_dbg("on_buf_send_socket_connection_closed callback");
      95         167 :     struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(ssocket->usr);
      96         167 :     log_dbg("removing buffer available signal fd %d from evloop %d", bs->signal_buffer_available.ev_ctx.fd, bs->signal_buffer_available.epollfd);
      97         167 :     netio_signal_close(&ssocket->ctx->evloop, &bs->signal_buffer_available);
      98         167 :     if(bs->timeout_ms != 0){
      99         162 :       log_dbg("removing flush timer fd %d from evloop %d",  bs->flush_timer.ev_ctx.fd, bs->signal_buffer_available.epollfd);
     100         162 :       netio_timer_close(&ssocket->ctx->evloop, &bs->flush_timer);
     101             :     }
     102         167 :     bs->current_buffer = NULL;
     103         167 :     if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     104         159 :       handle_send_socket_shutdown(ssocket);
     105           8 :     } else if (ssocket->tcp_fi_mode == NETIO_MODE_TCP){
     106           8 :       handle_tcp_send_socket_shutdown(ssocket);
     107             :     }
     108             : 
     109         167 :     netio_bufferstack_close(&bs->buffers, bs->num_pages);
     110             : 
     111             :     // This is pubsocket_on_connection_closed in pubsub.c
     112             :     // pubsocket_on_connection_closed will call table_remove_subscription.
     113             :     // table_remove_subscription can call netio_disconnect that sends a shutdown
     114             :     // For RDMA shutdown goes via CM, for TCP/IP it requires the FD.
     115         167 :     if(bs->pub_socket) { //only remove when send socket is part of a publish socket
     116          23 :       if(bs->cb_connection_closed) {
     117          23 :         bs->cb_connection_closed(bs);
     118             :       }
     119          23 :       struct netio_publish_socket* psocket = (struct netio_publish_socket*)bs->pub_socket;
     120          23 :       remove_socket(&(psocket->subscription_table.socket_list), (void*)bs);
     121             :     } else {
     122         144 :       if(bs->cb_connection_closed) {
     123         144 :         bs->cb_connection_closed(bs);
     124             :       }
     125             :     }
     126         167 : }
     127             : 
     128             : static void
     129          26 : on_error_connection_refused(struct netio_send_socket* socket) {
     130          26 :   struct netio_buffered_send_socket* bs = (struct netio_buffered_send_socket*)(socket->usr);
     131          26 :   netio_signal_close(&(socket->ctx->evloop), &bs->signal_buffer_available);
     132          26 :   if(bs->timeout_ms != 0){
     133          23 :     netio_timer_close(&(socket->ctx->evloop), &bs->flush_timer);
     134             :   }
     135          26 :   if(bs->cb_error_connection_refused) {
     136          26 :     bs->cb_error_connection_refused(bs);
     137             :   }
     138          26 : }
     139             : 
     140             : static void
     141     7327586 : on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
     142             : {
     143     7327586 :     struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
     144     7327586 :     if(bs->lsocket->cb_msg_received) {
     145             :       size_t pos = 0;
     146  1670207131 :       while(pos < len) {
     147  1662879745 :         msg_size_t* s = (msg_size_t*)((char *)buf->data + pos);
     148  1662879745 :         pos += sizeof(msg_size_t);
     149  1662879745 :         bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, *s);
     150  1662879545 :         pos += *s;
     151             :       }
     152             :     }
     153             : 
     154             :     //to study the L1ID pileup
     155     7327386 :     struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
     156     7327386 :     if(ssocket && ssocket->cb_buf_received) {
     157         600 :       ssocket->cb_buf_received(ssocket, buf, len);
     158             :     }
     159             : 
     160     7327384 :     netio_post_recv(socket, buf);
     161     7327574 : }
     162             : 
     163             : 
     164             : 
     165             : // API FUNCTIONS ///////////////////////////////////////////////////////////////
     166             : 
     167             : /**
     168             :  * Initializes a buffered listen socket.
     169             :  *
     170             :  * @param socket: The socket to initialize
     171             :  * @param ctx: The NetIO context object in which to initialize the socket
     172             :  * @param attr: Buffer attributes of the socket. Attributes need to match on
     173             :  *              the sending and receiving side of a socket
     174             :  */
     175             : void
     176         137 : netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
     177             :                                   struct netio_context* ctx,
     178             :                                   struct netio_buffered_socket_attr* attr)
     179             : {
     180         137 :           memset(socket, 0, sizeof(*socket));
     181         137 :     if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     182           0 :       log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     183           0 :       attr->num_pages = NETIO_DOMAIN_MAX_MR;
     184             :     }
     185         137 :     socket->pagesize = attr->pagesize;
     186         137 :     socket->num_pages = attr->num_pages;
     187         137 :     netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
     188         137 : }
     189             : 
     190             : 
     191             : /* Same as above except _tcp in netio_init_listen_socket */
     192             : void
     193          58 : netio_buffered_listen_tcp_socket_init(struct netio_buffered_listen_socket* socket,
     194             :                                   struct netio_context* ctx,
     195             :                                   struct netio_buffered_socket_attr* attr)
     196             : {
     197          58 :           memset(socket, 0, sizeof(*socket));
     198          58 :     socket->pagesize = attr->pagesize;
     199          58 :     socket->num_pages = attr->num_pages;
     200          58 :     netio_init_listen_tcp_socket(&socket->listen_socket, ctx, NULL);
     201          58 : }
     202             : 
     203             : 
     204             : /**
     205             :  * Bind the listen socket to an interface and port number and bring the listen socket to 'listening' state.
     206             :  *
     207             :  * @param socket: The buffered listen socket
     208             :  * @param hostname: A hostname, typically an IP address, which identifies the interface on which to bind
     209             :  * @param port: The port name to listen on
     210             :  */
     211             : void
     212         137 : netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
     213             : {
     214         137 :   log_info("netio_buffered_listen %s", hostname);
     215         137 :   int ret;
     216         137 :   struct fi_info* hints;
     217         137 :   struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
     218             : 
     219         137 :   hints = fi_allocinfo();
     220         137 :   hints->ep_attr->type  = FI_EP_MSG;
     221         137 :   hints->caps = FI_MSG;
     222         137 :   hints->mode = FI_LOCAL_MR;
     223         137 :   char port_addr[32];
     224         137 :   snprintf(port_addr, 32, "%u", port);
     225             : 
     226         137 :   log_dbg("listening (libfabric) on %s:%s", hostname, port_addr);
     227             : 
     228         137 :   if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
     229             :                        &socket->listen_socket.fi)))
     230             :   {
     231           0 :       FATAL("Buf-listen socket, fail to get interface info, error ", ret);
     232             :   }
     233             :   // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));
     234             : 
     235         137 :   if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
     236             :   {
     237           0 :       FATAL("Buf-listen socket, cannot open fabric, error ", ret);
     238             :   }
     239             : 
     240         137 :   if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
     241             :   {
     242           0 :       FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
     243             :   }
     244             : 
     245         137 :   if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
     246             :   {
     247           0 :       FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
     248             :   }
     249             : 
     250         137 :   if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
     251             :   {
     252           0 :       FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
     253             :   }
     254             : 
     255         137 :   if((ret = fi_listen(socket->listen_socket.pep)))
     256             :   {
     257           0 :       FATAL("Buf-listen socket, cannot enable, error ", ret);
     258             :   }
     259             : 
     260         137 :   if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
     261             :   {
     262           0 :       FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
     263             :   }
     264             : 
     265         137 :   socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
     266         137 :   socket->listen_socket.eq_ev_ctx.data = socket;
     267         137 :   socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_libfabric_cm_event;
     268         137 :   struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
     269         137 :   netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
     270         137 :   add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
     271         137 :   log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
     272         137 :   fi_freeinfo(hints);
     273         137 : }
     274             : 
     275             : 
     276             : /* _tcp version of above. This time there are more differences */
     277             : void
     278          58 : netio_buffered_listen_tcp(struct netio_buffered_listen_socket* socket,
     279             :                           const char* hostname, unsigned port)
     280             : {
     281          58 :   log_info("Buffered TCP/IP listening on %s:%d", hostname, port);
     282             : 
     283          58 :   netio_listen_tcp(&socket->listen_socket, hostname, port);
     284             : 
     285          58 :   socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_tcp_cm_event;
     286          58 :   netio_register_read_tcp_fd(&socket->listen_socket.ctx->evloop, &socket->listen_socket.eq_ev_ctx);
     287          58 : }
     288             : 
     289             : 
     290             : 
     291             : /**
     292             :  * Initializes a buffered send socket.
     293             :  *
     294             :  * @param socket: The socket to initialize
     295             :  * @param ctx: The NetIO context object in which to initialize the socket
     296             :  * @param attr: Buffer attributes of the socket. Attributes need to match on
     297             :  *              the sending and receiving side of a socket
     298             :  */
     299             : void
     300         188 : netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
     301             : {
     302         188 :         memset(socket, 0, sizeof(*socket));
     303         188 :   netio_init_send_socket(&socket->send_socket, ctx);
     304         188 :   socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
     305         188 :         socket->send_socket.usr = socket;
     306         188 :   socket->send_socket.cb_send_completed = on_send_completed;
     307         188 :   socket->send_socket.cb_connection_established = on_connect;
     308         188 :   socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
     309         188 :   socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
     310         188 :   socket->current_buffer = NULL;
     311         188 :   socket->pub_socket = NULL;
     312         188 :   socket->pos = 0;
     313         188 :   socket->busy = 0;
     314         188 :   socket->watermark = attr->watermark;
     315         188 :   if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
     316           0 :     log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
     317           0 :     attr->num_pages = NETIO_DOMAIN_MAX_MR;
     318             :   }
     319         188 :   socket->num_pages = attr->num_pages;
     320         188 :   socket->buffersize = attr->pagesize;
     321         188 :   netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
     322         188 :   socket->signal_buffer_available.cb = NULL; //deactivated by default
     323         188 :   socket->timeout_ms = attr->timeout_ms;
     324         188 :   if(attr->timeout_ms != 0){
     325         178 :     netio_timer_init(&ctx->evloop, &socket->flush_timer);
     326         178 :     socket->flush_timer.cb = flush_cb;
     327         178 :     socket->flush_timer.data = socket;
     328             :   } else {
     329          10 :     socket->flush_timer.cb = NULL;
     330             :   }
     331         188 : }
     332             : 
     333             : /* Same as above except for the _tcp in netio_init_send_tcp_socket  */
     334             : /* If this works, consider factoring out common code                */
     335             : void
     336           9 : netio_buffered_send_tcp_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
     337             : {
     338           9 :         memset(socket, 0, sizeof(*socket));
     339           9 :   netio_init_send_tcp_socket(&socket->send_socket, ctx);
     340           9 :         socket->send_socket.usr = socket;
     341           9 :   socket->send_socket.cb_send_completed = on_send_completed;
     342           9 :   socket->send_socket.cb_connection_established = on_connect;
     343           9 :   socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
     344           9 :   socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
     345           9 :   socket->current_buffer = NULL;
     346           9 :   socket->pub_socket = NULL;
     347           9 :   socket->pos = 0;
     348           9 :   socket->busy = 0;
     349           9 :   socket->watermark = attr->watermark;
     350           9 :   socket->num_pages = attr->num_pages;
     351           9 :   socket->buffersize = attr->pagesize;
     352           9 :   netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
     353           9 :   socket->signal_buffer_available.cb = NULL; //deactivated by default
     354           9 :   socket->timeout_ms = attr->timeout_ms;
     355           9 :   if(attr->timeout_ms != 0){
     356           9 :     netio_timer_init(&ctx->evloop, &socket->flush_timer);
     357           9 :     socket->flush_timer.cb = flush_cb;
     358           9 :     socket->flush_timer.data = socket;
     359             :   } else {
     360           0 :     socket->flush_timer.cb = NULL;
     361             :   }
     362           9 : }
     363             : 
     364           4 : void netio_buffered_send_socket_init_and_connect(struct netio_buffered_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
     365           4 :   int tcp = netio_tcp_mode(hostname);
     366           4 :   const char* host = netio_hostname(hostname);
     367           4 :   if (tcp) {
     368           2 :     netio_buffered_send_tcp_socket_init(socket, ctx, attr);
     369             :   } else {
     370           2 :     netio_buffered_send_socket_init(socket, ctx, attr);
     371             :   }
     372           4 :   netio_buffered_connect(socket, host, port);
     373           4 : }
     374             : 
     375           2 : void netio_buffered_listen_socket_init_and_listen(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
     376           2 :   int tcp = netio_tcp_mode(hostname);
     377           2 :   const char* host = netio_hostname(hostname);
     378           2 :   if (tcp) {
     379           1 :     netio_buffered_listen_tcp_socket_init(socket, ctx, attr);
     380           1 :     netio_buffered_listen_tcp(socket, host, port);
     381             :   } else {
     382           1 :     netio_buffered_listen_socket_init(socket, ctx, attr);
     383           1 :     netio_buffered_listen(socket, host, port);
     384             :   }
     385           2 : }
     386             : 
     387             : /**
     388             :  * Connect a buffered send socket to a remote.
     389             :  *
     390             :  * @param socket: The buffered send socket
     391             :  * @param hostname: Hostname or IP address of the remote endpoint
     392             :  * @param port: Port number of the remote endpoint
     393             :  */
     394             : void
     395         173 : netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
     396             : {
     397         173 :   netio_connect(&socket->send_socket, netio_hostname(hostname), port);
     398         173 : }
     399             : 
     400             : 
     401             : void
     402          22 : netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
     403             : {
     404          22 :         netio_connect_rawaddr(&socket->send_socket, addr, addrlen);
     405          22 : }
     406             : 
     407             : 
     408             : /**
     409             :  * Send a message on a buffered connection.
     410             :  *
     411             :  * @param socket: The buffered send socket
     412             :  * @param data: Pointer to message
     413             :  * @param size: Size of the message
     414             :  *
     415             :  * @return
     416             :  *         - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
     417             :  *           Increase `pagesize` in the buffer attributes.
     418             :  *         - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
     419             :  *         - `NETIO_STATUS_OK` Message was successfully copied to internal buffers */
     420             : int
     421       16064 : netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
     422             : {
     423       16064 :         struct iovec iov;
     424       16064 :         iov.iov_base = data;
     425       16064 :         iov.iov_len = size;
     426       16064 :         return netio_buffered_sendv(socket, &iov, 1);
     427             : }
     428             : 
     429             : 
     430             : /**
     431             :  * Send a message on a buffered connection.
     432             :  *
     433             :  * @param socket: The buffered send socket
     434             :  * @param iov: Pointer to a scatter/gather buffer
     435             :  * @param num: Number of elements in the scatter/gather buffer
     436             :  *
     437             :  * @return
     438             :  *         - `NETIO_STATUS_TOO_BIG` The message is too big to fit in the internal buffers.
     439             :  *           Increase `pagesize` in the buffer attributes.
     440             :  *         - `NETIO_STATUS_AGAIN` Socket is busy, no buffers are available. Try again later
     441             :  *         - `NETIO_STATUS_OK` Message was successfully copied to internal buffers
     442             :  */
     443             : int
     444    62633908 : netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
     445             : {
     446    62633908 :   if(socket->busy){
     447           0 :     int ret = flush(socket);
     448           0 :     if (ret == NETIO_STATUS_AGAIN){
     449             :       return NETIO_STATUS_AGAIN;
     450             :     }
     451             :   }
     452             : 
     453    62633908 :         size_t total_size = 0;
     454   251045586 :         for(unsigned int i=0; i<num; i++) {
     455   188411678 :                 total_size += iov[i].iov_len;
     456             :         }
     457             : 
     458             :   //if current message is larger than the whole buffer
     459    62633908 :         if(total_size+sizeof(msg_size_t) > socket->buffersize) {
     460             :                 return NETIO_STATUS_TOO_BIG;
     461             :         }
     462             : 
     463    62633908 :         if(socket->current_buffer == NULL) {
     464     1335603 :                 if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
     465             :                         return NETIO_STATUS_AGAIN;
     466             :                 }
     467     1116748 :                 socket->pos = 0;
     468             :     //Enable flush timer
     469     1116748 :     if(socket->timeout_ms != 0 ){
     470       65999 :       netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
     471             :     }
     472             :         } else {
     473             :     //if current message is larger than remaining space
     474             :     //flush buffer and retry with a new one
     475    61298305 :     if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
     476        4000 :       flush(socket);
     477        4000 :       return NETIO_STATUS_AGAIN;
     478             :     }
     479             :   }
     480             : 
     481    62411053 :         *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
     482    62411053 :         socket->pos += sizeof(msg_size_t);
     483   250154252 :         for(unsigned int i=0; i<num; i++) {
     484   187743199 :                 memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
     485   187743199 :                 socket->pos += iov[i].iov_len;
     486             :         }
     487             : 
     488    62411053 :         if(socket->pos > socket->watermark) {
     489       51793 :                 flush(socket);
     490             :         }
     491             :         return NETIO_STATUS_OK;
     492             : }
     493             : 
     494             : void
     495         113 : netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
     496             : {
     497         113 :           memset(socket, 0, sizeof(*socket));
     498         113 :     socket->lsocket = lsocket;
     499         113 :     netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
     500         113 :     socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
     501         113 :     socket->recv_socket.usr = socket;
     502             : 
     503         113 :     socket->num_pages = socket->lsocket->num_pages;
     504         113 :     socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
     505       24649 :     for(unsigned int i=0; i<socket->num_pages; i++) {
     506       24536 :       socket->pages[i].data = malloc(socket->lsocket->pagesize);
     507       24536 :       socket->pages[i].size = socket->lsocket->pagesize;
     508             :     }
     509         113 :     socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
     510         113 : }
     511             : 
     512             : /* Same as above except for the _tcp in netio_init_recv_tcp_socket  */
     513             : void
     514          58 : netio_buffered_recv_tcp_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
     515             : {
     516          58 :   memset(socket, 0, sizeof(*socket));
     517          58 :   socket->lsocket = lsocket;
     518          58 :   netio_init_recv_tcp_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
     519          58 :   socket->recv_socket.usr = socket;
     520             : 
     521          58 :   socket->num_pages = socket->lsocket->num_pages;
     522          58 :   socket->pages = (struct netio_buffer*)malloc(socket->num_pages * sizeof(struct netio_buffer));
     523        7959 :   for(unsigned int i=0; i<socket->num_pages; i++) {
     524        7901 :     socket->pages[i].data = malloc(socket->lsocket->pagesize);
     525        7901 :     socket->pages[i].size = socket->lsocket->pagesize;
     526             :   }
     527          58 :   socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
     528          58 : }
     529             : 
     530             : 
     531             : /**
     532             :  * Flushes the current buffer of the given buffered send socket.
     533             :  *
     534             :  * @param socket The buffered send socket
     535             :  */
     536             : void
     537     1064816 : netio_buffered_flush(struct netio_buffered_send_socket* socket)
     538             : {
     539     1064816 :         flush(socket);
     540     1064816 : }

Generated by: LCOV version 1.0