LCOV - code coverage report
Current view: top level - netio-next/src - connection_event.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 485 743 65.3 %
Date: 2025-08-12 04:15:35 Functions: 20 23 87.0 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <unistd.h>
       3             : #include "log.h"
       4             : #include "netio/netio.h"
       5             : #include "netio/netio_tcp.h"
       6             : #include <sys/types.h>
       7             : #include <sys/socket.h>
       8             : #include <netdb.h>
       9             : 
      10             : #include "connection_event.h"
      11             : #include "completion_event.h"
      12             : #include "log.h"
      13             : 
      14             : #if defined DEBUG || defined DEBUG_CM
      15             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      16             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      17             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      18             : #else
      19             : #define log_dbg(...)
      20             : #define log_trc(...)
      21             : #endif
      22             : 
      23             : #define FATAL(msg, c) \
      24             : do { \
      25             :     log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
      26             :     exit(2); \
      27             : } while(0);
      28             : 
      29             : 
      30             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      31             : 
      32             : static int
      33        2104 : read_cm_event(struct fid_eq* eq, struct fi_info** info, struct fi_eq_err_entry* err_entry)
      34             : {
      35        2104 :     uint32_t event;
      36        2104 :     struct fi_eq_cm_entry entry;
      37             : 
      38        2104 :     ssize_t rd = fi_eq_sread(eq, &event, &entry, sizeof entry, 0, 0);
      39        2104 :     if(rd < 0)
      40             :     {
      41         633 :         if(rd == -FI_EAGAIN)
      42             :         {
      43         538 :             return rd;
      44             :         }
      45          95 :         if(rd == -FI_EAVAIL)
      46             :         {
      47          95 :             int r;
      48          95 :             if((r = fi_eq_readerr(eq, err_entry, 0)) < 0)
      49             :             {
      50           0 :                 FATAL("Failed to retrieve details on Event Queue error", r);
      51             :             }
      52          95 :             log_error("Event Queue error: %s (code: %d), provider specific: %s (code: %d)",
      53             :                 fi_strerror(err_entry->err), err_entry->err,
      54             :                 fi_eq_strerror(eq, err_entry->prov_errno, err_entry->err_data, NULL, 0),
      55             :                 err_entry->prov_errno);
      56          95 :             return rd;
      57             :         }
      58             :     }
      59        1471 :     if (rd != sizeof entry)
      60             :     {
      61           0 :         FATAL("Failed to read from Event Queue: %d", (int)rd);
      62             :     }
      63             : 
      64        1471 :     if(info != NULL)
      65         220 :         *info = entry.info;
      66             : 
      67        1471 :     return event;
      68             : }
      69             : 
      70             : 
      71             : static void
      72         220 : handle_connreq(struct netio_recv_socket* rsocket, struct netio_listen_socket* lsocket, struct fi_info *info, void (*cb)(int,void*), void* cbdata)
      73             : {
      74         220 :     int ret;
      75         220 :     struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
      76             : 
      77         220 :     if((ret = fi_domain(lsocket->fabric, info, &rsocket->domain, NULL)))
      78             :     {
      79           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      80           0 :         FATAL("Listen socket, cannot open fabric, error ", ret);
      81             :     }
      82             : 
      83         220 :     if((ret = fi_endpoint(rsocket->domain, info, &rsocket->ep, NULL)))
      84             :     {
      85           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      86           0 :         FATAL("Listen socket, cannot open endpoint, error ", ret);
      87             :     }
      88             : 
      89             :     /* Create a new event queue for the new active socket */
      90         220 :     if((ret = fi_eq_open(lsocket->fabric, &eq_attr, &rsocket->eq, NULL)))
      91             :     {
      92           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      93           0 :         FATAL("Listen socket, cannot open Event Queue, error ", ret);
      94             :     }
      95             : 
      96         220 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->eq->fid, 0)))
      97             :     {
      98           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      99           0 :         FATAL("Listen socket, cannot bind Event Queue to endpoint, error ", ret);
     100             :     }
     101             : 
     102         220 :     if((ret = fi_control(&rsocket->eq->fid, FI_GETWAIT, &rsocket->eqfd)))
     103             :     {
     104           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     105           0 :         FATAL("Listen socket failed to obtain Event Queue wait object", ret);
     106             :     }
     107         220 :     rsocket->eq_ev_ctx.fd = rsocket->eqfd;
     108         220 :     rsocket->eq_ev_ctx.data = cbdata;
     109         220 :     rsocket->eq_ev_ctx.cb = cb;
     110             : 
     111         220 :     log_dbg("Adding RECV EQ polled fid %d %p", rsocket->eqfd, &rsocket->eq->fid);
     112         220 :     add_polled_fid(&rsocket->ctx->evloop.pfids, lsocket->fabric, &rsocket->eq->fid, rsocket->eqfd, &rsocket, cb);
     113         220 :     add_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eqfd, NETIO_EQ, URECV, &rsocket);
     114         220 :     netio_register_read_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
     115         220 :     log_dbg("recv_socket: EQ fd %d waiting for connection", rsocket->eqfd);
     116             : 
     117         220 :     struct fi_cq_attr cq_attr;
     118         220 :     cq_attr.size = NETIO_MAX_CQ_ENTRIES;      /* # entries for CQ */
     119         220 :     cq_attr.flags = 0;     /* operation flags */
     120         220 :     cq_attr.format = FI_CQ_FORMAT_DATA;    /* completion format */
     121         220 :     cq_attr.wait_obj= FI_WAIT_FD;  /* requested wait object */
     122         220 :     cq_attr.signaling_vector = 0; /* interrupt affinity */
     123         220 :     cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */
     124         220 :     cq_attr.wait_set = NULL;  /* optional wait set */
     125             : 
     126             :     //FI_RECV CQ
     127         220 :     if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->cq, NULL)))
     128             :     {
     129           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     130           0 :         FATAL("Listen socket, cannot open Completion Queue, error ", ret);
     131             :     }
     132             : 
     133         220 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->cq->fid, FI_RECV)))
     134             :     {
     135           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     136           0 :         FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
     137             :     }
     138             : 
     139             :     //FI_TRANSMIT CQ - also necessary
     140         220 :     cq_attr.format = FI_CQ_FORMAT_UNSPEC;
     141         220 :     cq_attr.wait_obj= FI_WAIT_NONE;
     142         220 :     if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->tcq, NULL)))
     143             :     {
     144           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     145           0 :         FATAL("Listen socket, cannot open Completion Queue, error ", ret);
     146             :     }
     147             : 
     148         220 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->tcq->fid, FI_TRANSMIT)))
     149             :     {
     150           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     151           0 :         FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
     152             :     }
     153             : 
     154         220 :     if((ret = fi_enable(rsocket->ep)))
     155             :     {
     156           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     157           0 :         FATAL("Listen socket, cannot enable recv socket endpoint, error ", ret);
     158             :     }
     159             : 
     160         220 :     if((ret = fi_accept(rsocket->ep, NULL, 0)))
     161             :     {
     162           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     163           0 :         FATAL("Listen socket, connection rejected, error ", ret);;
     164             :     }
     165         220 :     log_dbg("connection accepted. Lsocket EQ: %d with evloop %d, rsocket EQ %d with evloop %d", lsocket->eqfd, lsocket->ctx->evloop.epollfd, rsocket->eqfd, rsocket->ctx->evloop.epollfd);
     166         220 : }
     167             : 
     168             : 
     169             : void
     170         172 : handle_recv_socket_shutdown(struct netio_recv_socket* socket)
     171             : {
     172         172 :     if(socket->eqfd < 0){
     173           0 :         log_info("handle_recv_socket_shutdown on closed socket (eqfd %d)", socket->eqfd);
     174           0 :         return;
     175             :     }
     176         172 :     int ret = 0;
     177         172 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     178         172 :     log_dbg("Handle_recv_socket_shutdown for socket %p, evloop: %d", socket, socket->ctx->evloop.epollfd);
     179             : 
     180         172 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     181             :     {
     182           0 :         log_error("Failed to close recv socket Endpoint %d: %s", ret, fi_strerror(-ret));
     183             :     }
     184         172 :     log_dbg("fi_close done for endpoint.");
     185             : 
     186             :     //EQ
     187         172 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eq_ev_ctx.fd);
     188         172 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, &ep_event);
     189         172 :     if ( ret ){ log_warn("netio_recv_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
     190         172 :     log_dbg("netio_recv_socket: removing EQ fd %d from evloop %d", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     191         172 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     192             :     {
     193           0 :         log_error("Failed to fi_close recv socket Event Queue %d: %s", ret, fi_strerror(-ret));
     194           0 :         ret = close(socket->eq_ev_ctx.fd);
     195           0 :         if ( ret ) {log_warn("Cannot close recv socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
     196             :     }
     197         172 :     remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd);
     198         172 :     socket->eqfd = -1;
     199             : 
     200             :     //FI_RECV CQ
     201         172 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->cq_ev_ctx.fd);
     202         172 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cq_ev_ctx.fd, &ep_event);
     203         172 :     if ( ret ){ log_warn("netio_recv_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
     204         172 :     log_dbg("netio_recv_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     205         172 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     206             :     {
     207           0 :         log_error("Failed to close recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     208           0 :         ret = close(socket->cq_ev_ctx.fd);
     209           0 :         if ( ret ) {log_warn("Cannot close recv socket CQ fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     210             :     }
     211         172 :     remove_open_fd(&socket->ctx->evloop, socket->cq_ev_ctx.fd);
     212         172 :     socket->cqfd = -1;
     213             :     //FI_TRANSMIT CQ
     214         172 :     if((ret = fi_close(&socket->tcq->fid)))
     215             :     {
     216           0 :         log_error("Failed to close FI_TRANSMIT recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     217             :     }
     218             : 
     219             :     //MR
     220         172 :     uint16_t mem_regions = socket->reg_mr;
     221       29142 :     for(uint16_t i=0; i<mem_regions; ++i)
     222             :     {
     223       28970 :         if ((ret = fi_close(socket->mr[i]))){
     224           0 :             log_warn("Failed to close recv Memory Region %d, error %d.", i, ret);
     225             :         }
     226             :         else{
     227       28970 :             socket->reg_mr-=1;
     228             :         }
     229             :     }
     230         172 :     if(socket->reg_mr==0){free(socket->mr);socket->mr=NULL;}
     231         172 :     if((ret = fi_close(&socket->domain->fid)))
     232             :     {
     233           0 :         log_error("Failed to close recv socket Domain %d: %s", ret, fi_strerror(-ret));
     234             :     }
     235         172 :     if (socket->sub_msg_buffers != NULL){
     236        1155 :         for(int i = 0; i < 32; i++){
     237        1120 :             free(socket->sub_msg_buffers[i]->data);
     238        1120 :             free(socket->sub_msg_buffers[i]);
     239             :         }
     240          35 :         free(socket->sub_msg_buffers);
     241          35 :         socket->sub_msg_buffers = NULL;
     242             :     }
     243             : }
     244             : 
     245             : void
     246         391 : handle_send_socket_shutdown(struct netio_send_socket* socket)
     247             : {
     248             :     // if(socket->state != CONNECTED){
     249             :     //     log_dbg("Nothing to do, send socket has already been freed....");
     250             :     //     return;
     251             :     // }
     252         391 :     int ret = 0;
     253         391 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     254         391 :     log_dbg("Handle_send_socket_shutdown. Socket with EQ: %d", socket->eqfd);
     255             : 
     256         391 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     257             :     {
     258           0 :         log_error("Failed to close send socket Endpoint %d: %s", ret, fi_strerror(-ret));
     259             :     }
     260         391 :     socket->ep = NULL;
     261             : 
     262             :     //EQ
     263         391 :     log_dbg("netio_send_socket: removing EQ fd %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd);
     264         391 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
     265         391 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     266         391 :     if ( ret ){ log_warn("netio_send_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     267         391 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     268             :     {
     269           0 :         log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
     270           0 :         ret = close(socket->eq_ev_ctx.fd);
     271           0 :         if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
     272             :     }
     273         391 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     274         391 :     socket->eqfd = -1;
     275             : 
     276             :     //FI_TRANSMIT CQ
     277         391 :     log_dbg("netio_send_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     278         391 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->cqfd);
     279         391 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cqfd, &ep_event);
     280         391 :     if ( ret ){ log_warn("netio_send_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     281         391 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     282             :     {
     283           0 :         log_error("Failed to close send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     284           0 :         ret = close(socket->cq_ev_ctx.fd);
     285           0 :         if ( ret ){ log_warn("Could not close send socket cq fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     286             :     }
     287         391 :     remove_open_fd(&socket->ctx->evloop, socket->cqfd);
     288         391 :     socket->cqfd = -1;
     289             :     //FI_RECV CQ
     290         391 :     if((ret = fi_close(&socket->rcq->fid)))
     291             :     {
     292           0 :         log_error("Failed to close FI_RECV senf socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     293             :     }
     294             : 
     295         391 :     socket->domain->nb_sockets -= 1;
     296         391 :     if (socket->domain->nb_sockets == 0) {
     297       37633 :         for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
     298       37244 :             if ((ret = fi_close(socket->domain->mr[i]))) {
     299       37244 :                 log_warn("Failed to close send Memory Region %d, error %d.", i, ret);;
     300             :             }
     301             :         }
     302         389 :         if ((ret = fi_close(&socket->domain->domain->fid))) {
     303           0 :             log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
     304             :         }
     305             : 
     306         389 :         if(socket->ctx->evloop.pfids.count == 0){
     307         318 :             if ((ret = fi_close(&socket->domain->fabric->fid))) {
     308           0 :                 log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
     309             :             }
     310             :         }
     311         389 :         free(socket->domain->mr);
     312         389 :         socket->domain->mr = NULL;
     313         389 :         free(socket->domain);
     314         389 :         socket->domain = NULL;
     315             :     }
     316         391 :     if(socket->fi != NULL){
     317         391 :         fi_freeinfo(socket->fi);
     318         391 :         socket->fi = NULL;
     319             :     }
     320         391 :     socket->state = DISCONNECTED;
     321         391 : }
     322             : 
     323             : 
     324             : void
     325           8 : handle_tcp_send_socket_shutdown(struct netio_send_socket* socket)
     326             : {
     327           8 :     netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal);
     328           8 :     epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
     329           8 :     int rc = close(socket->eq_ev_ctx.fd);
     330           8 :     if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
     331           8 : }
     332             : 
     333             : 
     334             : static void
     335          95 : handle_send_socket_shutdown_on_connection_refused(struct netio_send_socket* socket)
     336             : {
     337          95 :     int ret = 0;
     338             :     //struct fi_eq_err_entry err_entry;
     339          95 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     340          95 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     341             :     {
     342           0 :         log_error("Failed to close send socket endpoint %d: %s", ret, fi_strerror(-ret));
     343             :     }
     344             : 
     345             :     //EQ
     346          95 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
     347          95 :     ret =  epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     348          95 :     if (ret) {log_warn("Conn refused: cannot deregister send_socket EQ fd %d from evloop %d, %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     349          95 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     350             :     {
     351           0 :         log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
     352           0 :         ret = close(socket->eq_ev_ctx.fd);
     353           0 :         if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
     354             :     }
     355          95 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     356          95 :     socket->eqfd = -1;
     357             : 
     358             :     //FI_TRANSMIT CQ - wait object not retrieved yet
     359          95 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     360             :     {
     361           0 :         log_error("Failed to close FI_TRANSMIT send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     362             :     }
     363             :     //FI_RECV CQ
     364          95 :     if(fi_close(&socket->rcq->fid))
     365             :     {
     366           0 :         log_error("Failed to close FI_RECV send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     367             :     }
     368             : 
     369          95 :     socket->domain->nb_sockets -= 1;
     370          95 :     if (socket->domain->nb_sockets == 0) {
     371         118 :         for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
     372          23 :             if ((ret = fi_close(socket->domain->mr[i]))) {
     373           0 :                 log_warn("Failed to close send Memory Region %d, error %d.", i, ret);
     374             :             }
     375             :         }
     376          95 :         if ((ret = fi_close(&socket->domain->domain->fid))) {
     377           0 :             log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
     378             :         }
     379          95 :         if(socket->ctx->evloop.pfids.count == 0){
     380          73 :             if ((ret = fi_close(&socket->domain->fabric->fid))) {
     381           0 :                 log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
     382             :             }
     383             :         }
     384          95 :         free(socket->domain->mr);
     385          95 :         socket->domain->mr = NULL;
     386          95 :         free(socket->domain);
     387          95 :         socket->domain = NULL;
     388             : 
     389             :     }
     390          95 :     if(socket->fi != NULL){
     391          95 :         fi_freeinfo(socket->fi);
     392          95 :         socket->fi = NULL;
     393             :     }
     394          95 :     socket->state = UNCONNECTED;
     395          95 : }
     396             : 
     397             : // WAIT OBJECT CALLBACKS ///////////////////////////////////////////////////////////
     398             : 
     399             : void
     400        1492 : on_send_socket_libfabric_cm_event(int fd, void* ptr)
     401             : {
     402        1492 :     struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
     403        1492 :     struct fi_eq_err_entry err_entry;
     404        1492 :     log_dbg("send socket %p fd %d: connection event, evloop: %d", socket, fd, socket->ctx->evloop.epollfd);
     405             : 
     406        1492 :     if(socket->state == DISCONNECTED || socket->eqfd != fd){
     407             :         log_dbg("discarding send_socket cm events: state disconnected or fd mismatch eqfd %d fd %d", socket->eqfd, fd);
     408         954 :         return;
     409             :     } //Check if event queue was already closed. TODO: Need to fix receiving messages after closing
     410        1492 :     int event = read_cm_event(socket->eq, NULL, &err_entry);
     411        1492 :     int ret = 0;
     412             : 
     413        1492 :     log_dbg("event %d", event);
     414             : 
     415        1492 :     switch(event)
     416             :     {
     417         393 :     case FI_SHUTDOWN:
     418         393 :         if (socket->eqfd < 0 ){
     419           0 :             log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
     420         954 :             return;
     421             :         }
     422         393 :         log_info("fi_verbs_process_send_socket_cm_event: FI_SHUTDOWN");
     423         393 :         if(socket->cb_internal_connection_closed){
     424         155 :             socket->cb_internal_connection_closed(socket);
     425             :         }
     426         393 :         if(socket->cb_connection_closed) {
     427         393 :             socket->cb_connection_closed(socket);
     428             :         }
     429             :         return;
     430             : 
     431         466 :     case FI_CONNECTED:
     432         466 :         socket->cqfd = -1;
     433         466 :         if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
     434             :         {
     435           0 :             FATAL("Failed to retrieve wait object for send socket Completion Queue", ret);
     436             :         }
     437             : 
     438         466 :         socket->cq_ev_ctx.fd = socket->cqfd;
     439         466 :         socket->cq_ev_ctx.data = socket;
     440         466 :         socket->cq_ev_ctx.cb = on_send_socket_cq_event;
     441             : 
     442         466 :         log_dbg("send_socket: EQ fd %d connected, CQ fd %d", socket->eqfd, socket->cqfd);
     443         466 :         log_dbg("Adding SEND CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
     444         466 :         add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->cq->fid, socket->cqfd, socket, on_send_socket_cq_event);
     445         466 :         add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, USEND, socket);
     446         466 :         netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
     447             : 
     448         466 :         socket->state = CONNECTED;
     449             : 
     450         466 :         if(socket->cb_connection_established) {
     451         466 :           socket->cb_connection_established(socket);
     452             :         }
     453             : 
     454             :         return;
     455             : 
     456             :     case FI_MR_COMPLETE:
     457             :     case FI_AV_COMPLETE:
     458             :     case FI_JOIN_COMPLETE:
     459             :         // Not implemented
     460         538 :         break;
     461             : 
     462             : 
     463          95 :     case -FI_EAVAIL:
     464             : 
     465          95 :         switch(err_entry.err) {
     466             : 
     467          95 :             case FI_ECONNREFUSED:
     468             : 
     469          95 :                 log_dbg("Connection refused (FI_ECONNREFUSED). Deallocating send_socket resources.");
     470             : 
     471          95 :                 if(socket->eqfd < 0){
     472             :                     log_dbg("FI_ECONNREFUSED on send socket with EQ fd %d. Not clearing it.", socket->eqfd);
     473             :                     return;
     474             :                 }
     475             : 
     476          95 :                 handle_send_socket_shutdown_on_connection_refused(socket);
     477             : 
     478          95 :                 if(socket->cb_error_connection_refused) {
     479          95 :                     socket->cb_error_connection_refused(socket);
     480             :                 }
     481             : 
     482          95 :                 if (socket->recv_socket != NULL){
     483           0 :                     if (socket->recv_socket->eqfd < 0 ){
     484           0 :                         log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
     485           0 :                         return;
     486             :                     }
     487           0 :                     log_info("Shutting down receive socket on FI_ETIMEDOUT");
     488           0 :                     handle_recv_socket_shutdown(socket->recv_socket);
     489           0 :                     if(socket->recv_socket->lsocket->cb_connection_closed) {
     490           0 :                         socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
     491             :                     }
     492           0 :                     if (socket->recv_socket->lsocket->attr.num_buffers > 0){
     493           0 :                         for(int i = 0; i < socket->recv_socket->lsocket->attr.num_buffers; i++){
     494           0 :                             free(socket->recv_socket->recv_buffers[i].data);
     495             :                         }
     496           0 :                         free(socket->recv_socket->recv_buffers);
     497             :                     }
     498             :                 }
     499             : 
     500             :                 //print_openfds(&socket->ctx->evloop.openfds);
     501             :                 break;
     502             : 
     503           0 :             case FI_ETIMEDOUT:
     504           0 :                 log_info("fi_verbs_process_send_socket_cm_event: FI_ETIMEDOUT");
     505           0 :                 if (socket->eqfd < 0 ){
     506           0 :                     log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
     507           0 :                     break;
     508             :                 }
     509             : 
     510             :                 // Need to take care of receive socket as well
     511           0 :                 if (socket->recv_socket != NULL){
     512           0 :                     if (socket->recv_socket->eqfd < 0 ){
     513           0 :                         log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
     514           0 :                         return;
     515             :                     }
     516           0 :                     log_info("Shutting down receive socket on FI_ETIMEDOUT");
     517           0 :                     handle_recv_socket_shutdown(socket->recv_socket);
     518           0 :                     if(socket->recv_socket->lsocket->cb_connection_closed) {
     519           0 :                         socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
     520             :                     }
     521           0 :                     if (socket->recv_socket->lsocket->attr.num_buffers > 0){
     522           0 :                         for(int i = 0; i < socket->recv_socket->lsocket->attr.num_buffers; i++){
     523           0 :                             free(socket->recv_socket->recv_buffers[i].data);
     524             :                         }
     525           0 :                         free(socket->recv_socket->recv_buffers);
     526             :                     }
     527             :                 }
     528             : 
     529           0 :                 if(socket->cqfd < 0){ //cq not initalized yet
     530           0 :                     handle_send_socket_shutdown_on_connection_refused(socket);
     531             :                 } else {
     532           0 :                     if(socket->cb_internal_connection_closed){
     533           0 :                         socket->cb_internal_connection_closed(socket);
     534             :                     }
     535           0 :                     if(socket->cb_connection_closed) {
     536           0 :                         socket->cb_connection_closed(socket);
     537             :                     }
     538             :                 }
     539             : 
     540             :                 break;
     541             : 
     542           0 :             default:
     543             : 
     544           0 :                 log_error("Unhandled error in the Event Queue: %s (code: %d), provider specific: %s (code: %d)",
     545             :                         fi_strerror(err_entry.err), err_entry.err,
     546             :                         fi_eq_strerror(socket->eq, err_entry.prov_errno, err_entry.err_data, NULL, 0),
     547             :                         err_entry.prov_errno);
     548             :         }
     549             :         return;
     550             : 
     551         538 :     case -FI_EAGAIN:
     552         538 :         struct fid* fp = &socket->eq->fid;
     553         538 :         fi_trywait(socket->domain->fabric, &fp, 1);
     554         538 :         break;
     555             : 
     556           0 :     default:
     557           0 :         log_error("Unexpected event %d in send socket Event Queue", event);
     558           0 :         exit(2);
     559             :     }
     560             : }
     561             : 
     562             : 
     563             : void
     564         107 : on_listen_socket_libfabric_cm_event(int fd, void* ptr)
     565             : {
     566         107 :     struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
     567         107 :     log_dbg("listen socket: connection event");
     568         107 :     if(lsocket->eqfd != fd){
     569           0 :         log_dbg("listen socket is already closed.");
     570           0 :         return;
     571             :     }
     572             : 
     573         107 :     struct fi_info *info = NULL;
     574         107 :     struct fi_eq_err_entry err_entry;
     575         107 :     int event = read_cm_event(lsocket->eq, &info, &err_entry);
     576             : 
     577             : 
     578         107 :     switch (event)
     579             :     {
     580         107 :         case FI_CONNREQ:
     581         107 :             log_dbg("fi_verbs_process_listen_socket_cm_event: FI_CONNREQ");
     582         107 :             struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
     583         107 :             struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
     584         107 :             netio_init_recv_socket(rsocket, lsocket);
     585         107 :             handle_connreq(rsocket, lsocket, info, on_recv_socket_libfabric_cm_event, rsocket);
     586         107 :             if(lsocket->recv_sub_msg == 1){
     587          44 :                 rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
     588        1452 :                 for (int i = 0; i < 32; i++){
     589        1408 :                     rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
     590        1408 :                     rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
     591        1408 :                     rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
     592        1408 :                     netio_register_recv_buffer(rsocket, rsocket->sub_msg_buffers[i], 0);
     593        1408 :                     netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
     594             :                 }
     595             :                 log_dbg("Posted recv for subscription messages");
     596          63 :             } else if (lsocket->attr.num_buffers > 0) {
     597          63 :                 log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
     598          63 :                 rsocket->sub_msg_buffers = NULL;
     599          63 :                 rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
     600       14406 :                 for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
     601       14343 :                     log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
     602       14343 :                     rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
     603       14343 :                     rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
     604       14343 :                     netio_register_recv_buffer(rsocket, &rsocket->recv_buffers[i], 0);
     605       14343 :                     netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
     606             :                 }
     607             :             } else {
     608           0 :                 log_error("Something went wrong. Not allocating any buffers for recv socket.");
     609             :             }
     610             :             break;
     611             : 
     612           0 :         case FI_CONNECTED:
     613           0 :             log_fatal("FI_CONNECTED received on listen socket");
     614           0 :             exit(2);
     615             : 
     616           0 :         case FI_SHUTDOWN:
     617           0 :             log_fatal("FI_SHUTDOWN received on listen socket");
     618           0 :             exit(2);
     619             : 
     620           0 :         case -FI_EAGAIN:
     621           0 :             struct fid* fp = &lsocket->eq->fid;
     622           0 :             fi_trywait(lsocket->fabric, &fp, 1);
     623           0 :             break;
     624             : 
     625           0 :         case -FI_EAVAIL:
     626           0 :             log_error("Unhandled error in listen socket EQ code: %d, provider specific code: %d",
     627             :                 err_entry.err, err_entry.prov_errno);
     628           0 :             break;
     629             :     }
     630         107 :     fi_freeinfo(info);
     631             : }
     632             : 
     633             : 
     634             : void
     635         113 : on_buffered_listen_socket_libfabric_cm_event(int fd, void* ptr)
     636             : {
     637         113 :     struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
     638         113 :     log_dbg("buffered listen socket: connection event");
     639         113 :     if(lsocket->listen_socket.eqfd != fd){
     640           0 :         log_dbg("Buffered listen socket CM event: inconsistend fd. Ignoring event.");
     641           0 :         return;
     642             :     }
     643             : 
     644         113 :     struct fi_info *info = NULL;
     645         113 :     struct fi_eq_err_entry err_entry;
     646         113 :     int event = read_cm_event(lsocket->listen_socket.eq, &info, &err_entry);
     647             : 
     648         113 :     switch (event)
     649             :     {
     650         113 :         case FI_CONNREQ:
     651         113 :             log_dbg("FI_CONNREQ");
     652         113 :             struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
     653         113 :             struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
     654         113 :             netio_buffered_recv_socket_init(rsocket, lsocket);
     655         113 :             handle_connreq(&rsocket->recv_socket, &lsocket->listen_socket, info, on_buffered_recv_socket_libfabric_cm_event, rsocket);
     656         113 :             log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED, posting %d buffers", rsocket->num_pages);
     657       24649 :             for(unsigned int i=0; i<rsocket->num_pages; i++) {
     658       24536 :                 netio_register_recv_buffer(&rsocket->recv_socket, &rsocket->pages[i], 0);
     659       24536 :                 netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
     660             :             }
     661             :             break;
     662             : 
     663           0 :         case FI_CONNECTED:
     664           0 :             log_fatal("FI_CONNECTED received on buffered listen socket");
     665           0 :             exit(2);
     666             : 
     667           0 :         case FI_SHUTDOWN:
     668           0 :             log_fatal("FI_SHUTDOWN received on buffered listen socket");
     669           0 :             exit(2);
     670             : 
     671           0 :         case -FI_EAGAIN:
     672           0 :             struct fid* fp = &lsocket->listen_socket.eq->fid;
     673           0 :             fi_trywait(lsocket->listen_socket.fabric, &fp, 1);
     674           0 :             break;
     675             : 
     676           0 :         case -FI_EAVAIL:
     677           0 :             log_error("Unhandled error in buffer listen socket EQ code: %d, provider specific code: %d",
     678             :                 err_entry.err, err_entry.prov_errno);
     679           0 :             break;
     680             :     }
     681         113 :     fi_freeinfo(info);
     682             : }
     683             : 
     684             : 
     685             : void
     686         189 : on_recv_socket_libfabric_cm_event(int fd, void* ptr)
     687             : {
     688         189 :     struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
     689         189 :     log_info("recv socket %d: connection event", socket->eqfd);
     690         189 :     if(socket->eqfd != fd){
     691             :         log_dbg("Recv socket CM event: inconsistend fd. Ignoring event.");
     692           0 :         return;
     693             :     }
     694         189 :     int ret;
     695         189 :     struct fi_eq_err_entry err_entry;
     696         189 :     uint32_t event = read_cm_event(socket->eq, NULL, &err_entry);
     697             : 
     698         189 :     switch (event)
     699             :     {
     700         107 :     case FI_CONNECTED:
     701         107 :         log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED");
     702         107 :         socket->cqfd = -1;
     703         107 :         if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
     704             :         {
     705           0 :             FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
     706             :         }
     707             : 
     708         107 :         socket->cq_ev_ctx.fd = socket->cqfd;
     709         107 :         socket->cq_ev_ctx.data = socket;
     710         107 :         socket->cq_ev_ctx.cb = on_recv_socket_cq_event;
     711             : 
     712         107 :         log_dbg("Adding recv CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
     713         107 :         add_polled_fid(&socket->ctx->evloop.pfids, socket->lsocket->fabric, &socket->cq->fid, socket->cqfd, socket, on_recv_socket_cq_event);
     714         107 :         add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, URECV, socket);
     715         107 :         netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
     716             : 
     717         107 :         log_dbg("recv_socket: EQ fd %d CQ fd %d connected", socket->eqfd, socket->cqfd);
     718         107 :         if(socket->lsocket->cb_connection_established) {
     719         107 :             socket->lsocket->cb_connection_established(socket);
     720             :         }
     721             : 
     722         189 :         break;
     723             : 
     724          82 :     case FI_SHUTDOWN:
     725          82 :         if (socket->eqfd < 0 ){
     726           0 :             log_info("Ignoring FI_SHUTDOWN on recv_socket, invalid eqfd (socket already closed)");
     727           0 :             return;
     728             :         }
     729          82 :         log_info("fi_verbs_process_recv_socket_cm_event: FI_SHUTDOWN");
     730          82 :         handle_recv_socket_shutdown(socket);
     731          82 :         if(socket->lsocket->cb_connection_closed) {
     732          47 :             socket->lsocket->cb_connection_closed(socket);
     733             :         }
     734             : 
     735          82 :         if (socket->lsocket->attr.num_buffers > 0){
     736       11569 :             for(int i = 0; i < socket->lsocket->attr.num_buffers; i++){
     737       11522 :                 free(socket->recv_buffers[i].data);
     738             :             }
     739          47 :             free(socket->recv_buffers);
     740             :         }
     741             : 
     742          82 :         int return_value = remove_socket(&socket->lsocket->recv_sockets, socket);
     743          82 :         log_info("Recv socket removed, result: %d", return_value);
     744          82 :         break;
     745             : 
     746             :     case FI_MR_COMPLETE:
     747             :     case FI_AV_COMPLETE:
     748             :     case FI_JOIN_COMPLETE:
     749             :         // Not implemented
     750             :         break;
     751             : 
     752           0 :     case -FI_EAGAIN:
     753           0 :         struct fid* fp = &socket->eq->fid;
     754           0 :         fi_trywait(socket->lsocket->fabric, &fp, 1);
     755           0 :         break;
     756             : 
     757           0 :     case -FI_EAVAIL:
     758           0 :         log_error("Unhandled error in recv socket EQ code: %d, provider specific code: %d",
     759             :             err_entry.err, err_entry.prov_errno);
     760           0 :         break;
     761             : 
     762           0 :     default:
     763           0 :         log_error("Unexpected event %d in recv socket Event Queue", event);
     764           0 :         exit(2);
     765         189 :         break;
     766             :     }
     767             : }
     768             : 
     769             : 
     770             : void
     771         203 : on_buffered_recv_socket_libfabric_cm_event(int fd, void* ptr)
     772             : {
     773         203 :     struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)ptr;
     774         203 :     log_dbg("buffered recv socket %d: connection event (FD = %d)", socket->recv_socket.eqfd, fd);
     775         203 :     if(socket->recv_socket.eqfd != fd){
     776             :         log_dbg("Buffered recv socket CM event: inconsistend fd. Ignoring event.");
     777           0 :         return;
     778             :     }
     779         203 :     int ret;
     780         203 :     struct fi_eq_err_entry err_entry;
     781         203 :     uint32_t event = read_cm_event(socket->recv_socket.eq, NULL, &err_entry);
     782             : 
     783         203 :     switch (event)
     784             :     {
     785         113 :     case FI_CONNECTED:
     786         113 :         socket->recv_socket.cqfd = -1;
     787         113 :         if((ret = fi_control(&socket->recv_socket.cq->fid, FI_GETWAIT, &socket->recv_socket.cqfd)))
     788             :         {
     789           0 :             FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
     790             :         }
     791             : 
     792         113 :         socket->recv_socket.cq_ev_ctx.fd = socket->recv_socket.cqfd;
     793         113 :         socket->recv_socket.cq_ev_ctx.data = &socket->recv_socket;
     794         113 :         socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_cq_event;
     795             : 
     796         113 :         log_dbg("Adding BUFFERED RECV CQ polled fid %d %p", socket->recv_socket.cqfd, &socket->recv_socket.cq->fid);
     797         113 :         add_open_fd(&socket->recv_socket.ctx->evloop.openfds, socket->recv_socket.cqfd, NETIO_CQ, BRECV, &socket->recv_socket);
     798         113 :         add_polled_fid(&socket->recv_socket.ctx->evloop.pfids,
     799         113 :                     socket->recv_socket.lsocket->fabric,
     800         113 :                     &socket->recv_socket.cq->fid,
     801             :                     socket->recv_socket.cqfd,
     802             :                     &socket->recv_socket,
     803             :                     on_recv_socket_cq_event);
     804             : 
     805         113 :         netio_register_read_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
     806         113 :         log_dbg("buffered_recv_socket: registering CQ fd %d", socket->recv_socket.cqfd);
     807             : 
     808         113 :         if(socket->lsocket->cb_connection_established) {
     809         113 :             socket->lsocket->cb_connection_established(socket);
     810             :         }
     811             : 
     812         203 :         break;
     813             : 
     814          90 :     case FI_SHUTDOWN:
     815          90 :         log_dbg("recv socket shutdown");
     816          90 :         if (socket->recv_socket.eqfd < 0 ){
     817           0 :             log_dbg("Ignoring FI_SHUTDOWN on buffered recv_socket, invalid eqfd (socket already closed)");
     818           0 :             return;
     819             :         }
     820          90 :         handle_recv_socket_shutdown(&socket->recv_socket);
     821       16418 :         for(unsigned int i=0; i<socket->num_pages; i++) {
     822       16328 :             free(socket->pages[i].data);
     823             :         }
     824          90 :         free(socket->pages);
     825          90 :         if(socket->lsocket->cb_connection_closed) {
     826          90 :             socket->lsocket->cb_connection_closed(socket);
     827             :         }
     828             : 
     829          90 :         remove_socket(&socket->lsocket->listen_socket.recv_sockets, socket);
     830          90 :         break;
     831             : 
     832             :     case FI_MR_COMPLETE:
     833             :     case FI_AV_COMPLETE:
     834             :     case FI_JOIN_COMPLETE:
     835             :         // Not implemented
     836             :         break;
     837             : 
     838           0 :     case -FI_EAGAIN:
     839           0 :         struct fid* fp = &socket->recv_socket.eq->fid;
     840           0 :         fi_trywait(socket->lsocket->listen_socket.fabric, &fp, 1);
     841           0 :         break;
     842             : 
     843           0 :     case -FI_EAVAIL:
     844             :         // error was found
     845           0 :         log_error("Unhandled error in buffered recv socket EQ code: %d, provider specific code: %d",
     846             :             err_entry.err, err_entry.prov_errno);
     847           0 :         break;
     848             : 
     849           0 :     default:
     850           0 :         log_error("Unexpected event %d in buffered recv socket Event Queue", event);
     851           0 :         exit(2);
     852         203 :         break;
     853             :     }
     854             : }
     855             : 
     856             : 
     857             : // CALLBACKS FOR GARBAGE COLLECTION  //////////////////////////////////////////////////////////
     858             : void
     859           0 : close_send_socket(void* ptr)
     860             : {
     861           0 :     log_info("Close_send_socket. Not supported anymore");
     862           0 :     struct signal_data* sd = (struct signal_data*)ptr;
     863             : 
     864           0 :     struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
     865           0 :     if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     866           0 :         shutdown(socket->cq_ev_ctx.fd, SHUT_WR);
     867           0 :         netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal);
     868           0 :         epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
     869           0 :         int rc = close(socket->cq_ev_ctx.fd);
     870           0 :         if ( rc ){ log_warn("Cannot close TCP send socket fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     871           0 :         if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
     872           0 :         socket->state=DISCONNECTED;
     873           0 :         if(socket->cb_connection_closed) {
     874           0 :           socket->cb_connection_closed(socket);
     875             :         }
     876           0 :     } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
     877             :         //commented out as closing libfabric sockets relies on exhanges of fi_shutdown
     878             :         //handle_send_socket_shutdown(socket);
     879             :     } else {
     880           0 :         log_error("Closing send socket with unknown netio mode %d", socket->tcp_fi_mode);
     881             :     }
     882             : 
     883             :     //delete the used signal
     884           0 :     netio_signal_close(sd->evloop, sd->signal);
     885           0 :     free(sd->signal);
     886           0 :     free(sd);
     887           0 : }
     888             : 
     889             : void
     890           0 : close_buffered_send_socket(void *ptr)
     891             : {
     892           0 :     log_dbg("Closing buffered_send_socket %p.", ptr);
     893           0 :     struct signal_data* sd = (struct signal_data*)ptr;
     894             : 
     895           0 :     struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)sd->ptr;
     896           0 :     struct netio_send_socket* ssocket = &socket->send_socket;
     897           0 :     if(ssocket->tcp_fi_mode == NETIO_MODE_TCP){
     898           0 :         netio_timer_close(&(ssocket->ctx->evloop), &socket->flush_timer);
     899           0 :         netio_signal_close(&(ssocket->ctx->evloop), &socket->signal_buffer_available);
     900           0 :         shutdown(ssocket->eq_ev_ctx.fd, SHUT_WR);
     901           0 :         netio_signal_close(&ssocket->ctx->evloop, &ssocket->tcp_signal);
     902           0 :         epoll_ctl(ssocket->ctx->evloop.epollfd, EPOLL_CTL_DEL, ssocket->eq_ev_ctx.fd, NULL);
     903           0 :         int rc = close(ssocket->eq_ev_ctx.fd);
     904           0 :         if ( !rc ){ remove_open_fd(&ssocket->ctx->evloop, ssocket->eq_ev_ctx.fd); }
     905           0 :         if(ssocket->cb_connection_closed) {
     906           0 :           ssocket->cb_connection_closed(ssocket);
     907             :         }
     908           0 :     } else if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     909             :         //commented out as closing libfabric sockets relies on exhanges of fi_shutdown
     910             :         //handle_send_socket_shutdown(ssocket);
     911             :     } else {
     912           0 :         log_error("Closing send socket with unkown netio mode %d", ssocket->tcp_fi_mode);
     913             :     }
     914           0 :     for(size_t i=0; i < socket->buffers.num_buffers; ++i ){
     915           0 :         free(socket->buffers.buffers[i]);
     916             :     }
     917             : 
     918             :     //delete the used signal
     919           0 :     netio_signal_close(sd->evloop, sd->signal);
     920           0 :     free(sd->signal);
     921           0 :     free(sd);
     922           0 : }
     923             : 
     924             : void
     925           2 : close_recv_socket(void* ptr)
     926             : {
     927           2 :     log_dbg("Closing recv_socket %p.", ptr);
     928           2 :     struct signal_data* sd = (struct signal_data*)ptr;
     929           2 :     struct netio_recv_socket* socket = (struct netio_recv_socket*)sd->ptr;
     930           2 :     if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     931           2 :         shutdown(socket->eq_ev_ctx.fd, SHUT_RD);
     932             :         // netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal); // causes bad file descriptor
     933           2 :         epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
     934           2 :         int rc = close(socket->eq_ev_ctx.fd);
     935           2 :         if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
     936           2 :         if(socket->lsocket->cb_connection_closed) {
     937           0 :           socket->lsocket->cb_connection_closed(socket);
     938             :         }
     939           0 :     } else if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     940             :         //commented out as closing libfabric sockets relies on exhanges of fi_shutdown
     941             :         //handle_recv_socket_shutdown(socket);
     942             :     } else {
     943           0 :         log_error("Closing recv socket with unkown netio mode %d", socket->tcp_fi_mode);
     944             :     }
     945             : 
     946             :     //clean up signal
     947           2 :     netio_signal_close(sd->evloop, sd->signal);
     948           2 :     free(sd->signal);
     949           2 :     free(sd);
     950           2 : }
     951             : 
     952             : void
     953           0 : close_buffered_recv_socket(void* ptr)
     954             : {
     955           0 :     log_info("Closing buffered_recv_socket %p. Not supported anymore", ptr);
     956           0 :     struct signal_data* sd = (struct signal_data*)ptr;
     957             :     /*
     958             :     struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)sd->ptr;
     959             :     struct netio_recv_socket* rsocket = &socket->recv_socket;
     960             :     if(rsocket->tcp_fi_mode == NETIO_MODE_TCP){
     961             :         shutdown(rsocket->eq_ev_ctx.fd, SHUT_RD);
     962             :         netio_signal_close(&rsocket->ctx->evloop, &rsocket->tcp_signal);
     963             :         epoll_ctl(rsocket->ctx->evloop.epollfd, EPOLL_CTL_DEL, rsocket->eq_ev_ctx.fd, NULL);
     964             :         int rc = close(rsocket->eq_ev_ctx.fd);
     965             :         if ( !rc ){ remove_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eq_ev_ctx.fd); }
     966             :     } else if (rsocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     967             :         handle_recv_socket_shutdown(rsocket);
     968             :     } else {
     969             :         log_error("Closing buffered recv socket with unkown netio mode %d", rsocket->tcp_fi_mode);
     970             :     }
     971             :     free(socket->pages);
     972             :     */
     973             :     //clean up signal
     974           0 :     netio_signal_close(sd->evloop, sd->signal);
     975           0 :     free(sd->signal);
     976           0 :     free(sd);
     977           0 : }
     978             : 
     979             : 
     980             : void
     981          41 : close_listen_socket(void* ptr)
     982             : {
     983          41 :     log_dbg("close_listen_socket");
     984          41 :     struct signal_data* sd = (struct signal_data*)ptr;
     985          41 :     struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
     986             : 
     987          41 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     988          41 :         if(socket->recv_sockets != NULL){
     989             :             struct netio_socket_list* entry = socket->recv_sockets;
     990             :             int still_open = 0;
     991           0 :             while(entry != NULL){
     992           0 :                 struct netio_recv_socket* recv_socket = entry->socket;
     993           0 :                 still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->eqfd);
     994           0 :                 entry = entry->next;
     995             :             }
     996           0 :             if(still_open){
     997           0 :                 netio_signal_fire(sd->signal);
     998           0 :                 return;
     999             :             }
    1000           0 :             free_socket_list(&socket->recv_sockets);
    1001             :         }
    1002          41 :         handle_listen_socket_shutdown(socket);
    1003             :     }
    1004           0 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
    1005           0 :         if(socket->recv_sockets != NULL){
    1006             :             struct netio_socket_list* entry = socket->recv_sockets;
    1007           0 :             while(entry != NULL){
    1008           0 :                 struct netio_recv_socket* recv_socket = entry->socket;
    1009           0 :                 shutdown(recv_socket->eq_ev_ctx.fd, SHUT_RDWR);
    1010           0 :                 close(recv_socket->eq_ev_ctx.fd);
    1011           0 :                 remove_open_fd(&socket->ctx->evloop, recv_socket->eq_ev_ctx.fd);
    1012           0 :                 entry = entry->next;
    1013             :             }
    1014           0 :             free_socket_list(&socket->recv_sockets);
    1015             :         }
    1016             :     }
    1017             :     else {
    1018           0 :         log_error("Connection mode unsupported");
    1019             :     }
    1020             : 
    1021             :     //clean up signal
    1022          41 :     netio_signal_close(sd->evloop, sd->signal);
    1023          41 :     free(sd->signal);
    1024          41 :     free(sd);
    1025             : }
    1026             : 
    1027             : 
    1028             : 
    1029             : void
    1030          61 : close_buffered_listen_socket(void* ptr)
    1031             : {
    1032          61 :     log_dbg("close_buffered_listen_socket");
    1033          61 :     struct signal_data* sd = (struct signal_data*)ptr;
    1034          61 :     struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
    1035             : 
    1036          61 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
    1037          61 :         if(socket->recv_sockets != NULL){
    1038             :             struct netio_socket_list* entry = socket->recv_sockets;
    1039             :             int still_open = 0;
    1040           0 :             while(entry != NULL){
    1041           0 :                 struct netio_buffered_recv_socket* recv_socket = entry->socket;
    1042           0 :                 still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->recv_socket.eqfd);
    1043           0 :                 entry = entry->next;
    1044             :             }
    1045           0 :             if(still_open){
    1046           0 :                 netio_signal_fire(sd->signal);
    1047           0 :                 return;
    1048             :             }
    1049           0 :             free_socket_list(&socket->recv_sockets);
    1050             :         }
    1051          61 :         handle_listen_socket_shutdown(socket);
    1052             :     }
    1053           0 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
    1054           0 :         if(socket->recv_sockets != NULL){
    1055             :             struct netio_socket_list* entry = socket->recv_sockets;
    1056           0 :             while(entry != NULL){
    1057           0 :                 struct netio_buffered_recv_socket* recv_socket = entry->socket;
    1058           0 :                 shutdown(recv_socket->recv_socket.eqfd, SHUT_RDWR);
    1059           0 :                 close(recv_socket->recv_socket.eqfd);
    1060           0 :                 remove_open_fd(&socket->ctx->evloop, recv_socket->recv_socket.eqfd);
    1061           0 :                 entry = entry->next;
    1062             :             }
    1063           0 :             free_socket_list(&socket->recv_sockets);
    1064             :         }
    1065             :     }
    1066             :     else {
    1067           0 :         log_error("Connection mode unsupported");
    1068             :     }
    1069             : 
    1070             :     //clean up signal
    1071          61 :     netio_signal_close(sd->evloop, sd->signal);
    1072          61 :     free(sd->signal);
    1073          61 :     free(sd);
    1074             : }
    1075             : 
    1076             : 
    1077             : void
    1078         153 : handle_listen_socket_shutdown(struct netio_listen_socket* socket)
    1079             : {
    1080         153 :     if(socket->eqfd < 0){return;}//nothing to do
    1081         153 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
    1082         153 :     log_dbg("Handle_listen_socket_shutdown. Lsocket EQ: %d with evloop %d socket %p", socket->eqfd, socket->ctx->evloop.epollfd, socket);
    1083         153 :     int ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
    1084         153 :     if(ret){ log_warn("Cannot deregister listen socket EQ %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd); }
    1085             : 
    1086         153 :     if(socket->pep != NULL){
    1087         153 :        fi_close(&socket->pep->fid);
    1088         153 :        socket->pep = NULL;
    1089             :     }
    1090             : 
    1091         153 :     log_dbg("netio_listen_socket: removing EQ fd %d from evloop %d, ret %d", socket->eqfd, socket->ctx->evloop.epollfd,  ret);
    1092         153 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
    1093             :     {
    1094           0 :         log_error("Failed to close listen socket %d: %s", ret, fi_strerror(-ret));
    1095           0 :         ret = close(socket->eq_ev_ctx.fd);
    1096           0 :         if ( ret ) {log_warn("Cannot close listen socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
    1097             :     }
    1098         153 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
    1099         153 :     socket->eqfd = -1;
    1100             : 
    1101         153 :     if(socket->fi != NULL){
    1102         153 :         fi_freeinfo(socket->fi);
    1103         153 :         socket->fi = NULL;
    1104             :     }
    1105             :     //fi_close(&socket->fabric->fid);
    1106             : }
    1107             : 
    1108             : 
    1109             : //This function is the TCP equivalent of on_send_socket_cm_event()
    1110             : void
    1111         117 : on_send_socket_tcp_cm_event(int fd, void* ptr)
    1112             : {
    1113         117 :   int ret, retVal;
    1114         117 :   socklen_t in_len;
    1115         117 :   struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
    1116             : 
    1117         117 :   log_dbg("connection event called with fd = %d", fd);
    1118             : 
    1119         117 :   in_len = sizeof(retVal);
    1120         117 :   ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &retVal, &in_len);
    1121         117 :   if (ret < 0)
    1122             :   {
    1123           0 :     log_error("on_send_socket_tcp_cm_event: getsockopt failed", ret);
    1124             :   }
    1125             : 
    1126         117 :   if (retVal != 0)
    1127             :   {
    1128           0 :     log_error("on_send_socket_tcp_cm_event: connect did not work. retVal = %d, ret = %d, errno = %d", retVal, ret, errno);
    1129             :   }
    1130             : 
    1131             :   //MJ: The log_error above sometimes returns "retVal = 111, ret = 0, errno = 115". This error indicates that the receiving side temporarily
    1132             :   //MJ: cannot acknowledge the connection request.
    1133             :   //MJ: Should we somehow retry the connection request in netio or leave it to the user to redo it. In the latter case we have to give a clear error back to the user
    1134             :   //MJ: Experience shows that just waiting a few seconds and then retrying actually helps.
    1135             : 
    1136         117 :   log_dbg("Register the callback for on_send_completed. Using TCP socket %d", fd);
    1137         117 :   socket->cq_ev_ctx.fd   = fd;
    1138         117 :   socket->cq_ev_ctx.data = socket;
    1139         117 :   socket->cq_ev_ctx.cb   = on_send_socket_tcp_cq_event;
    1140         117 :   netio_register_write_tcp_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
    1141             : 
    1142             :   //MJ: In case of the TCP instruction chain "socket()->connect()->write()/recv()" one is always using the same FD.
    1143             :   //MJ: therefore we don't know if a connect() or a write()/recv() has completed when we receive a trigger from this FD.
    1144             :   //MJ: Therefore the code below may not work because cb_connection_established() will also get called on write()/recv()
    1145         117 :   socket->state = CONNECTED;
    1146             : 
    1147         117 :   if(socket->cb_connection_established)
    1148             :   {
    1149         117 :     log_dbg("Calling the callback that is registered for cb_connection_established");
    1150         117 :     socket->cb_connection_established(socket);
    1151             :   }
    1152         117 : }
    1153             : 
    1154             : 
    1155             : //This function is the TCP equivalent of on_listen_socket_cm_event()
    1156             : void
    1157          42 : on_listen_socket_tcp_cm_event(int fd, void* ptr)
    1158             : {
    1159          42 :   int infd;
    1160          42 :   socklen_t in_len;
    1161          42 :   struct sockaddr in_addr;
    1162          42 :   log_info("on_listen_socket_tcp_cm_event: called with fd = %d", fd);
    1163             : 
    1164          42 :   struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
    1165          42 :   in_len = sizeof in_addr;
    1166          42 :   infd = accept(fd, &in_addr, &in_len);
    1167             : 
    1168          42 :   log_dbg("on_listen_socket_tcp_cm_event: listen socket: infd = %d", infd);
    1169          42 :   if (infd == -1) {
    1170           0 :     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))      // We have processed all incoming connections.
    1171             :       log_dbg("on_listen_socket_tcp_cm_event: nothing to do?");
    1172             :     else
    1173             :     {
    1174           0 :       log_dbg("on_listen_socket_tcp_cm_event: ERROR. errno = %d", errno);
    1175           0 :       exit(-1);
    1176             :     }
    1177             :   }
    1178             : 
    1179             : #if defined DEBUG || defined DEBUG_CM
    1180             :   char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    1181             :   int ret = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, 0);
    1182             :   if (ret == 0) {
    1183             :     log_dbg("on_listen_socket_tcp_cm_event: Accepted connection on descriptor %d (host=%s, service=%s)", infd, hbuf, sbuf);
    1184             :   }
    1185             : #endif
    1186             : 
    1187          42 :   make_socket_non_blocking(infd);            // Make the incoming socket non-blocking
    1188          42 :   log_dbg("Adding TCP/IP recv socket to listen socket");
    1189          42 :   struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
    1190          42 :   struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
    1191          42 :   netio_init_recv_tcp_socket(rsocket, lsocket);
    1192             : 
    1193          42 :   log_dbg("Adding receive socket %d to epoll", infd);
    1194          42 :   rsocket->eq_ev_ctx.fd = infd;
    1195          42 :   rsocket->eq_ev_ctx.data = rsocket;
    1196          42 :   rsocket->eq_ev_ctx.cb = on_recv_socket_tcp_cm_event;
    1197          42 :   netio_register_read_tcp_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
    1198             : 
    1199          42 :   if(lsocket->recv_sub_msg == 1){
    1200           2 :     rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
    1201          66 :     for (int i = 0; i < 32; i++){
    1202          64 :       rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
    1203          64 :       rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
    1204          64 :       rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
    1205          64 :       netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
    1206             :     }
    1207             :     log_dbg("Posted recv for subscription messages");
    1208          40 :   } else if (lsocket->attr.num_buffers > 0) {
    1209          40 :     log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
    1210          40 :     rsocket->sub_msg_buffers = NULL;
    1211          40 :     rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
    1212       10280 :     for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
    1213       10240 :       log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
    1214       10240 :       rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
    1215       10240 :       rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
    1216       10240 :       netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
    1217             :     }
    1218             :   } else {
    1219           0 :     log_error("Something went wrong. Not allocating any buffers for recv socket.");
    1220             :   }
    1221             : 
    1222          42 :   on_recv_socket_tcp_cm_event(infd, (void*)rsocket);
    1223          42 :   log_dbg("Done. connection accepted");
    1224          42 : }
    1225             : 
    1226             : 
    1227             : void
    1228          58 : on_buffered_recv_socket_tcp_cm_event(int fd, void* ptr)
    1229             : {
    1230          58 :   log_info("Connection event on buffered TCP/IP recv  (FD = %d, ptr = %p)", fd, ptr);
    1231          58 :     struct netio_buffered_recv_socket *socket = (struct netio_buffered_recv_socket*)ptr;
    1232             : 
    1233          58 :   socket->recv_socket.cq_ev_ctx.fd = fd;
    1234          58 :   socket->recv_socket.cq_ev_ctx.data = socket;
    1235          58 :   socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_tcp_cq_event;
    1236          58 :   netio_register_read_tcp_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
    1237             : 
    1238          58 :   log_dbg("posting %d buffers", socket->num_pages);
    1239        7958 :   for(unsigned int i=0; i<socket->num_pages; i++) {
    1240        7900 :      netio_post_recv(&socket->recv_socket, &socket->pages[i]);
    1241             :   }
    1242             : 
    1243          58 :   if(socket->lsocket->cb_connection_established) {
    1244          58 :     log_dbg("Calling user callback");
    1245          58 :     socket->lsocket->cb_connection_established(socket);
    1246             :   } else{
    1247           0 :     log_warn("no callback available");
    1248             :   }
    1249          58 : }
    1250             : 
    1251             : 
    1252             : //This function is the TCP equivalent of on_buffered_listen_socket_cm_event()
    1253             : void
    1254          58 : on_buffered_listen_socket_tcp_cm_event(int fd, void* ptr)
    1255             : {
    1256          58 :   int infd;
    1257          58 :   socklen_t in_len;
    1258          58 :   struct sockaddr in_addr;
    1259          58 :   log_info("on_buffered_listen_socket_tcp_cm_event: called with fd = %d", fd);
    1260             : 
    1261          58 :   struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
    1262          58 :   in_len = sizeof in_addr;
    1263          58 :   infd = accept(fd, &in_addr, &in_len);
    1264             : 
    1265          58 :   log_info("on_buffered_listen_socket_tcp_cm_event: listen socket: infd = %d", infd);
    1266          58 :   if (infd == -1)
    1267             :   {
    1268           0 :     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))      // We have processed all incoming connections.
    1269             :       log_dbg("on_buffered_listen_socket_tcp_cm_event: nothing to do?");
    1270             :     else
    1271             :     {
    1272           0 :       log_dbg("on_buffered_listen_socket_tcp_cm_event: ERROR. errno = %d", errno);
    1273           0 :       exit(-1);
    1274             :     }
    1275             :   }
    1276             : 
    1277             : #if defined DEBUG || defined DEBUG_CM
    1278             :   char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    1279             :   int ret = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, 0);
    1280             :   if (ret == 0) {
    1281             :     log_dbg("on_buffered_listen_socket_tcp_cm_event: Accepted connection on descriptor %d (host=%s, service=%s)", infd, hbuf, sbuf);
    1282             :   }
    1283             : #endif
    1284             : 
    1285          58 :   make_socket_non_blocking(infd);            // Make the incoming socket non-blocking
    1286          58 :   log_dbg("Adding TCP/IP recv socket to listen socket");
    1287          58 :   struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
    1288          58 :   struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
    1289          58 :   netio_buffered_recv_tcp_socket_init(rsocket, lsocket);
    1290             : 
    1291          58 :   log_dbg("Adding receive socket %d to epoll", infd);
    1292          58 :   rsocket->recv_socket.eq_ev_ctx.fd = infd;
    1293          58 :   rsocket->recv_socket.eq_ev_ctx.data = rsocket;
    1294          58 :   rsocket->recv_socket.eq_ev_ctx.cb = on_buffered_recv_socket_tcp_cm_event;
    1295          58 :   netio_register_read_tcp_fd(&rsocket->recv_socket.ctx->evloop, &rsocket->recv_socket.eq_ev_ctx);
    1296             : 
    1297        7958 :   for(unsigned int i=0; i<rsocket->num_pages; i++) {
    1298        7900 :     netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
    1299             :   }
    1300             : 
    1301          58 :   on_buffered_recv_socket_tcp_cm_event(infd, (void*)rsocket);
    1302          58 :   log_dbg("Connection accepted");
    1303          58 : }
    1304             : 
    1305             : 
    1306             : //This is the equivalent of on_recv_socket_cm_event
    1307             : void
    1308          42 : on_recv_socket_tcp_cm_event(int fd, void *ptr)
    1309             : {
    1310          42 :   log_dbg("on_recv_socket_tcp_cm_event: called with fd = %d and ptr = %p", fd, ptr);
    1311          42 :   struct netio_recv_socket *socket = (struct netio_recv_socket*)ptr;
    1312          42 :   socket->cq_ev_ctx.fd = fd;
    1313          42 :   socket->cq_ev_ctx.data = socket;
    1314          42 :   socket->cq_ev_ctx.cb = on_recv_socket_tcp_cq_event;
    1315          42 :   netio_register_read_tcp_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
    1316          42 :   if(socket->lsocket->cb_connection_established)
    1317             :   {
    1318          42 :     log_dbg("Calling user callback");
    1319          42 :     socket->lsocket->cb_connection_established(socket);
    1320             :   }  else {
    1321           0 :     log_warn("no callback available");
    1322             :   }
    1323          42 : }

Generated by: LCOV version 1.0