LCOV - code coverage report
Current view: top level - netio-next/src - netio.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 320 425 75.3 %
Date: 2025-08-12 04:15:35 Functions: 26 31 83.9 %

          Line data    Source code
       1             : #include <unistd.h>
       2             : #include <stdio.h>
       3             : #include <sys/uio.h>
       4             : #include <string.h>
       5             : #include "netio/netio.h"
       6             : #include "netio/netio_tcp.h"
       7             : #include "connection_event.h"
       8             : #include "completion_event.h"
       9             : #include <stdlib.h>
      10             : #include <sys/types.h>
      11             : #include <errno.h>
      12             : #include <netdb.h>
      13             : #include <fcntl.h>
      14             : #include <sys/socket.h>
      15             : #include <netinet/in.h>
      16             : #include <arpa/inet.h>
      17             : #include "log.h"
      18             : 
      19             : #if defined DEBUG || defined DEBUG_IO
      20             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      21             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      22             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      23             : #else
      24             : #define log_dbg(...)
      25             : #define log_trc(...)
      26             : #endif
      27             : 
      28             : # define ERROR_LOG( ... ) do { log_fatal(__VA_ARGS__); fflush(stdout); exit(-2);} while(0)
      29             : 
      30             : #define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \
      31             : do { \
      32             :   s->fi_errno = -c; \
      33             :   s->fi_message = strdup(msg); \
      34             :         netio_error_connection_refused_fire(s); \
      35             : } while(0);
      36             : 
      37             : #define ON_ERROR_BIND_REFUSED(s, msg, c) \
      38             : do { \
      39             :   s->fi_errno = -c; \
      40             :   s->fi_message = strdup(msg); \
      41             :         netio_error_bind_refused_fire(s); \
      42             : } while(0);
      43             : 
      44             : 
      45             : //Globals
      46             : 
      47             : 
      48             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      49             : 
      50             : static int
      51         569 : _socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
      52             : {
      53         569 :   log_dbg("Going trough _socket_init_info");
      54         569 :   int ret=0;
      55         569 :   struct fi_info* hints;
      56             : 
      57         569 :   hints = fi_allocinfo();
      58         569 :   hints->addr_format = FI_FORMAT_UNSPEC;
      59         569 :   hints->ep_attr->type  = FI_EP_MSG;
      60         569 :   hints->caps = FI_MSG;
      61         569 :   hints->mode   = FI_LOCAL_MR;
      62             :   // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL
      63             :   // So the following will not allow the tcp provider to be used
      64         569 :   hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
      65         569 :   hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
      66             : 
      67         569 :   char port_addr[32];
      68         569 :   snprintf(port_addr, 32, "%u", port);
      69         569 :   log_dbg("connecting to endpoint %s:%u", hostname, port);
      70             : 
      71         569 :   uint64_t flags = 0;
      72         569 :   if(hostname == NULL) {
      73          43 :     hostname = "127.0.0.1";
      74          43 :     flags = FI_SOURCE;
      75             :   }
      76             : 
      77         569 :   if(addr) {
      78             :     // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr?
      79          43 :     struct sockaddr_in* sockaddr = (struct sockaddr_in*)addr;
      80          43 :     char* str_addr = inet_ntoa(sockaddr->sin_addr);
      81          43 :     log_dbg("sockaddr: %s:%d", str_addr, ntohs(sockaddr->sin_port));
      82             : 
      83          43 :     hostname  = str_addr;
      84          43 :     snprintf(port_addr, 32, "%d", ntohs(sockaddr->sin_port));
      85          43 :     flags = 0;
      86             :   }
      87             : 
      88         569 :   if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi)))
      89             :   {
      90           4 :     fi_freeinfo(hints);
      91           4 :     log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret));
      92           4 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret);
      93           4 :     return -1;
      94             :   }
      95             : 
      96         565 :   log_dbg("addr format: %x", socket->fi->addr_format);
      97         565 :   log_dbg("fi_freeinfo");
      98         565 :   fi_freeinfo(hints);
      99         565 :   return 0;
     100             : }
     101             : 
     102             : 
     103             : static int
     104         561 : _socket_init_domain(struct netio_send_socket* socket)
     105             : {
     106         561 :   int ret=0;
     107         561 :   struct netio_domain *domain = malloc(sizeof(struct netio_domain));
     108         561 :   domain->reg_mr = 0;
     109         561 :   domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
     110         561 :   domain->nb_sockets = 1;
     111         561 :   socket->domain = domain;
     112             : 
     113         561 :   if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL)))
     114             :   {
     115           0 :     log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret));
     116           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret);
     117           0 :     return -1;
     118             :   }
     119             : 
     120         561 :   if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL)))
     121             :   {
     122           0 :     log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret));
     123           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret);
     124           0 :     return -1;
     125             :   }
     126             :   return 0;
     127             : }
     128             : 
     129             : static int
     130         565 : _socket_connect(struct netio_send_socket* socket)
     131             : {
     132         565 :   int ret=0;
     133         565 :   struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
     134             : 
     135             :   //Resources initialisation
     136         565 :   socket->eqfd = -1;
     137         565 :   socket->cqfd = -1;
     138         565 :   socket->ep = NULL;
     139         565 :   socket->eq = NULL;
     140         565 :   socket->cq = NULL;
     141             : 
     142         565 :   if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL)))
     143             :   {
     144           0 :     log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     145           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret);
     146           0 :     return -1;
     147             :   }
     148             : 
     149         565 :   if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL)))
     150             :   {
     151           0 :     log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
     152           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret);
     153           0 :     return -1;
     154             :   }
     155             : 
     156         565 :   if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)))
     157             :   {
     158           0 :     log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret));
     159           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
     160           0 :     return -1;
     161             :   }
     162             : 
     163         565 :   struct fi_cq_attr cq_attr;
     164         565 :   cq_attr.size = NETIO_MAX_CQ_ENTRIES;      /* # entries for CQ */
     165         565 :   cq_attr.flags = 0;     /* operation flags */
     166         565 :   cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT;    /* completion format */
     167         565 :   cq_attr.wait_obj= FI_WAIT_FD;  /* requested wait object */
     168         565 :   cq_attr.signaling_vector = 0; /* interrupt affinity */
     169         565 :   cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied.
     170         565 :   cq_attr.wait_set = NULL;  /* optional wait set */
     171             : 
     172             :   //FI_TRANSMIT CQ
     173         565 :   if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0)
     174             :   {
     175           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     176           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
     177           0 :     return -1;
     178             :   }
     179             : 
     180         565 :   if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0)
     181             :   {
     182           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     183           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
     184           0 :     return -1;
     185             :   }
     186             : 
     187             :   //FI_RECV CQ - also necessary
     188         565 :   cq_attr.format = FI_CQ_FORMAT_UNSPEC;
     189         565 :   cq_attr.wait_obj= FI_WAIT_NONE;
     190         565 :   if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0)
     191             :   {
     192           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     193           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
     194           0 :     return -1;
     195             :   }
     196             : 
     197         565 :   if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0)
     198             :   {
     199           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     200           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
     201           0 :     return -1;
     202             :   }
     203             : 
     204             : 
     205         565 :   if((ret = fi_enable(socket->ep)) != 0)
     206             :   {
     207           0 :     log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
     208           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret);
     209           0 :     return -1;
     210             :   }
     211             : 
     212             :   /* Connect to server */
     213         565 :   if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0)
     214             :   {
     215           4 :     log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret));
     216           4 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret);
     217           4 :     return -1;
     218             :   }
     219             : 
     220         561 :   if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0)
     221             :   {
     222           0 :     log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret));
     223           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret);
     224           0 :     return -1;
     225             :   }
     226             : 
     227         561 :   socket->eq_ev_ctx.fd = socket->eqfd;
     228         561 :   socket->eq_ev_ctx.data = socket;
     229         561 :   socket->eq_ev_ctx.cb = on_send_socket_libfabric_cm_event;
     230             : 
     231         561 :   log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
     232         561 :   add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_libfabric_cm_event);
     233         561 :   add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket);
     234         561 :   netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
     235         561 :   log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd);
     236             : 
     237         561 :   return 0;
     238             : }
     239             : 
     240             : 
     241             : // API FUNCTIONS ///////////////////////////////////////////////////////////////
     242             : 
     243             : /**
     244             :  * Configures the debug level.
     245             :  *
     246             :  * @param level: The debug level, an integer ranging from 0 (TRACE) to 5 (FATAL)
     247             :  */
     248             : void
     249         419 : netio_set_debug_level(int level)
     250             : {
     251         419 :   log_set_level(level);
     252         419 : }
     253             : 
     254             : /**
     255             :  * Return 1 if hostname contains the tcp protocol (prefix tcp:)
     256             :  *
     257             :  * @param hostname: Hostname or IP address of the remote endpoint
     258             :  */
     259        1597 : int netio_tcp_mode(const char* hostname) {
     260        1597 :   return (strncmp( hostname, "tcp:", 4) == 0);
     261             : }
     262             : 
     263             : /**
     264             :  * Return protocol (tcp:, libfabric:)
     265             :  * No other implementation than tcp or libfabric at this time.
     266             :  *
     267             :  * @param hostname: Hostname or IP address of the remote endpoint
     268             :  */
     269        1260 : const char* netio_protocol(const char* hostname) {
     270        1260 :   return netio_tcp_mode(hostname) ? "tcp" : "libfabric";
     271             : }
     272             : 
     273             : /**
     274             :  * Return hostname only of hostname without any prefix (tcp:, libfabric:)
     275             :  *
     276             :  * @param hostname: Hostname or IP address of the remote endpoint
     277             :  */
     278        2724 : const char* netio_hostname(const char* hostname) {
     279        2724 :   const char* split = strchr(hostname, ':');
     280        2724 :   return split ? &split[1] : hostname;
     281             : }
     282             : 
     283             : /**
     284             :  * Initializes the eventloop first resetting the context passed as argument.
     285             :  *
     286             :  * @param ctx: The netio context
     287             :  */
     288             : void
     289         552 : netio_init(struct netio_context* ctx)
     290             : {
     291         552 :   log_set_level(DEFAULT_DEBUG_LEVEL);
     292         552 :   memset(ctx, 0, sizeof(*ctx));
     293         552 :   netio_eventloop_init(&ctx->evloop);
     294         552 : }
     295             : 
     296             : 
     297             : /**
     298             :  * Initializes an unbuffered send socket.
     299             :  *
     300             :  * @param socket: The socket to intialize
     301             :  * @param ctx: The netio context
     302             :  */
     303             : void
     304         569 : netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx)
     305             : {
     306         569 :   memset(socket, 0, sizeof(*socket));
     307         569 :   socket->ctx = ctx;
     308         569 :   socket->epollfd = socket->ctx->evloop.epollfd;
     309         569 :   socket->state = UNCONNECTED;
     310         569 :   socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
     311         569 :   socket->cq_size = NETIO_MAX_CQ_EVENTS;
     312         569 :   socket->unbuf_pub_socket = NULL;
     313         569 :   socket->cb_internal_connection_closed = NULL;
     314         569 :   socket->deferred_subs = NULL;
     315         569 :   socket->recv_socket = NULL;
     316         569 : }
     317             : 
     318             : 
     319             : /**
     320             :  * Initializes an unbuffered listen socket.
     321             :  *
     322             :  * @param socket: The socket to intialize
     323             :  * @param ctx: The netio context
     324             :  */
     325             : void
     326         265 : netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
     327             : {
     328         265 :   memset(socket, 0, sizeof(*socket));
     329         265 :   socket->ctx = ctx;
     330         265 :   socket->recv_sockets = NULL;
     331         265 :   socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
     332         265 :   if (attr == NULL){
     333             :     socket->attr.buffer_size = 0;
     334             :     socket->attr.num_buffers = 0;
     335             :   } else {
     336          88 :     if (attr->num_buffers > NETIO_DOMAIN_MAX_MR){
     337           0 :       log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR);
     338           0 :       attr->num_buffers = NETIO_DOMAIN_MAX_MR;
     339             :     }
     340          88 :     socket->attr = *attr;
     341             :   }
     342         265 : }
     343             : 
     344             : 
     345             : /**
     346             :  * Initializes an unbuffered recv socket.
     347             :  *
     348             :  * @param socket: The socket to intialize
     349             :  * @param ctx: The netio context
     350             :  */
     351             : void
     352         220 : netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
     353             : {
     354         220 :   memset(socket, 0, sizeof(*socket));
     355         220 :   socket->ctx = lsocket->ctx;
     356         220 :   socket->lsocket = lsocket;
     357         220 :   socket->reg_mr = 0;
     358         220 :   socket->cq_size = NETIO_MAX_CQ_EVENTS;
     359         220 :   socket->sub_msg_buffers = NULL;
     360         220 :   socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
     361         220 :   socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
     362         220 : }
     363             : 
     364             : void
     365           4 : netio_send_socket_init_and_connect(struct netio_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port) {
     366           4 :   int tcp = netio_tcp_mode(hostname);
     367           4 :   const char* host = netio_hostname(hostname);
     368           4 :   if (tcp) {
     369           2 :     netio_init_send_tcp_socket(socket, ctx);
     370           2 :     netio_connect_tcp(socket, host, port);
     371             :   } else {
     372           2 :     netio_init_send_socket(socket, ctx);
     373           2 :     netio_connect_domain(socket, host, port, NULL);
     374             :   }
     375           4 : }
     376             : 
     377             : void
     378           2 : netio_listen_socket_init_and_listen(struct netio_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_unbuffered_socket_attr* attr) {
     379           2 :   int tcp = netio_tcp_mode(hostname);
     380           2 :   const char* host = netio_hostname(hostname);
     381           2 :   if (tcp) {
     382           1 :     netio_init_listen_tcp_socket(socket, ctx, attr);
     383           1 :     netio_listen_tcp(socket, host, port);
     384             :   } else {
     385           1 :     netio_init_listen_socket(socket, ctx, attr);
     386           1 :     netio_listen(socket, host, port);
     387             :   }
     388           2 : }
     389             : 
     390             : void
     391         541 : netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port)
     392             : {
     393         541 :   if (socket->tcp_fi_mode == NETIO_MODE_TCP){
     394          17 :     netio_connect_tcp(socket, netio_hostname(hostname), port);
     395             :   }
     396             :   else{
     397         524 :     netio_connect_domain(socket, netio_hostname(hostname), port, NULL);
     398             :   }
     399         541 : }
     400             : 
     401             : /**
     402             :  * Connect an unbuffered send socket to a remote endpoint.
     403             :  *
     404             :  * @param socket: An unbuffered send socket
     405             :  * @param hostname: Hostname or IP address of the remote endpoint
     406             :  * @param port: Port of the remote endpoint
     407             :  * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
     408             :  *        Obviously this can only work if the socket's actual domain (physical NIC) would refer to the same as domain's
     409             :  */
     410             : void
     411         526 : netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain)
     412             : {
     413         526 :   log_dbg("_socket_init_info");
     414         526 :   if ((_socket_init_info(socket, hostname, port, NULL, 0))) return;
     415         522 :   if (domain == NULL) {
     416         522 :     log_dbg("_socket_init_domain in netio_connect_domain");
     417         522 :     if ( _socket_init_domain(socket) ) return;
     418             :   } else {
     419           0 :     domain->nb_sockets += 1;
     420           0 :     socket->domain = domain;
     421             :   }
     422         522 :   _socket_connect(socket);
     423             : }
     424             : 
     425             : 
     426          22 : void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen)
     427             : {
     428          22 :   netio_connect_rawaddr_domain(socket, addr, addrlen, NULL);
     429          22 : }
     430             : 
     431             : 
     432             : /**
     433             :  * Same as netio_connect_domain but using the ip address and port
     434             :  *
     435             :  * @param socket: An unbuffered send socket
     436             :  * @param addr: IP and port of the remote endpoint
     437             :  * @param addrlen: lenght of the address
     438             :  * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
     439             :  *        Obviously this can only work if the socket's actual domain (physical NIC) would refer to the same as domain's
     440             :  */
     441          43 : void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain)
     442             : {
     443          43 :   log_dbg("_socket_init_info");
     444          43 :   if ((_socket_init_info(socket, NULL, 0, addr, addrlen))) return;
     445          43 :   if (domain == NULL) {
     446          39 :     log_dbg("_socket_init_domain in netio_connect_rawaddr_domain");
     447          39 :     if ( _socket_init_domain(socket) ) return;
     448             :   } else {
     449           4 :     log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets);
     450           4 :     domain->nb_sockets += 1;
     451           4 :     socket->domain = domain;
     452             :   }
     453          43 :   _socket_connect(socket);
     454             : }
     455             : 
     456             : 
     457             : /**
     458             :  * Disconnect a connected unbuffered send socket.
     459             :  *
     460             :  * @param socket: A connected unbuffered send socket
     461             :  */
     462             : void
     463         404 : netio_disconnect(struct netio_send_socket* socket)
     464             : {
     465         404 :   if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     466          15 :     shutdown(socket->cq_ev_ctx.fd, SHUT_RDWR);
     467             :   } else {
     468         389 :     if(!socket->ep) {
     469             :       log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
     470             :       return;
     471             :     }
     472         389 :     int ret=0;
     473         389 :     if((ret = fi_shutdown(socket->ep, 0))){
     474           0 :       log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret));
     475           0 :       ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
     476           0 :       return;
     477             :     }
     478             :   }
     479             : }
     480             : 
     481             : 
     482             : 
     483             : void
     484           0 : netio_connection_shutdown(void* ptr)
     485             : {
     486           0 :   log_dbg("Handle_connection_shutdown.");
     487           0 :   struct signal_data* sd = (struct signal_data*)ptr;
     488           0 :   struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
     489           0 :   int ret=0;
     490           0 :   if(!socket->ep){
     491             :     log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
     492             :     return;
     493             :   }
     494           0 :   if((ret = fi_shutdown(socket->ep, 0)))
     495             :   {
     496           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
     497           0 :     return;
     498             :   }
     499             :   //clean up signal
     500           0 :   netio_signal_close(sd->evloop, sd->signal);
     501           0 :   free(sd->signal);
     502           0 :   free(sd);
     503             : }
     504             : 
     505             : 
     506             : /**
     507             :  * Bind an unbuffered listen socket to an endpoint and listen for incoming connections.
     508             :  *
     509             :  * @param socket: An unbuffered listen socket
     510             :  * @param hostname: Hostname or IP address of an endpoint
     511             :  * @param port: A port number to listen on
     512             :  */
     513             : void
     514         128 : netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port)
     515             : {
     516         128 :   int ret=0;
     517         128 :   struct fi_info* hints;
     518         128 :   struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
     519             : 
     520         128 :   hints = fi_allocinfo();
     521         128 :   hints->addr_format = FI_FORMAT_UNSPEC;
     522         128 :   hints->ep_attr->type  = FI_EP_MSG;
     523         128 :   hints->caps = FI_MSG;
     524         128 :   hints->mode = FI_LOCAL_MR;
     525         128 :   char port_addr[32];
     526         128 :   snprintf(port_addr, 32, "%u", port);
     527             : 
     528             :   //Resource initialisation
     529         128 :   socket->eqfd = -1;
     530         128 :   socket->pep = NULL;
     531         128 :   socket->eq = NULL;
     532         128 :   socket->fi = NULL;
     533             : 
     534         128 :   if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
     535             :                        &socket->fi)))
     536             :   {
     537           2 :     log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret));
     538           2 :     ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret);
     539           8 :     return;
     540             :   }
     541         126 :   log_dbg("addr format: %x", socket->fi->addr_format);
     542             : 
     543         126 :   if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)))
     544             :   {
     545           0 :     log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret));
     546           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret);
     547           0 :     return;
     548             :   }
     549         126 :   if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)))
     550             :   {
     551           0 :     log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret));
     552           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret);
     553           0 :     return;
     554             :   }
     555             : 
     556         126 :   if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)))
     557             :   {
     558           0 :     log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
     559           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret);
     560           0 :     return;
     561             :   }
     562             : 
     563         126 :   if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)))
     564             :   {
     565           0 :     log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
     566           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret);
     567           0 :     return;
     568             :   }
     569             : 
     570         126 :   if((ret = fi_listen(socket->pep)))
     571             :   {
     572           4 :     log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret));
     573           4 :     ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret);
     574           4 :     return;
     575             :   }
     576             : 
     577         122 :   if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)))
     578             :   {
     579           0 :     log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret));
     580           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret);
     581           0 :     return;
     582             :   }
     583             : 
     584         122 :   socket->eq_ev_ctx.fd = socket->eqfd;
     585         122 :   socket->eq_ev_ctx.data = socket;
     586         122 :   socket->eq_ev_ctx.cb = on_listen_socket_libfabric_cm_event;
     587             : 
     588             :   //TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT.
     589             :   //printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
     590             :   //add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event);
     591         122 :   add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket);
     592             : 
     593         122 :   netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
     594         122 :   log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd);
     595         122 :   fi_freeinfo(hints);
     596             : }
     597             : 
     598             : 
     599             : /**
     600             :  * Retrieve the local endpoint address and store it in the sockaddr_storage sa.
     601             :  * The function returns the lenght of the address.
     602             :  *
     603             :  * @param socket: An unbuffered listen socket
     604             :  * @param sa: structure storing the local endpoint address
     605             :  */
     606           9 : size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa)
     607             : {
     608           9 :   size_t addrlen;
     609             :   //memset(sa, 0, sizeof(*sa));
     610           9 :   addrlen = sizeof(struct sockaddr_storage);
     611             : 
     612           9 :   int ret=0;
     613           9 :   if((ret = fi_getname(&socket->pep->fid, sa, &addrlen)))
     614             :   {
     615           0 :     log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret));
     616           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret);
     617           0 :     return 0;
     618             :   }
     619             : 
     620           9 :   return addrlen;
     621             : }
     622             : 
     623             : 
     624             : /**
     625             :  * Post a receive buffer to an unbuffered receive socket.
     626             :  *
     627             :  * Receive buffers must be registered using `netio_register_recv_buffer`.
     628             :  *
     629             :  * @param socket: An unbuffered receive socket
     630             :  * @param buf: A registered receive buffer.
     631             :  */
     632             : void
     633    36966138 : netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf)
     634             : {
     635    36966138 :   struct iovec iov;
     636    36966138 :   void* desc;
     637             : 
     638    36966138 :   struct netio_tcp_recv_item *mrdn;
     639             : 
     640    36966138 :   if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
     641             :   {
     642             : 
     643    21642596 :     iov.iov_base = buf->data;
     644    21642596 :     iov.iov_len = buf->size;
     645    21642596 :     desc = fi_mr_desc(buf->mr);
     646             : 
     647    21642596 :     struct fi_msg msg;
     648    21642596 :     msg.msg_iov = &iov; /* scatter-gather array */
     649    21642596 :     msg.desc = &desc;
     650    21642596 :     msg.iov_count = 1;
     651    21642596 :     msg.addr = 0;
     652    21642596 :     msg.context = buf;
     653    21642596 :     msg.data = 0;
     654             : 
     655    21642596 :     uint64_t flags;
     656    21642596 :     flags = FI_REMOTE_CQ_DATA;//FI_MULTI_RECV;
     657             : 
     658    21642596 :     int ret=0;
     659    21642596 :     if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0)
     660             :     {
     661           0 :       log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret));
     662             :     }
     663             :   }
     664             :   else
     665             :   {
     666             :     //Allocate memory for a message request descriptor
     667    15323542 :     struct netio_tcp_recv_item *mrd;
     668    15323542 :     mrd = (struct netio_tcp_recv_item *) malloc(sizeof(struct netio_tcp_recv_item));
     669    15323542 :     if(mrd == NULL)
     670             :     {
     671           0 :       ERROR_LOG("cannot allocate memory for descriptor");
     672             :     }
     673             :     /* log_debug("mrd is at %p", (void *)mrd); */
     674             : 
     675    15323542 :     mrd->element_active = 1;           //MJ do we need this variable?
     676    15323542 :     mrd->socket         = socket;      //this is a netio_recv_socket
     677    15323542 :     mrd->buffer         = buf;
     678    15323542 :     mrd->next_element   = NULL;
     679    15323542 :     mrd->bytes_received = 0;
     680    15323542 :     mrd->message_size   = 0;
     681             : 
     682             :     /* log_debug("receive descriptor allocated and initialized"); */
     683             : 
     684             :     //Append the descriptor to the list
     685    15323542 :     if(socket->message_request_header == NULL)
     686             :     {
     687         100 :       socket->message_request_header = (void *)mrd;
     688             :       /* log_debug("descriptor linked to head of queue"); */
     689             :     }
     690             :     else
     691             :     {
     692             :       int free_item = 1;
     693             :       struct netio_tcp_recv_item *mrdq;
     694             :       mrdq = (struct netio_tcp_recv_item *)socket->message_request_header;
     695             :       /* log_debug("Head of list points at = %p", mrdq); */
     696             :       int mrd_linked = 0;
     697             : 
     698  4508172256 :       do
     699             :       {
     700  4508172256 :         if (mrdq->next_element == NULL)
     701             :         {
     702    15323446 :           mrdq->next_element = (void *)mrd;
     703    15323446 :           mrd_linked = 1;
     704             :           //                log_error("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq);
     705             :         }
     706             :         else
     707             :         {
     708             :           mrdn = (struct netio_tcp_recv_item *)mrdq->next_element;
     709             :           //                log_error("item = %d. %p points at %p", free_item, mrdq, mrdn);
     710             :           free_item++;
     711  4508172256 :           mrdq = mrdn;
     712             :         }
     713             :       }
     714  4508172256 :       while(!mrd_linked);
     715             :     }
     716             :     /*
     717             :     log_debug("Calling netio_signal_fire");
     718             :     log_debug("&socket->tcp_signal = %p", &socket->tcp_signal);
     719             :     netio_signal_fire(&socket->tcp_signal);
     720             :     */
     721    15323542 :     return;
     722             :   }
     723             : }
     724             : 
     725             : /**
     726             :  * Removes recv socket from recv socket list of a listen socket.
     727             :  *
     728             :  * @param socket: An unbuffered receive socket.
     729             :  */
     730             : void
     731           0 : netio_remove_recv_socket(struct netio_recv_socket* socket){
     732           0 :   struct netio_listen_socket* lsocket = socket->lsocket;
     733           0 :   int ret = remove_socket(&lsocket->recv_sockets, (void*)socket);
     734           0 :   if (ret == 0){
     735           0 :     log_info("Unbuffered connection closed, recv socket deleted.");
     736             :   } else {
     737           0 :     log_warn("Unbuffered connection closed, could not delete recv socket.");
     738             :   }
     739           0 : }
     740             : 
     741             : /**
     742             :  * Removes buffered recv socket from recv socket list of a buffered listen socket.
     743             :  *
     744             :  * @param socket: An buffered receive socket.
     745             :  */
     746             : void
     747           0 : netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){
     748           0 :   struct netio_buffered_listen_socket* lsocket = socket->lsocket;
     749           0 :   int ret = remove_socket(&(lsocket->listen_socket.recv_sockets), (void*)socket);
     750           0 :   if (ret == 0){
     751           0 :     log_info("Buffered connection closed, buffered recv socket deleted.");
     752             :   } else {
     753           0 :     log_warn("Buffered connection closed, could not delete recv socket.");
     754             :   }
     755           0 : }
     756             : 
     757             : static int
     758    12424354 : generic_sendmsg(struct netio_send_socket* socket,
     759             :                 struct iovec* iov,
     760             :                 void** desc,
     761             :                 size_t count,
     762             :                 uint64_t key,
     763             :                 uint64_t add_flags,
     764             :                 uint64_t imm)
     765             : {
     766    12424354 :   int ret=0;
     767    12424354 :   uint64_t flags;
     768             : 
     769    12424354 :   struct fi_msg msg;
     770    12424354 :   msg.msg_iov = iov; /* scatter-gather array */
     771    12424354 :   msg.desc = desc;
     772    12424354 :   msg.iov_count = count;
     773    12424354 :   msg.addr = 0;
     774    12424354 :   msg.context = (void*)key;
     775    12424354 :   msg.data = imm;
     776             : 
     777    12424354 :   log_trc("sending iov message with immediate value 0x%lx", msg.data);
     778             : 
     779    12424354 :   flags = FI_INJECT_COMPLETE | add_flags;
     780             : 
     781    12424354 :   if(!socket->ep || !socket->ep->msg){
     782           0 :     log_error("Failed sending message because of null message or null endpoint.");
     783           0 :     return NETIO_STATUS_ERROR;
     784             :   }
     785             : 
     786    12424354 :   if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0)
     787             :   {
     788           0 :       if(ret == -FI_EAGAIN) {
     789             :           return NETIO_STATUS_AGAIN;
     790             :       }
     791           0 :       log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret));
     792           0 :       return NETIO_STATUS_ERROR;
     793             :   }
     794             :   return NETIO_STATUS_OK;
     795             : }
     796             : 
     797             : 
     798             : /**
     799             :  * Sends a full buffer over a connected unbuffered send socket.
     800             :  *
     801             :  * @param socket: A connected, unbuffered send socket
     802             :  * @param buf: A registered send buffer
     803             :  */
     804             : int
     805     1121463 : netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
     806             : {
     807     1121463 :   return netio_send(socket, buf, buf->data, buf->size, (uint64_t)buf);
     808             : }
     809             : 
     810             : 
     811             : /**
     812             :  * Sends a buffer over a connected unbuffered send socket as inline data.
     813             :  *
     814             :  * @param socket: A connected, unbuffered send socket
     815             :  * @param buf: A buffer whose size does not exceed FI_VERBS_INLINE_SIZE (libfabric default 64 bytes)
     816             :  * The buffer is reusable as soon as the function returns as it is copied by the provider.
     817             :  * The buffer is not required to be a registered memory region as no RDMA transfer occurs.
     818             :  * FI_VERBS_INLINE_SIZE corresponds to IBV_SEND_INLINE
     819             :  */
     820             : int
     821        2433 : netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
     822             : {
     823        2433 :   struct iovec iov;
     824        2433 :   void* desc;
     825             : 
     826        2433 :   iov.iov_base = buf->data;
     827        2433 :   iov.iov_len = buf->size;
     828        2433 :   desc = fi_mr_desc(buf->mr);
     829        2433 :   uint64_t key = (uint64_t)buf;
     830        2433 :   uint64_t flags = FI_INJECT;
     831             : 
     832        2433 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     833             :                          &iov,   /* struct iovec* iov */
     834             :                          &desc,  /* void** desc */
     835             :                          1,      /* size_t count */
     836             :                          key,    /* uint64_t key */
     837             :                          flags,      /* uint64_t add_flags */
     838             :                          0       /* uint64_t imm */
     839             :          );
     840             : }
     841             : 
     842             : 
     843             : /**
     844             :  * Sends a partial buffer over a connected unbuffered send socket.
     845             :  *
     846             :  * @param socket: A connected, unbuffered send socket
     847             :  * @param buffer: A registered send buffer
     848             :  * @param addr: Pointer to message within the buffer
     849             :  * @param size: Size of the message
     850             :  * @param key: Message key used to track the message progress.
     851             :  */
     852             : int
     853     1121615 : netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key)
     854             : {
     855     1121615 :   struct iovec iov;
     856     1121615 :   void* desc;
     857     1121615 :   struct netio_tcp_send_item *mrdn;
     858             : 
     859     1121615 :   if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
     860             :   {
     861     1118126 :     iov.iov_base = addr;
     862     1118126 :     iov.iov_len = size;
     863     1118126 :     desc = fi_mr_desc(buf->mr);
     864             : 
     865     1118126 :     return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     866             :                            &iov,   /* struct iovec* iov */
     867             :                            &desc,  /* void** desc */
     868             :                            1,      /* size_t count */
     869             :                            key,    /* uint64_t key */
     870             :                            0,      /* uint64_t add_flags */
     871             :                            0       /* uint64_t imm */
     872             :            );
     873             :   }
     874             :   else
     875             :   {
     876        3489 :     if (socket->state!=CONNECTED)
     877             :     {
     878           0 :       log_warn("socket not connected (state=%d)",socket->state);
     879             :     }
     880             :     //Allocate memory for a message request descriptor
     881        3489 :     struct netio_tcp_send_item *mrd;
     882        3489 :     mrd = (struct netio_tcp_send_item *) malloc(sizeof(struct netio_tcp_send_item));
     883        3489 :     if(mrd == NULL)
     884             :     {
     885           0 :       ERROR_LOG("cannot allocate memory for descriptor");
     886             :     }
     887        3489 :     mrd->element_active = NETIO_TCP_NEW;
     888        3489 :     mrd->socket         = socket;
     889        3489 :     mrd->buffer         = buf;
     890        3489 :     log_dbg("netio-tcp: setting buffer size to msg size and buffer data to addr");
     891        3489 :     mrd->buffer->size = size;
     892        3489 :     mrd->buffer->data = addr;
     893        3489 :     mrd->total_bytes    = size;
     894        3489 :     mrd->bytes_left     = sizeof(int);
     895        3489 :     mrd->next_element   = NULL;
     896        3489 :     mrd->key            = (uint64_t) buf;
     897             :     //    log_debug("send descriptor allocated and initialized");
     898             : 
     899             :     //Append the descriptor to the list
     900        3489 :     if(socket->message_request_header == NULL)
     901             :     {
     902        2047 :       socket->message_request_header = (void *)mrd;
     903             :       //      log_debug("List was empty. Descriptor linked to head of list");
     904             :     }
     905             :     else
     906             :     {
     907             :       int free_item = 1;
     908             :       struct netio_tcp_send_item *mrdq;
     909             :       mrdq = (struct netio_tcp_send_item *)socket->message_request_header;
     910             :       //      log_debug("Head of list points at = %p", mrdq);
     911             :       int mrd_linked = 0;
     912             : 
     913      290370 :       do
     914             :       {
     915      290370 :         if (mrdq->next_element == NULL)
     916             :         {
     917        1442 :           mrdq->next_element = (void *)mrd;
     918        1442 :           mrd_linked = 1;
     919             :           //                log_debug("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq);
     920             :         }
     921             :         else
     922             :         {
     923             :           mrdn = (struct netio_tcp_send_item *)mrdq->next_element;
     924             :           //                log_debug("Item = %d. %p points at %p", free_item, mrdq, mrdn);
     925             :           free_item++;
     926      290370 :           mrdq = mrdn;
     927             :         }
     928             :       }
     929      290370 :       while(!mrd_linked);
     930             :     }
     931             : 
     932             :     //    log_debug("Calling netio_signal_fire for signal at %p", &socket->tcp_signal);
     933        3489 :     netio_signal_fire(&socket->tcp_signal);
     934             : 
     935             :     //    log_info("done");
     936        3489 :     return(NETIO_STATUS_OK);
     937             :   }
     938             : 
     939             : }
     940             : 
     941             : 
     942             : /**
     943             :  * Sends a partial buffer with immediate data over a connected unbuffered send socket.
     944             :  *
     945             :  * @param socket: A connected, unbuffered send socket
     946             :  * @param buffer: A registered send buffer
     947             :  * @param addr: Pointer to message within the buffer
     948             :  * @param size: Size of the message
     949             :  * @param key: Message key used to track the message progress.
     950             :  * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
     951             :  */
     952             : int
     953           0 : netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm)
     954             : {
     955           0 :   struct iovec iov;
     956           0 :   void* desc;
     957             : 
     958           0 :   iov.iov_base = addr;
     959           0 :   iov.iov_len = size;
     960           0 :   desc = fi_mr_desc(buf->mr);
     961             : 
     962           0 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     963             :                          &iov,   /* struct iovec* iov */
     964             :                          &desc,  /* void** desc */
     965             :                          1,      /* size_t count */
     966             :                          key,    /* uint64_t key */
     967             :                          FI_REMOTE_CQ_DATA,      /* uint64_t add_flags */
     968             :                          imm     /* uint64_t imm */
     969             :          );
     970             : }
     971             : 
     972             : 
     973             : /**
     974             :  * Sends a partial buffer data over a connected unbuffered send socket.
     975             :  *
     976             :  * @param socket: A connected, unbuffered send socket
     977             :  * @param buffer: A vector of registered send buffers
     978             :  * @param iov: Scatter/gather buffer describing message within the buffers
     979             :  * @param count: Size of the scatter/gather vector
     980             :  * @param key: Message key used to track the message progress.
     981             :  */
     982             : int
     983           0 : netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key)
     984             : {
     985           0 :   void* descarray[NETIO_MAX_IOV_LEN];
     986             : 
     987           0 :   for(unsigned i=0; i<count; i++) {
     988           0 :     descarray[i] = fi_mr_desc(buf[i]->mr);
     989             :   }
     990             : 
     991           0 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     992             :                          iov,    /* struct iovec* iov */
     993             :                          descarray,  /* void** desc */
     994             :                          count,      /* size_t count */
     995             :                          key,    /* uint64_t key */
     996             :                          0,      /* uint64_t add_flags */
     997             :                          0       /* uint64_t imm */
     998             :          );
     999             : }
    1000             : 
    1001             : 
    1002             : /**
    1003             :  * Sends a partial buffer with immediate data over a connected unbuffered send socket.
    1004             :  *
    1005             :  * @param socket: A connected, unbuffered send socket
    1006             :  * @param buffer: A vector of registered send buffers
    1007             :  * @param iov: Scatter/gather buffer describing message within the buffers
    1008             :  * @param count: Size of the scatter/gather vector
    1009             :  * @param key: Message key used to track the message progress.
    1010             :  * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
    1011             :  */
    1012             : int
    1013    11306030 : netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm)
    1014             : {
    1015    11306030 :   void* descarray[NETIO_MAX_IOV_LEN];
    1016             : 
    1017    66118099 :   for(unsigned i=0; i<count; i++) {
    1018    54812069 :     descarray[i] = fi_mr_desc(buf[i]->mr);
    1019             :   }
    1020             : 
    1021    11306030 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
    1022             :                          iov,    /* struct iovec* iov */
    1023             :                          descarray,  /* void** desc */
    1024             :                          count,      /* size_t count */
    1025             :                          key,    /* uint64_t key */
    1026             :                          FI_REMOTE_CQ_DATA,      /* uint64_t add_flags */
    1027             :                          imm     /* uint64_t imm */
    1028             :          );
    1029             : }

Generated by: LCOV version 1.0