LCOV - code coverage report
Current view: top level - netio-next/src - netio.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 249 347 71.8 %
Date: 2025-06-10 03:23:28 Functions: 21 26 80.8 %

          Line data    Source code
       1             : #include <unistd.h>
       2             : #include <stdio.h>
       3             : #include <sys/uio.h>
       4             : #include <string.h>
       5             : #include "netio/netio.h"
       6             : #include "connection_event.h"
       7             : #include "completion_event.h"
       8             : #include <sys/socket.h>
       9             : #include <netinet/in.h>
      10             : #include <arpa/inet.h>
      11             : #include "log.h"
      12             : 
      13             : #if defined DEBUG || defined DEBUG_IO
      14             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      15             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      16             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      17             : #else
      18             : #define log_dbg(...)
      19             : #define log_trc(...)
      20             : #endif
      21             : 
      22             : #define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \
      23             : do { \
      24             :   s->fi_errno = -c; \
      25             :   s->fi_message = strdup(msg); \
      26             :         netio_error_connection_refused_fire(s); \
      27             : } while(0);
      28             : 
      29             : #define ON_ERROR_BIND_REFUSED(s, msg, c) \
      30             : do { \
      31             :   s->fi_errno = -c; \
      32             :   s->fi_message = strdup(msg); \
      33             :         netio_error_bind_refused_fire(s); \
      34             : } while(0);
      35             : 
      36             : 
      37             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      38             : 
      39             : static int
      40        1180 : _socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
      41             : {
      42        1180 :   log_dbg("Going trough _socket_init_info");
      43        1180 :   int ret=0;
      44        1180 :   struct fi_info* hints;
      45             : 
      46        1180 :   hints = fi_allocinfo();
      47        1180 :   hints->addr_format = FI_FORMAT_UNSPEC;
      48        1180 :   hints->ep_attr->type  = FI_EP_MSG;
      49        1180 :   hints->caps = FI_MSG;
      50        1180 :   hints->mode   = FI_LOCAL_MR;
      51             :   // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL
      52             :   // So the following will not allow the tcp provider to be used
      53        1180 :   hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
      54        1180 :   hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
      55             : 
      56        1180 :   char port_addr[32];
      57        1180 :   snprintf(port_addr, 32, "%u", port);
      58        1180 :   log_dbg("connecting to endpoint %s:%u", hostname, port);
      59             : 
      60        1180 :   uint64_t flags = 0;
      61        1180 :   if(hostname == NULL) {
      62          75 :     hostname = "127.0.0.1";
      63          75 :     flags = FI_SOURCE;
      64             :   }
      65             : 
      66        1180 :   if(addr) {
      67             :     // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr?
      68          75 :     struct sockaddr_in* sockaddr = (struct sockaddr_in*)addr;
      69          75 :     char* str_addr = inet_ntoa(sockaddr->sin_addr);
      70          75 :     log_dbg("sockaddr: %s:%d", str_addr, ntohs(sockaddr->sin_port));
      71             : 
      72          75 :     hostname  = str_addr;
      73          75 :     snprintf(port_addr, 32, "%d", ntohs(sockaddr->sin_port));
      74          75 :     flags = 0;
      75             :   }
      76             : 
      77        1180 :   if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi)))
      78             :   {
      79           8 :     fi_freeinfo(hints);
      80           8 :     log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret));
      81           8 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret);
      82           8 :     return -1;
      83             :   }
      84             : 
      85        1172 :   log_trc("addr format: %x", socket->fi->addr_format);
      86        1172 :   log_dbg("fi_freeinfo");
      87        1172 :   fi_freeinfo(hints);
      88        1172 :   return 0;
      89             : }
      90             : 
      91             : 
      92             : static int
      93        1167 : _socket_init_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
      94             : {
      95        1167 :   int ret=0;
      96        1167 :   struct netio_domain *domain = malloc(sizeof(struct netio_domain));
      97        1167 :   domain->reg_mr = 0;
      98        1167 :   domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
      99        1167 :   domain->nb_sockets = 1;
     100        1167 :   socket->domain = domain;
     101             : 
     102        1167 :   if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL)))
     103             :   {
     104           0 :     log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret));
     105           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret);
     106           0 :     return -1;
     107             :   }
     108             : 
     109        1167 :   if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL)))
     110             :   {
     111           0 :     log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret));
     112           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret);
     113           0 :     return -1;
     114             :   }
     115             :   return 0;
     116             : }
     117             : 
     118             : static int
     119        1172 : _socket_connect(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
     120             : {
     121        1172 :   int ret=0;
     122        1172 :   struct fi_eq_attr eq_attr;
     123        1172 :   eq_attr.wait_obj = FI_WAIT_FD;
     124             : 
     125             :   //Resources initialisation 
     126        1172 :   socket->eqfd = -1;
     127        1172 :   socket->cqfd = -1;
     128        1172 :   socket->ep = NULL;
     129        1172 :   socket->eq = NULL;
     130        1172 :   socket->cq = NULL;
     131             : 
     132        1172 :   if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL)))
     133             :   {
     134           0 :     log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     135           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret);
     136           0 :     return -1;
     137             :   }
     138             : 
     139        1172 :   if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL)))
     140             :   {
     141           0 :     log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
     142           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret);
     143           0 :     return -1;
     144             :   }
     145             : 
     146        1172 :   if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)))
     147             :   {
     148           0 :     log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret));
     149           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
     150           0 :     return -1;
     151             :   }
     152             : 
     153        1172 :   struct fi_cq_attr cq_attr;
     154        1172 :   cq_attr.size = NETIO_MAX_CQ_ENTRIES;      /* # entries for CQ */
     155        1172 :   cq_attr.flags = 0;     /* operation flags */
     156        1172 :   cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT;    /* completion format */
     157        1172 :   cq_attr.wait_obj= FI_WAIT_FD;  /* requested wait object */
     158        1172 :   cq_attr.signaling_vector = 0; /* interrupt affinity */
     159        1172 :   cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied.
     160        1172 :   cq_attr.wait_set = NULL;  /* optional wait set */
     161             : 
     162             :   //FI_TRANSMIT CQ
     163        1172 :   if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0)
     164             :   {
     165           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     166           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
     167           0 :     return -1;
     168             :   }
     169             : 
     170        1172 :   if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0)
     171             :   {
     172           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     173           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
     174           0 :     return -1;
     175             :   }
     176             : 
     177             :   //FI_RECV CQ - also necessary
     178        1172 :   cq_attr.format = FI_CQ_FORMAT_UNSPEC;
     179        1172 :   cq_attr.wait_obj= FI_WAIT_NONE;
     180        1172 :   if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0)
     181             :   {
     182           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     183           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
     184           0 :     return -1;
     185             :   }
     186             : 
     187        1172 :   if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0)
     188             :   {
     189           0 :     log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
     190           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
     191           0 :     return -1;
     192             :   }
     193             : 
     194             : 
     195        1172 :   if((ret = fi_enable(socket->ep)) != 0)
     196             :   {
     197           0 :     log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
     198           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret);
     199           0 :     return -1;
     200             :   }
     201             : 
     202             :   /* Connect to server */
     203        1172 :   if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0)
     204             :   {
     205           8 :     log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret));
     206           8 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret);
     207           8 :     return -1;
     208             :   }
     209             : 
     210        1164 :   if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0)
     211             :   {
     212           0 :     log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret));
     213           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret);
     214           0 :     return -1;
     215             :   }
     216             : 
     217        1164 :   socket->eq_ev_ctx.fd = socket->eqfd;
     218        1164 :   socket->eq_ev_ctx.data = socket;
     219        1164 :   socket->eq_ev_ctx.cb = on_send_socket_cm_event;
     220             : 
     221        1164 :   log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
     222        1164 :   add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_cm_event);
     223        1164 :   add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket);
     224        1164 :   netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
     225        1164 :   log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd);
     226             : 
     227        1164 :   return 0;
     228             : }
     229             : 
     230             : 
     231             : // API FUNCTIONS ///////////////////////////////////////////////////////////////
     232             : 
     233             : /**
     234             :  * Configures the debug level.
     235             :  *
     236             :  * @param level: The debug level, an integer ranging from 0 (TRACE) to 5 (FATAL)
     237             :  */
     238             : void
     239           0 : netio_set_debug_level(int level)
     240             : {
     241           0 :   log_set_level(level);
     242           0 : }
     243             : 
     244             : /**
     245             :  * Initializes the event loop, first resetting the context passed as argument.
     246             :  *
     247             :  * @param ctx: The netio context
     248             :  */
     249             : void
     250         915 : netio_init(struct netio_context* ctx)
     251             : {
     252         915 :   log_set_level(DEFAULT_DEBUG_LEVEL);
     253         915 :   memset(ctx, 0, sizeof(*ctx));
     254         915 :   netio_eventloop_init(&ctx->evloop);
     255         915 : }
     256             : 
     257             : 
     258             : /**
     259             :  * Initializes an unbuffered send socket.
     260             :  *
     261             :  * @param socket: The socket to initialize
     262             :  * @param ctx: The netio context
     263             :  */
     264             : void
     265        1180 : netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx)
     266             : {
     267        1180 :   memset(socket, 0, sizeof(*socket));
     268        1180 :   socket->ctx = ctx;
     269        1180 :   socket->epollfd = socket->ctx->evloop.epollfd;
     270        1180 :   socket->state = UNCONNECTED;
     271        1180 :   socket->cq_size = NETIO_MAX_CQ_EVENTS;
     272        1180 :   socket->unbuf_pub_socket = NULL;
     273        1180 :   socket->cb_internal_connection_closed = NULL;
     274        1180 :   socket->deferred_subs = NULL;
     275        1180 :   socket->recv_socket = NULL;
     276        1180 : }
     277             : 
     278             : 
     279             : /**
     280             :  * Initializes an unbuffered listen socket.
     281             :  *
     282             :  * @param socket: The socket to initialize
     283             :  * @param ctx: The netio context
     284             :  */
     285             : void
     286         646 : netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
     287             : {
     288         646 :   memset(socket, 0, sizeof(*socket));
     289         646 :   socket->ctx = ctx;
     290         646 :   socket->recv_sockets = NULL;
     291         646 :   if (attr == NULL){
     292             :     socket->attr.buffer_size = 0;
     293             :     socket->attr.num_buffers = 0;
     294             :   } else {
     295         184 :     if (attr->num_buffers > NETIO_DOMAIN_MAX_MR){
     296           0 :       log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR);
     297           0 :       attr->num_buffers = NETIO_DOMAIN_MAX_MR;
     298             :     }
     299         184 :     socket->attr = *attr;
     300             :   }
     301         646 : }
     302             : 
     303             : 
     304             : /**
     305             :  * Initializes an unbuffered recv socket.
     306             :  *
     307             :  * @param socket: The socket to initialize
     308             :  * @param lsocket: The listen socket this recv socket is attached to; its netio context is reused
     309             :  */
     310             : void
     311         463 : netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
     312             : {
     313         463 :   memset(socket, 0, sizeof(*socket));
     314         463 :   socket->ctx = lsocket->ctx;
     315         463 :   socket->lsocket = lsocket;
     316         463 :   socket->reg_mr = 0;
     317         463 :   socket->cq_size = NETIO_MAX_CQ_EVENTS;
     318         463 :   socket->sub_msg_buffers = NULL;
     319         463 :   socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
     320         463 : }
     321             : 
     322             : 
     323             : void
     324        1105 : netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port)
     325             : {
     326        1105 :   netio_connect_domain(socket, hostname, port, NULL);
     327        1105 : }
     328             : 
     329             : /**
     330             :  * Connect an unbuffered send socket to a remote endpoint.
     331             :  *
     332             :  * @param socket: An unbuffered send socket
     333             :  * @param hostname: Hostname or IP address of the remote endpoint
     334             :  * @param port: Port of the remote endpoint
     335             :  * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
     336             :  *        This can only work if the socket's actual domain (physical NIC) is the same one that domain refers to
     337             :  */
     338             : void
     339        1105 : netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain)
     340             : {
     341        1105 :   log_dbg("_socket_init_info");
     342        1105 :   if ((_socket_init_info(socket, hostname, port, NULL, 0))) return;
     343        1097 :   if (domain == NULL) {
     344        1097 :     log_dbg("_socket_init_domain in netio_connect_domain");
     345        1097 :     if ((_socket_init_domain(socket, hostname, port, NULL, 0))) return;
     346             :   } else {
     347           0 :     domain->nb_sockets += 1;
     348           0 :     socket->domain = domain;
     349             :   }
     350        1097 :   _socket_connect(socket, hostname, port, NULL, 0);
     351             : }
     352             : 
     353             : 
     354          43 : void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen)
     355             : {
     356          43 :   netio_connect_rawaddr_domain(socket, addr, addrlen, NULL);
     357          43 : }
     358             : 
     359             : 
     360             : /**
     361             :  * Same as netio_connect_domain but using the ip address and port
     362             :  *
     363             :  * @param socket: An unbuffered send socket
     364             :  * @param addr: IP and port of the remote endpoint
     365             :  * @param addrlen: length of the address
     366             :  * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
     367             :  *        This can only work if the socket's actual domain (physical NIC) is the same one that domain refers to
     368             :  */
     369          75 : void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain)
     370             : {
     371          75 :   log_dbg("_socket_init_info");
     372          75 :   if ((_socket_init_info(socket, NULL, 0, addr, addrlen))) return;
     373          75 :   if (domain == NULL) {
     374          70 :     log_dbg("_socket_init_domain in netio_connect_rawaddr_domain");
     375          70 :     if ((_socket_init_domain(socket, NULL, 0, addr, addrlen))) return;
     376             :   } else {
     377           5 :     log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets);
     378           5 :     domain->nb_sockets += 1;
     379           5 :     socket->domain = domain;
     380             :   }
     381          75 :   _socket_connect(socket, NULL, 0, addr, addrlen);
     382             : }
     383             : 
     384             : 
     385             : /**
     386             :  * Disconnect a connected unbuffered send socket.
     387             :  *
     388             :  * @param socket: A connected unbuffered send socket
     389             :  */
     390             : void
     391         767 : netio_disconnect(struct netio_send_socket* socket)
     392             : {
     393         767 :   int ret=0;
     394         767 :   if(!socket->ep){
     395             :     log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
     396             :     return;
     397             :   }
     398         767 :   if((ret = fi_shutdown(socket->ep, 0)))
     399             :   {
     400           0 :     log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret));
     401           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
     402           0 :     return;
     403             :   }
     404             : }
     405             : 
     406             : 
     407             : 
     408             : void
     409           0 : netio_connection_shutdown(void* ptr)
     410             : {
     411           0 :   log_dbg("Handle_connection_shutdown.");
     412           0 :   struct signal_data* sd = (struct signal_data*)ptr;
     413           0 :   struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
     414           0 :   int ret=0;
     415           0 :   if(!socket->ep){
     416             :     log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
     417             :     return;
     418             :   }
     419           0 :   if((ret = fi_shutdown(socket->ep, 0)))
     420             :   {
     421           0 :     ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
     422           0 :     return;
     423             :   }
     424             : 
     425             :   //clean up signal
     426           0 :   netio_signal_close(sd->evloop, sd->signal);
     427           0 :   free(sd->signal);
     428           0 :   free(sd);
     429             : }
     430             : 
     431             : 
     432             : /**
     433             :  * Bind an unbuffered listen socket to an endpoint and listen for incoming connections.
     434             :  *
     435             :  * @param socket: An unbuffered listen socket
     436             :  * @param hostname: Hostname or IP address of an endpoint
     437             :  * @param port: A port number to listen on
     438             :  */
     439             : void
     440         335 : netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port)
     441             : {
     442         335 :   int ret=0;
     443         335 :   struct fi_info* hints;
     444         335 :   struct fi_eq_attr eq_attr;
     445         335 :   eq_attr.wait_obj = FI_WAIT_FD;
     446             : 
     447         335 :   hints = fi_allocinfo();
     448         335 :   hints->addr_format = FI_FORMAT_UNSPEC;
     449         335 :   hints->ep_attr->type  = FI_EP_MSG;
     450         335 :   hints->caps = FI_MSG;
     451         335 :   hints->mode = FI_LOCAL_MR;
     452         335 :   char port_addr[32];
     453         335 :   snprintf(port_addr, 32, "%u", port);
     454             : 
     455             :   //Resource initialisation
     456         335 :   socket->eqfd = -1;
     457         335 :   socket->pep = NULL;
     458         335 :   socket->eq = NULL;
     459         335 :   socket->fi = NULL;
     460             : 
     461         335 :   if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
     462             :                        &socket->fi)))
     463             :   {
     464           4 :     log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret));
     465           4 :     ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret);
     466          23 :     return;
     467             :   }
     468         331 :   log_dbg("addr format: %x", socket->fi->addr_format);
     469             : 
     470         331 :   if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)))
     471             :   {
     472           0 :     log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret));
     473           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret);
     474           0 :     return;
     475             :   }
     476         331 :   if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)))
     477             :   {
     478           0 :     log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret));
     479           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret);
     480           0 :     return;
     481             :   }
     482             : 
     483         331 :   if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)))
     484             :   {
     485           0 :     log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
     486           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret);
     487           0 :     return;
     488             :   }
     489             : 
     490         331 :   if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)))
     491             :   {
     492           0 :     log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
     493           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret);
     494           0 :     return;
     495             :   }
     496             : 
     497         331 :   if((ret = fi_listen(socket->pep)))
     498             :   {
     499          15 :     log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret));
     500          15 :     ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret);
     501          15 :     return;
     502             :   }
     503             : 
     504         316 :   if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)))
     505             :   {
     506           0 :     log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret));
     507           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret);
     508           0 :     return;
     509             :   }
     510             : 
     511         316 :   socket->eq_ev_ctx.fd = socket->eqfd;
     512         316 :   socket->eq_ev_ctx.data = socket;
     513         316 :   socket->eq_ev_ctx.cb = on_listen_socket_cm_event;
     514             : 
     515             :   //TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT.
     516             :   //printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
     517             :   //add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event);
     518         316 :   add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket);
     519             : 
     520         316 :   netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
     521         316 :   log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd);
     522         316 :   fi_freeinfo(hints);
     523             : }
     524             : 
     525             : 
     526             : /**
     527             :  * Retrieve the local endpoint address and store it in the sockaddr_storage sa.
     528             :  * The function returns the length of the address.
     529             :  *
     530             :  * @param socket: An unbuffered listen socket
     531             :  * @param sa: structure storing the local endpoint address
     532             :  */
     533          20 : size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa)
     534             : {
     535          20 :   size_t addrlen;
     536             :   //memset(sa, 0, sizeof(*sa));
     537          20 :   addrlen = sizeof(struct sockaddr_storage);
     538             : 
     539          20 :   int ret=0;
     540          20 :   if((ret = fi_getname(&socket->pep->fid, sa, &addrlen)))
     541             :   {
     542           0 :     log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret));
     543           0 :     ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret);
     544           0 :     return 0;
     545             :   }
     546             : 
     547          20 :   return addrlen;
     548             : }
     549             : 
     550             : 
     551             : /**
     552             :  * Posts one receive buffer on an unbuffered receive socket through fi_recvmsg.
     553             :  * The whole buffer (buf->data, buf->size) is offered as a single iovec and buf itself is stored as the operation context.
     554             :  * Receive buffers must be registered using `netio_register_recv_buffer`.
     555             :  * A non-zero result from fi_recvmsg is only logged; the function returns void, so nothing is reported back to the caller.
     556             :  * @param socket: An unbuffered receive socket; its ep member is the endpoint passed to fi_recvmsg.
     557             :  * @param buf: A registered receive buffer; its data, size and mr members are read.
     558             :  */
     559             : void
     560    66055566 : netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf)
     561             : {
     562    66055566 :   int ret=0;
     563    66055566 :   struct iovec iov;
     564    66055566 :   void* desc;
     565    66055566 :   uint64_t flags;
     566             : 
     567    66055566 :   iov.iov_base = buf->data;
     568    66055566 :   iov.iov_len = buf->size;
     569    66055566 :   desc = fi_mr_desc(buf->mr); /* memory descriptor obtained from the region stored in the buffer */
     570             : 
     571    66055566 :   struct fi_msg msg;
     572    66055566 :   msg.msg_iov = &iov; /* scatter-gather array */
     573    66055566 :   msg.desc = &desc;
     574    66055566 :   msg.iov_count = 1; /* exactly one iovec: the whole buffer */
     575    66055566 :   msg.addr = 0;
     576    66055566 :   msg.context = buf; /* the buffer pointer is the context of this receive */
     577    66055566 :   msg.data = 0;
     578             : 
     579    66055566 :   flags = FI_REMOTE_CQ_DATA;// FI_MULTI_RECV is commented out here, so it is not requested
     580             : 
     581    66055566 :   if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0)
     582             :   { /* log only: there is no return value to carry the error */
     583           0 :     log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret));
     584             :   }
     585    66055600 : }
     586             : 
     587             : /**
     588             :  * Removes a recv socket from the recv_sockets list of the listen socket referenced by socket->lsocket.
     589             :  * The result of remove_socket is only logged: info when it returns 0, warning for any other value.
     590             :  * @param socket: An unbuffered receive socket. NOTE(review): socket->lsocket is dereferenced without a NULL check - confirm callers always set it.
     591             :  */
     592             : void
     593           0 : netio_remove_recv_socket(struct netio_recv_socket* socket){
     594           0 :   struct netio_listen_socket* lsocket = socket->lsocket;
     595           0 :   int ret = remove_socket(&lsocket->recv_sockets, (void*)socket); /* presumably matches list entries by pointer value - TODO confirm in remove_socket */
     596           0 :   if (ret == 0){
     597           0 :     log_info("Unbuffered connection closed, recv socket deleted.");
     598             :   } else {
     599           0 :     log_warn("Unbuffered connection closed, could not delete recv socket.");
     600             :   }
     601           0 : }
     602             : 
     603             : /**
     604             :  * Removes a buffered recv socket from the listen_socket.recv_sockets list of the buffered listen socket referenced by socket->lsocket.
     605             :  * The result of remove_socket is only logged: info when it returns 0, warning for any other value.
     606             :  * @param socket: A buffered receive socket. NOTE(review): socket->lsocket is dereferenced without a NULL check - confirm callers always set it.
     607             :  */
     608             : void 
     609           0 : netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){
     610           0 :   struct netio_buffered_listen_socket* lsocket = socket->lsocket;
     611           0 :   int ret = remove_socket(&(lsocket->listen_socket.recv_sockets), (void*)socket); /* the buffered socket pointer is passed as void*; presumably matched by pointer value - TODO confirm */
     612           0 :   if (ret == 0){
     613           0 :     log_info("Buffered connection closed, buffered recv socket deleted.");
     614             :   } else {
     615           0 :     log_warn("Buffered connection closed, could not delete recv socket.");
     616             :   }
     617           0 : }
     618             : /* Send path shared by netio_send, netio_send_imm, netio_send_inline_buffer, netio_sendv and netio_sendv_imm: fills a struct fi_msg and hands it to fi_sendmsg. */
     619             : static int /* NETIO_STATUS_OK on success, NETIO_STATUS_AGAIN when fi_sendmsg returns -FI_EAGAIN, NETIO_STATUS_ERROR otherwise */
     620    37922337 : generic_sendmsg(struct netio_send_socket* socket, /* send socket; socket->ep is the endpoint used. NOTE(review): socket itself is not NULL-checked */
     621             :                 struct iovec* iov, /* scatter-gather array, stored in msg.msg_iov */
     622             :                 void** desc, /* one memory descriptor per iovec entry, stored in msg.desc */
     623             :                 size_t count, /* number of iovec entries, stored in msg.iov_count */
     624             :                 uint64_t key, /* message key, carried as the operation context */
     625             :                 uint64_t add_flags, /* flags OR-ed into FI_INJECT_COMPLETE */
     626             :                 uint64_t imm) /* immediate data, stored in msg.data */
     627             : {
     628    37922337 :   int ret=0;
     629    37922337 :   uint64_t flags;
     630             : 
     631    37922337 :   struct fi_msg msg;
     632    37922337 :   msg.msg_iov = iov; /* scatter-gather array */
     633    37922337 :   msg.desc = desc;
     634    37922337 :   msg.iov_count = count;
     635    37922337 :   msg.addr = 0;
     636    37922337 :   msg.context = (void*)key; /* the integer key is converted to a pointer to serve as the context */
     637    37922337 :   msg.data = imm; /* the *_imm callers also pass FI_REMOTE_CQ_DATA in add_flags */
     638             : 
     639    37922337 :   log_trc("sending iov message with immediate value 0x%lx", msg.data); /* expands to nothing unless DEBUG or DEBUG_IO is defined */
     640             : 
     641    37922337 :   flags = FI_INJECT_COMPLETE | add_flags; /* FI_INJECT_COMPLETE is requested on every send */
     642             : 
     643    37922337 :   if(!socket->ep || !socket->ep->msg){ /* refuse to call fi_sendmsg when the endpoint or its msg member is NULL */
     644           0 :     log_error("Failed sending message because of null message or null endpoint."); /* NOTE(review): the check is on ep->msg, not on the message being sent - confirm the wording */
     645           0 :     return NETIO_STATUS_ERROR;
     646             :   }
     647             : 
     648    37922337 :   if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0)
     649             :   {
     650           0 :       if(ret == -FI_EAGAIN) { /* reported to the caller without logging */
     651             :           return NETIO_STATUS_AGAIN;
     652             :       }
     653           0 :       log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret)); /* NOTE(review): %lu is used for size_t and uint64_t arguments; %zu and PRIu64 would be the portable specifiers */
     654           0 :       return NETIO_STATUS_ERROR;
     655             :   }
     656             :   return NETIO_STATUS_OK;
     657             : }
     658             : 
     659             : 
     660             : /**
     661             :  * Sends a full buffer (buf->data, buf->size) over a connected unbuffered send socket by delegating to netio_send.
     662             :  * The buffer pointer, converted to uint64_t, is used as the message key; the return value is that of netio_send.
     663             :  * @param socket: A connected, unbuffered send socket
     664             :  * @param buf: A registered send buffer
     665             :  */
     666             : int
     667     1087090 : netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
     668             : {
     669     1087090 :   return netio_send(socket, buf, buf->data, buf->size, (uint64_t)buf);
     670             : }
     671             : 
     672             : 
     673             : /**
     674             :  * Sends a buffer over a connected unbuffered send socket as inline data (FI_INJECT is added to the send flags).
     675             :  * The buffer pointer, converted to uint64_t, is used as the message key and no immediate data is sent.
     676             :  * @param socket: A connected, unbuffered send socket
     677             :  * @param buf: A buffer whose size does not exceed FI_VERBS_INLINE_SIZE (libfabric default 64 bytes)
     678             :  * The buffer is reusable as soon as the function returns as it is copied by the provider.
     679             :  * The buffer is not required to be a registered memory region as no RDMA transfer occurs. FI_VERBS_INLINE_SIZE corresponds to IBV_SEND_INLINE.
     680             :  * @return The status returned by generic_sendmsg (NETIO_STATUS_OK, NETIO_STATUS_AGAIN or NETIO_STATUS_ERROR).
     681             :  */
     682             : int
     683        4777 : netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
     684             : {
     685        4777 :   struct iovec iov;
     686        4777 :   void* desc;
     687             : 
     688        4777 :   iov.iov_base = buf->data;
     689        4777 :   iov.iov_len = buf->size;
     690        4777 :   desc = fi_mr_desc(buf->mr); /* NOTE(review): buf->mr is read although the header says registration is not required - confirm this is safe for unregistered buffers */
     691        4777 :   uint64_t key = (uint64_t)buf;
     692        4777 :   uint64_t flags = FI_INJECT;
     693             : 
     694        4777 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     695             :                          &iov,   /* struct iovec* iov */
     696             :                          &desc,  /* void** desc */
     697             :                          1,      /* size_t count */
     698             :                          key,    /* uint64_t key */
     699             :                          flags,      /* uint64_t add_flags */
     700             :                          0       /* uint64_t imm */
     701             :          ); 
     702             : }
     703             : 
     704             : 
     705             : /**
     706             :  * Sends a partial buffer over a connected unbuffered send socket.
     707             :  * Builds a single iovec (addr, size) and forwards it to generic_sendmsg without extra flags or immediate data; returns its status.
     708             :  * @param socket: A connected, unbuffered send socket
     709             :  * @param buf: A registered send buffer; only buf->mr is read, to obtain the memory descriptor
     710             :  * @param addr: Pointer to message within the buffer
     711             :  * @param size: Size of the message
     712             :  * @param key: Message key used to track the message progress.
     713             :  */
     714             : int
     715     1087536 : netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key)
     716             : {
     717     1087536 :   struct iovec iov;
     718     1087536 :   void* desc;
     719             : 
     720     1087536 :   iov.iov_base = addr;
     721     1087536 :   iov.iov_len = size;
     722     1087536 :   desc = fi_mr_desc(buf->mr);
     723             : 
     724     1087536 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     725             :                          &iov,   /* struct iovec* iov */
     726             :                          &desc,  /* void** desc */
     727             :                          1,      /* size_t count */
     728             :                          key,    /* uint64_t key */
     729             :                          0,      /* uint64_t add_flags */
     730             :                          0       /* uint64_t imm */
     731             :          );
     732             : }
     733             : 
     734             : 
     735             : /**
     736             :  * Sends a partial buffer with immediate data over a connected unbuffered send socket.
     737             :  * Builds a single iovec (addr, size) and forwards it to generic_sendmsg with FI_REMOTE_CQ_DATA as additional flag; returns its status.
     738             :  * @param socket: A connected, unbuffered send socket
     739             :  * @param buf: A registered send buffer; only buf->mr is read, to obtain the memory descriptor
     740             :  * @param addr: Pointer to message within the buffer
     741             :  * @param size: Size of the message
     742             :  * @param key: Message key used to track the message progress.
     743             :  * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
     744             :  */
     745             : int
     746       20000 : netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm)
     747             : {
     748       20000 :   struct iovec iov;
     749       20000 :   void* desc;
     750             : 
     751       20000 :   iov.iov_base = addr;
     752       20000 :   iov.iov_len = size;
     753       20000 :   desc = fi_mr_desc(buf->mr);
     754             : 
     755       20000 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     756             :                          &iov,   /* struct iovec* iov */
     757             :                          &desc,  /* void** desc */
     758             :                          1,      /* size_t count */
     759             :                          key,    /* uint64_t key */
     760             :                          FI_REMOTE_CQ_DATA,      /* uint64_t add_flags */
     761             :                          imm     /* uint64_t imm */
     762             :          );
     763             : }
     764             : 
     765             : 
     766             : /**
     767             :  * Sends a scatter/gather message over a connected unbuffered send socket.
     768             :  * One memory descriptor per entry is taken from buf[i]->mr; the vector goes to generic_sendmsg without extra flags or immediate data, and its status is returned.
     769             :  * @param socket: A connected, unbuffered send socket
     770             :  * @param buf: A vector of registered send buffers; entries 0..count-1 are read
     771             :  * @param iov: Scatter/gather buffer describing message within the buffers
     772             :  * @param count: Size of the scatter/gather vector. NOTE(review): not checked against NETIO_MAX_IOV_LEN, the capacity of the local descriptor array.
     773             :  * @param key: Message key used to track the message progress.
     774             :  */
     775             : int
     776           0 : netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key)
     777             : {
     778           0 :   void* descarray[NETIO_MAX_IOV_LEN]; /* fixed capacity: NETIO_MAX_IOV_LEN descriptors */
     779             : 
     780           0 :   for(unsigned i=0; i<count; i++) { /* NOTE(review): a count above NETIO_MAX_IOV_LEN would write past descarray - confirm callers bound it */
     781           0 :     descarray[i] = fi_mr_desc(buf[i]->mr);
     782             :   }
     783             : 
     784           0 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     785             :                          iov,    /* struct iovec* iov */
     786             :                          descarray,  /* void** desc */
     787             :                          count,      /* size_t count */
     788             :                          key,    /* uint64_t key */
     789             :                          0,      /* uint64_t add_flags */
     790             :                          0       /* uint64_t imm */
     791             :          );
     792             : }
     793             : 
     794             : 
     795             : /**
     796             :  * Sends a scatter/gather message with immediate data over a connected unbuffered send socket.
     797             :  * One memory descriptor per entry is taken from buf[i]->mr; the vector goes to generic_sendmsg with FI_REMOTE_CQ_DATA as additional flag, and its status is returned.
     798             :  * @param socket: A connected, unbuffered send socket
     799             :  * @param buf: A vector of registered send buffers; entries 0..count-1 are read
     800             :  * @param iov: Scatter/gather buffer describing message within the buffers
     801             :  * @param count: Size of the scatter/gather vector. NOTE(review): not checked against NETIO_MAX_IOV_LEN, the capacity of the local descriptor array.
     802             :  * @param key: Message key used to track the message progress.
     803             :  * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
     804             :  */
     805             : int
     806    36810024 : netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm)
     807             : {
     808    36810024 :   void* descarray[NETIO_MAX_IOV_LEN]; /* fixed capacity: NETIO_MAX_IOV_LEN descriptors */
     809             : 
     810   232161014 :   for(unsigned i=0; i<count; i++) { /* NOTE(review): a count above NETIO_MAX_IOV_LEN would write past descarray - confirm callers bound it */
     811   195350990 :     descarray[i] = fi_mr_desc(buf[i]->mr);
     812             :   }
     813             : 
     814    36810024 :   return generic_sendmsg(socket, /* struct netio_send_socket* socket */
     815             :                          iov,    /* struct iovec* iov */
     816             :                          descarray,  /* void** desc */
     817             :                          count,      /* size_t count */
     818             :                          key,    /* uint64_t key */
     819             :                          FI_REMOTE_CQ_DATA,      /* uint64_t add_flags */
     820             :                          imm     /* uint64_t imm */
     821             :          );
     822             : }

Generated by: LCOV version 1.0