LCOV - code coverage report
Current view: top level - src - connection_event.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 373 637 58.6 %
Date: 2025-11-26 02:09:04 Functions: 18 23 78.3 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <unistd.h>
       3             : #include "log.h"
       4             : #include "netio/netio.h"
       5             : #include "netio/netio_tcp.h"
       6             : #include <sys/types.h>
       7             : #include <sys/socket.h>
       8             : #include <netdb.h>
       9             : 
      10             : #include "connection_event.h"
      11             : #include "completion_event.h"
      12             : #include "log.h"
      13             : 
      14             : #if defined DEBUG || defined DEBUG_CM
      15             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      16             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      17             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      18             : #else
      19             : #define log_dbg(...)
      20             : #define log_trc(...)
      21             : #endif
      22             : 
      23             : #define FATAL(msg, c) \
      24             : do { \
      25             :     log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
      26             :     exit(2); \
      27             : } while(0);
      28             : 
      29             : 
      30             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      31             : 
      32             : static int
      33         251 : read_cm_event(struct fid_eq* eq, struct fi_info** info, struct fi_eq_err_entry* err_entry)
      34             : {
      35             :     uint32_t event;
      36             :     struct fi_eq_cm_entry entry;
      37             : 
      38             :     ssize_t rd = fi_eq_sread(eq, &event, &entry, sizeof entry, 0, 0);
      39         251 :     if(rd < 0)
      40             :     {
      41          41 :         if(rd == -FI_EAGAIN)
      42             :         {
      43          37 :             return rd;
      44             :         }
      45           4 :         if(rd == -FI_EAVAIL)
      46             :         {
      47             :             int r;
      48           4 :             if((r = fi_eq_readerr(eq, err_entry, 0)) < 0)
      49             :             {
      50           0 :                 FATAL("Failed to retrieve details on Event Queue error", r);
      51             :             }
      52           4 :             log_error("Event Queue error: %s (code: %d), provider specific: %s (code: %d)",
      53             :                 fi_strerror(err_entry->err), err_entry->err,
      54             :                 fi_eq_strerror(eq, err_entry->prov_errno, err_entry->err_data, NULL, 0),
      55             :                 err_entry->prov_errno);
      56           4 :             return rd;
      57             :         }
      58             :     }
      59         210 :     if (rd != sizeof entry)
      60             :     {
      61           0 :         FATAL("Failed to read from Event Queue: %d", (int)rd);
      62             :     }
      63             : 
      64         210 :     if(info != NULL)
      65          56 :         *info = entry.info;
      66             : 
      67         210 :     return event;
      68             : }
      69             : 
      70             : 
      71             : static void
      72          56 : handle_connreq(struct netio_recv_socket* rsocket, struct netio_listen_socket* lsocket, struct fi_info *info, void (*cb)(int,void*), void* cbdata)
      73             : {
      74             :     int ret;
      75          56 :     struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
      76             : 
      77          56 :     if((ret = fi_domain(lsocket->fabric, info, &rsocket->domain, NULL)))
      78             :     {
      79           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      80           0 :         FATAL("Listen socket, cannot open fabric, error ", ret);
      81             :     }
      82             : 
      83          56 :     if((ret = fi_endpoint(rsocket->domain, info, &rsocket->ep, NULL)))
      84             :     {
      85           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      86           0 :         FATAL("Listen socket, cannot open endpoint, error ", ret);
      87             :     }
      88             : 
      89             :     /* Create a new event queue for the new active socket */
      90          56 :     if((ret = fi_eq_open(lsocket->fabric, &eq_attr, &rsocket->eq, NULL)))
      91             :     {
      92           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      93           0 :         FATAL("Listen socket, cannot open Event Queue, error ", ret);
      94             :     }
      95             : 
      96          56 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->eq->fid, 0)))
      97             :     {
      98           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      99           0 :         FATAL("Listen socket, cannot bind Event Queue to endpoint, error ", ret);
     100             :     }
     101             : 
     102          56 :     if((ret = fi_control(&rsocket->eq->fid, FI_GETWAIT, &rsocket->eqfd)))
     103             :     {
     104           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     105           0 :         FATAL("Listen socket failed to obtain Event Queue wait object", ret);
     106             :     }
     107          56 :     rsocket->eq_ev_ctx.fd = rsocket->eqfd;
     108          56 :     rsocket->eq_ev_ctx.data = cbdata;
     109          56 :     rsocket->eq_ev_ctx.cb = cb;
     110             : 
     111             :     log_dbg("Adding RECV EQ polled fid %d %p", rsocket->eqfd, &rsocket->eq->fid);
     112          56 :     add_polled_fid(&rsocket->ctx->evloop.pfids, lsocket->fabric, &rsocket->eq->fid, rsocket->eqfd, &rsocket, cb);
     113          56 :     add_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eqfd, NETIO_EQ, URECV, &rsocket);
     114          56 :     netio_register_read_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
     115             :     log_dbg("recv_socket: EQ fd %d waiting for connection", rsocket->eqfd);
     116             : 
     117             :     struct fi_cq_attr cq_attr;
     118          56 :     cq_attr.size = NETIO_MAX_CQ_ENTRIES;      /* # entries for CQ */
     119          56 :     cq_attr.flags = 0;     /* operation flags */
     120          56 :     cq_attr.format = FI_CQ_FORMAT_DATA;    /* completion format */
     121          56 :     cq_attr.wait_obj= FI_WAIT_FD;  /* requested wait object */
     122          56 :     cq_attr.signaling_vector = 0; /* interrupt affinity */
     123          56 :     cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */
     124          56 :     cq_attr.wait_set = NULL;  /* optional wait set */
     125             : 
     126             :     //FI_RECV CQ
     127          56 :     if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->cq, NULL)))
     128             :     {
     129           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     130           0 :         FATAL("Listen socket, cannot open Completion Queue, error ", ret);
     131             :     }
     132             : 
     133          56 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->cq->fid, FI_RECV)))
     134             :     {
     135           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     136           0 :         FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
     137             :     }
     138             : 
     139             :     //FI_TRANSMIT CQ - also necessary
     140          56 :     cq_attr.format = FI_CQ_FORMAT_UNSPEC;
     141          56 :     cq_attr.wait_obj= FI_WAIT_NONE;
     142          56 :     if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->tcq, NULL)))
     143             :     {
     144           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     145           0 :         FATAL("Listen socket, cannot open Completion Queue, error ", ret);
     146             :     }
     147             : 
     148          56 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->tcq->fid, FI_TRANSMIT)))
     149             :     {
     150           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     151           0 :         FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
     152             :     }
     153             : 
     154          56 :     if((ret = fi_enable(rsocket->ep)))
     155             :     {
     156           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     157           0 :         FATAL("Listen socket, cannot enable recv socket endpoint, error ", ret);
     158             :     }
     159             : 
     160          56 :     if((ret = fi_accept(rsocket->ep, NULL, 0)))
     161             :     {
     162           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     163           0 :         FATAL("Listen socket, connection rejected, error ", ret);;
     164             :     }
     165             :     log_dbg("connection accepted. Lsocket EQ: %d with evloop %d, rsocket EQ %d with evloop %d", lsocket->eqfd, lsocket->ctx->evloop.epollfd, rsocket->eqfd, rsocket->ctx->evloop.epollfd);
     166          56 : }
     167             : 
     168             : 
     169             : void
     170          34 : handle_recv_socket_shutdown(struct netio_recv_socket* socket)
     171             : {
     172          34 :     if(socket->eqfd < 0){
     173           0 :         log_info("handle_recv_socket_shutdown on closed socket (eqfd %d)", socket->eqfd);
     174           0 :         return;
     175             :     }
     176             :     int ret = 0;
     177             :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     178             :     log_dbg("Handle_recv_socket_shutdown for socket %p, evloop: %d", socket, socket->ctx->evloop.epollfd);
     179             : 
     180          34 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     181             :     {
     182           0 :         log_error("Failed to close recv socket Endpoint %d: %s", ret, fi_strerror(-ret));
     183             :     }
     184             :     log_dbg("fi_close done for endpoint.");
     185             : 
     186             :     //EQ
     187          34 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eq_ev_ctx.fd);
     188          34 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, &ep_event);
     189          34 :     if ( ret ){ log_warn("netio_recv_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
     190             :     log_dbg("netio_recv_socket: removing EQ fd %d from evloop %d", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     191          34 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     192             :     {
     193           0 :         log_error("Failed to fi_close recv socket Event Queue %d: %s", ret, fi_strerror(-ret));
     194           0 :         ret = close(socket->eq_ev_ctx.fd);
     195           0 :         if ( ret ) {log_warn("Cannot close recv socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
     196             :     }
     197          34 :     remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd);
     198          34 :     socket->eqfd = -1;
     199             : 
     200             :     //FI_RECV CQ
     201          34 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->cq_ev_ctx.fd);
     202          34 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cq_ev_ctx.fd, &ep_event);
     203          34 :     if ( ret ){ log_warn("netio_recv_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
     204             :     log_dbg("netio_recv_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     205          34 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     206             :     {
     207           0 :         log_error("Failed to close recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     208           0 :         ret = close(socket->cq_ev_ctx.fd);
     209           0 :         if ( ret ) {log_warn("Cannot close recv socket CQ fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     210             :     }
     211          34 :     remove_open_fd(&socket->ctx->evloop, socket->cq_ev_ctx.fd);
     212          34 :     socket->cqfd = -1;
     213             :     //FI_TRANSMIT CQ
     214          34 :     if((ret = fi_close(&socket->tcq->fid)))
     215             :     {
     216           0 :         log_error("Failed to close FI_TRANSMIT recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     217             :     }
     218             : 
     219             :     //MR
     220          34 :     uint16_t mem_regions = socket->reg_mr;
     221        7308 :     for(uint16_t i=0; i<mem_regions; ++i)
     222             :     {
     223        7274 :         if ((ret = fi_close(socket->mr[i]))){
     224           0 :             log_warn("Failed to close recv Memory Region %d, error %d.", i, ret);
     225             :         }
     226             :         else{
     227        7274 :             socket->reg_mr-=1;
     228             :         }
     229             :     }
     230          34 :     if(socket->reg_mr==0){free(socket->mr);socket->mr=NULL;}
     231          34 :     if((ret = fi_close(&socket->domain->fid)))
     232             :     {
     233           0 :         log_error("Failed to close recv socket Domain %d: %s", ret, fi_strerror(-ret));
     234             :     }
     235          34 :     if (socket->sub_msg_buffers != NULL){
     236          99 :         for(int i = 0; i < 32; i++){
     237          96 :             free(socket->sub_msg_buffers[i]->data);
     238          96 :             free(socket->sub_msg_buffers[i]);
     239             :         }
     240           3 :         free(socket->sub_msg_buffers);
     241           3 :         socket->sub_msg_buffers = NULL;
     242             :     }
     243             : }
     244             : 
     245             : void
     246           6 : handle_send_socket_shutdown(struct netio_send_socket* socket)
     247             : {
     248             :     // if(socket->state != CONNECTED){
     249             :     //     log_dbg("Nothing to do, send socket has already been freed....");
     250             :     //     return;
     251             :     // }
     252             :     int ret = 0;
     253             :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     254             :     log_dbg("Handle_send_socket_shutdown. Socket with EQ: %d", socket->eqfd);
     255             : 
     256           6 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     257             :     {
     258           0 :         log_error("Failed to close send socket Endpoint %d: %s", ret, fi_strerror(-ret));
     259             :     }
     260           6 :     socket->ep = NULL;
     261             : 
     262             :     //EQ
     263             :     log_dbg("netio_send_socket: removing EQ fd %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd);
     264           6 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
     265           6 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     266           6 :     if ( ret ){ log_warn("netio_send_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     267           6 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     268             :     {
     269           0 :         log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
     270           0 :         ret = close(socket->eq_ev_ctx.fd);
     271           0 :         if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
     272             :     }
     273           6 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     274           6 :     socket->eqfd = -1;
     275             : 
     276             :     //FI_TRANSMIT CQ
     277             :     log_dbg("netio_send_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     278           6 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->cqfd);
     279           6 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cqfd, &ep_event);
     280           6 :     if ( ret ){ log_warn("netio_send_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     281           6 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     282             :     {
     283           0 :         log_error("Failed to close send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     284           0 :         ret = close(socket->cq_ev_ctx.fd);
     285           0 :         if ( ret ){ log_warn("Could not close send socket cq fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     286             :     }
     287           6 :     remove_open_fd(&socket->ctx->evloop, socket->cqfd);
     288           6 :     socket->cqfd = -1;
     289             :     //FI_RECV CQ
     290           6 :     if((ret = fi_close(&socket->rcq->fid)))
     291             :     {
     292           0 :         log_error("Failed to close FI_RECV senf socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     293             :     }
     294             : 
     295           6 :     socket->domain->nb_sockets -= 1;
     296           6 :     if (socket->domain->nb_sockets == 0) {
     297         786 :         for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
     298         780 :             if ((ret = fi_close(socket->domain->mr[i]))) {
     299           0 :                 log_warn("Failed to close send Memory Region %d, error %d.", i, ret);;
     300             :             }
     301             :         }
     302           6 :         if ((ret = fi_close(&socket->domain->domain->fid))) {
     303           0 :             log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
     304             :         }
     305             : 
     306           6 :         if(socket->ctx->evloop.pfids.count == 0){
     307           1 :             if ((ret = fi_close(&socket->domain->fabric->fid))) {
     308           0 :                 log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
     309             :             }
     310             :         }
     311           6 :         free(socket->domain->mr);
     312           6 :         socket->domain->mr = NULL;
     313           6 :         free(socket->domain);
     314           6 :         socket->domain = NULL;
     315             :     }
     316           6 :     if(socket->fi != NULL){
     317           6 :         fi_freeinfo(socket->fi);
     318           6 :         socket->fi = NULL;
     319             :     }
     320           6 :     socket->state = DISCONNECTED;
     321           6 : }
     322             : 
     323             : 
     324             : void
     325           3 : handle_tcp_send_socket_shutdown(struct netio_send_socket* socket)
     326             : {
     327           3 :     netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal);
     328           3 :     epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
     329           3 :     int rc = close(socket->eq_ev_ctx.fd);
     330           3 :     if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
     331           3 : }
     332             : 
     333             : 
     334             : static void
     335           4 : handle_send_socket_shutdown_on_connection_refused(struct netio_send_socket* socket)
     336             : {
     337             :     int ret = 0;
     338             :     //struct fi_eq_err_entry err_entry;
     339             :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     340           4 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     341             :     {
     342           0 :         log_error("Failed to close send socket endpoint %d: %s", ret, fi_strerror(-ret));
     343             :     }
     344             : 
     345             :     //EQ
     346           4 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
     347           4 :     ret =  epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     348           4 :     if (ret) {log_warn("Conn refused: cannot deregister send_socket EQ fd %d from evloop %d, %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     349           4 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     350             :     {
     351           0 :         log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
     352           0 :         ret = close(socket->eq_ev_ctx.fd);
     353           0 :         if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
     354             :     }
     355           4 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     356           4 :     socket->eqfd = -1;
     357             : 
     358             :     //FI_TRANSMIT CQ - wait object not retrieved yet
     359           4 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     360             :     {
     361           0 :         log_error("Failed to close FI_TRANSMIT send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     362             :     }
     363             :     //FI_RECV CQ
     364           4 :     if(fi_close(&socket->rcq->fid))
     365             :     {
     366           0 :         log_error("Failed to close FI_RECV send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     367             :     }
     368             : 
     369           4 :     socket->domain->nb_sockets -= 1;
     370           4 :     if (socket->domain->nb_sockets == 0) {
     371           4 :         for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
     372           0 :             if ((ret = fi_close(socket->domain->mr[i]))) {
     373           0 :                 log_warn("Failed to close send Memory Region %d, error %d.", i, ret);
     374             :             }
     375             :         }
     376           4 :         if ((ret = fi_close(&socket->domain->domain->fid))) {
     377           0 :             log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
     378             :         }
     379           4 :         if(socket->ctx->evloop.pfids.count == 0){
     380           4 :             if ((ret = fi_close(&socket->domain->fabric->fid))) {
     381           0 :                 log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
     382             :             }
     383             :         }
     384           4 :         free(socket->domain->mr);
     385           4 :         socket->domain->mr = NULL;
     386           4 :         free(socket->domain);
     387           4 :         socket->domain = NULL;
     388             : 
     389             :     }
     390           4 :     if(socket->fi != NULL){
     391           4 :         fi_freeinfo(socket->fi);
     392           4 :         socket->fi = NULL;
     393             :     }
     394           4 :     socket->state = UNCONNECTED;
     395           4 : }
     396             : 
     397             : // WAIT OBJECT CALLBACKS ///////////////////////////////////////////////////////////
     398             : 
     399             : void
     400         105 : on_send_socket_libfabric_cm_event(int fd, void* ptr)
     401             : {
     402             :     struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
     403             :     struct fi_eq_err_entry err_entry;
     404             :     log_dbg("send socket %p fd %d: connection event, evloop: %d", socket, fd, socket->ctx->evloop.epollfd);
     405             : 
     406         105 :     if(socket->state == DISCONNECTED || socket->eqfd != fd){
     407             :         log_dbg("discarding send_socket cm events: state disconnected or fd mismatch eqfd %d fd %d", socket->eqfd, fd);
     408          68 :         return;
     409             :     } //Check if event queue was already closed. TODO: Need to fix receiving messages after closing
     410         105 :     int event = read_cm_event(socket->eq, NULL, &err_entry);
     411             :     int ret = 0;
     412             : 
     413             :     log_dbg("event %d", event);
     414             : 
     415         105 :     switch(event)
     416             :     {
     417           8 :     case FI_SHUTDOWN:
     418           8 :         if (socket->eqfd < 0 ){
     419           0 :             log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
     420          68 :             return;
     421             :         }
     422           8 :         log_info("fi_verbs_process_send_socket_cm_event: FI_SHUTDOWN");
     423           8 :         if(socket->cb_internal_connection_closed){
     424           0 :             socket->cb_internal_connection_closed(socket);
     425             :         }
     426           8 :         if(socket->cb_connection_closed) {
     427           8 :             socket->cb_connection_closed(socket);
     428             :         }
     429             :         return;
     430             : 
     431          56 :     case FI_CONNECTED:
     432          56 :         socket->cqfd = -1;
     433          56 :         if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
     434             :         {
     435           0 :             FATAL("Failed to retrieve wait object for send socket Completion Queue", ret);
     436             :         }
     437             : 
     438          56 :         socket->cq_ev_ctx.fd = socket->cqfd;
     439          56 :         socket->cq_ev_ctx.data = socket;
     440          56 :         socket->cq_ev_ctx.cb = on_send_socket_cq_event;
     441             : 
     442             :         log_dbg("send_socket: EQ fd %d connected, CQ fd %d", socket->eqfd, socket->cqfd);
     443             :         log_dbg("Adding SEND CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
     444          56 :         add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->cq->fid, socket->cqfd, socket, on_send_socket_cq_event);
     445          56 :         add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, USEND, socket);
     446          56 :         netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
     447             : 
     448          56 :         socket->state = CONNECTED;
     449             : 
     450          56 :         if(socket->cb_connection_established) {
     451          56 :           socket->cb_connection_established(socket);
     452             :         }
     453             : 
     454             :         return;
     455             : 
     456             :     case FI_MR_COMPLETE:
     457             :     case FI_AV_COMPLETE:
     458             :     case FI_JOIN_COMPLETE:
     459             :         // Not implemented
     460          37 :         break;
     461             : 
     462             : 
     463           4 :     case -FI_EAVAIL:
     464             : 
     465           4 :         switch(err_entry.err) {
     466             : 
     467           4 :             case FI_ECONNREFUSED:
     468             : 
     469             :                 log_dbg("Connection refused (FI_ECONNREFUSED). Deallocating send_socket resources.");
     470             : 
     471           4 :                 if(socket->eqfd < 0){
     472             :                     log_dbg("FI_ECONNREFUSED on send socket with EQ fd %d. Not clearing it.", socket->eqfd);
     473             :                     return;
     474             :                 }
     475             : 
     476           4 :                 handle_send_socket_shutdown_on_connection_refused(socket);
     477             : 
     478           4 :                 if(socket->cb_error_connection_refused) {
     479           4 :                     socket->cb_error_connection_refused(socket);
     480             :                 }
     481             : 
     482           4 :                 if (socket->recv_socket != NULL){
     483           0 :                     if (socket->recv_socket->eqfd < 0 ){
     484           0 :                         log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
     485           0 :                         return;
     486             :                     }
     487           0 :                     log_info("Shutting down receive socket on FI_ETIMEDOUT");
     488           0 :                     handle_recv_socket_shutdown(socket->recv_socket);
     489           0 :                     if(socket->recv_socket->lsocket->cb_connection_closed) {
     490           0 :                         socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
     491             :                     }
     492           0 :                     if (socket->recv_socket->lsocket->attr.num_buffers > 0){
     493           0 :                         for(int i = 0; i < socket->recv_socket->lsocket->attr.num_buffers; i++){
     494           0 :                             free(socket->recv_socket->recv_buffers[i].data);
     495             :                         }
     496           0 :                         free(socket->recv_socket->recv_buffers);
     497             :                     }
     498             :                 }
     499             : 
     500             :                 //print_openfds(&socket->ctx->evloop.openfds);
     501             :                 break;
     502             : 
     503           0 :             case FI_ETIMEDOUT:
     504           0 :                 log_info("fi_verbs_process_send_socket_cm_event: FI_ETIMEDOUT");
     505           0 :                 if (socket->eqfd < 0 ){
     506           0 :                     log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
     507           0 :                     break;
     508             :                 }
     509             : 
     510             :                 // Need to take care of receive socket as well
     511           0 :                 if (socket->recv_socket != NULL){
     512           0 :                     if (socket->recv_socket->eqfd < 0 ){
     513           0 :                         log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
     514           0 :                         return;
     515             :                     }
     516           0 :                     log_info("Shutting down receive socket on FI_ETIMEDOUT");
     517           0 :                     handle_recv_socket_shutdown(socket->recv_socket);
     518           0 :                     if(socket->recv_socket->lsocket->cb_connection_closed) {
     519           0 :                         socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
     520             :                     }
     521           0 :                     if (socket->recv_socket->lsocket->attr.num_buffers > 0){
     522           0 :                         for(int i = 0; i < socket->recv_socket->lsocket->attr.num_buffers; i++){
     523           0 :                             free(socket->recv_socket->recv_buffers[i].data);
     524             :                         }
     525           0 :                         free(socket->recv_socket->recv_buffers);
     526             :                     }
     527             :                 }
     528             : 
     529           0 :                 if(socket->cqfd < 0){ //cq not initalized yet
     530           0 :                     handle_send_socket_shutdown_on_connection_refused(socket);
     531             :                 } else {
     532           0 :                     if(socket->cb_internal_connection_closed){
     533           0 :                         socket->cb_internal_connection_closed(socket);
     534             :                     }
     535           0 :                     if(socket->cb_connection_closed) {
     536           0 :                         socket->cb_connection_closed(socket);
     537             :                     }
     538             :                 }
     539             : 
     540             :                 break;
     541             : 
     542           0 :             default:
     543             : 
     544           0 :                 log_error("Unhandled error in the Event Queue: %s (code: %d), provider specific: %s (code: %d)",
     545             :                         fi_strerror(err_entry.err), err_entry.err,
     546             :                         fi_eq_strerror(socket->eq, err_entry.prov_errno, err_entry.err_data, NULL, 0),
     547             :                         err_entry.prov_errno);
     548             :         }
     549             :         return;
     550             : 
     551          37 :     case -FI_EAGAIN:
     552          37 :         struct fid* fp = &socket->eq->fid;
     553          37 :         fi_trywait(socket->domain->fabric, &fp, 1);
     554             :         break;
     555             : 
     556           0 :     default:
     557           0 :         log_error("Unexpected event %d in send socket Event Queue", event);
     558           0 :         exit(2);
     559             :     }
     560             : }
     561             : 
     562             : 
     563             : void
     564          25 : on_listen_socket_libfabric_cm_event(int fd, void* ptr)
     565             : {
     566             :     struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
     567             :     log_dbg("listen socket: connection event");
     568          25 :     if(lsocket->eqfd != fd){
     569             :         log_dbg("listen socket is already closed.");
     570           0 :         return;
     571             :     }
     572             : 
     573          25 :     struct fi_info *info = NULL;
     574             :     struct fi_eq_err_entry err_entry;
     575          25 :     int event = read_cm_event(lsocket->eq, &info, &err_entry);
     576             : 
     577             : 
     578          25 :     switch (event)
     579             :     {
     580          25 :         case FI_CONNREQ:
     581             :             log_dbg("fi_verbs_process_listen_socket_cm_event: FI_CONNREQ");
     582          25 :             struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
     583          25 :             struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
     584          25 :             netio_init_recv_socket(rsocket, lsocket);
     585          25 :             handle_connreq(rsocket, lsocket, info, on_recv_socket_libfabric_cm_event, rsocket);
     586          25 :             if(lsocket->recv_sub_msg == 1){
     587          12 :                 rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
     588         396 :                 for (int i = 0; i < 32; i++){
     589         384 :                     rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
     590         384 :                     rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
     591         384 :                     rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
     592         384 :                     netio_register_recv_buffer(rsocket, rsocket->sub_msg_buffers[i], 0);
     593         384 :                     netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
     594             :                 }
     595             :                 log_dbg("Posted recv for subscription messages");
     596          13 :             } else if (lsocket->attr.num_buffers > 0) {
     597             :                 log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
     598          13 :                 rsocket->sub_msg_buffers = NULL;
     599          13 :                 rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
     600        1556 :                 for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
     601             :                     log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
     602        1543 :                     rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
     603        1543 :                     rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
     604        1543 :                     netio_register_recv_buffer(rsocket, &rsocket->recv_buffers[i], 0);
     605        1543 :                     netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
     606             :                 }
     607             :             } else {
     608           0 :                 log_error("Something went wrong. Not allocating any buffers for recv socket.");
     609             :             }
     610             :             break;
     611             : 
     612           0 :         case FI_CONNECTED:
     613           0 :             log_fatal("FI_CONNECTED received on listen socket");
     614           0 :             exit(2);
     615             : 
     616           0 :         case FI_SHUTDOWN:
     617           0 :             log_fatal("FI_SHUTDOWN received on listen socket");
     618           0 :             exit(2);
     619             : 
     620           0 :         case -FI_EAGAIN:
     621           0 :             struct fid* fp = &lsocket->eq->fid;
     622           0 :             fi_trywait(lsocket->fabric, &fp, 1);
     623             :             break;
     624             : 
     625           0 :         case -FI_EAVAIL:
     626           0 :             log_error("Unhandled error in listen socket EQ code: %d, provider specific code: %d",
     627             :                 err_entry.err, err_entry.prov_errno);
     628           0 :             break;
     629             :     }
     630          25 :     fi_freeinfo(info);
     631             : }
     632             : 
     633             : 
     634             : void
     635          31 : on_buffered_listen_socket_libfabric_cm_event(int fd, void* ptr)
     636             : {
     637             :     struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
     638             :     log_dbg("buffered listen socket: connection event");
     639          31 :     if(lsocket->listen_socket.eqfd != fd){
     640             :         log_dbg("Buffered listen socket CM event: inconsistend fd. Ignoring event.");
     641           0 :         return;
     642             :     }
     643             : 
     644          31 :     struct fi_info *info = NULL;
     645             :     struct fi_eq_err_entry err_entry;
     646          31 :     int event = read_cm_event(lsocket->listen_socket.eq, &info, &err_entry);
     647             : 
     648          31 :     switch (event)
     649             :     {
     650          31 :         case FI_CONNREQ:
     651             :             log_dbg("FI_CONNREQ");
     652          31 :             struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
     653          31 :             struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
     654          31 :             netio_buffered_recv_socket_init(rsocket, lsocket);
     655          31 :             handle_connreq(&rsocket->recv_socket, &lsocket->listen_socket, info, on_buffered_recv_socket_libfabric_cm_event, rsocket);
     656          31 :             log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED, posting %d buffers", rsocket->num_pages);
     657        7479 :             for(unsigned int i=0; i<rsocket->num_pages; i++) {
     658        7448 :                 netio_register_recv_buffer(&rsocket->recv_socket, &rsocket->pages[i], 0);
     659        7448 :                 netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
     660             :             }
     661             :             break;
     662             : 
     663           0 :         case FI_CONNECTED:
     664           0 :             log_fatal("FI_CONNECTED received on buffered listen socket");
     665           0 :             exit(2);
     666             : 
     667           0 :         case FI_SHUTDOWN:
     668           0 :             log_fatal("FI_SHUTDOWN received on buffered listen socket");
     669           0 :             exit(2);
     670             : 
     671           0 :         case -FI_EAGAIN:
     672           0 :             struct fid* fp = &lsocket->listen_socket.eq->fid;
     673           0 :             fi_trywait(lsocket->listen_socket.fabric, &fp, 1);
     674             :             break;
     675             : 
     676           0 :         case -FI_EAVAIL:
     677           0 :             log_error("Unhandled error in buffer listen socket EQ code: %d, provider specific code: %d",
     678             :                 err_entry.err, err_entry.prov_errno);
     679           0 :             break;
     680             :     }
     681          31 :     fi_freeinfo(info);
     682             : }
     683             : 
     684             : 
     685             : void
     686          34 : on_recv_socket_libfabric_cm_event(int fd, void* ptr)
     687             : {
     688             :     struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
     689          34 :     log_info("recv socket %d: connection event", socket->eqfd);
     690          34 :     if(socket->eqfd != fd){
     691             :         log_dbg("Recv socket CM event: inconsistend fd. Ignoring event.");
     692           0 :         return;
     693             :     }
     694             :     int ret;
     695             :     struct fi_eq_err_entry err_entry;
     696          34 :     uint32_t event = read_cm_event(socket->eq, NULL, &err_entry);
     697             : 
     698          34 :     switch (event)
     699             :     {
     700          25 :     case FI_CONNECTED:
     701          25 :         log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED");
     702          25 :         socket->cqfd = -1;
     703          25 :         if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
     704             :         {
     705           0 :             FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
     706             :         }
     707             : 
     708          25 :         socket->cq_ev_ctx.fd = socket->cqfd;
     709          25 :         socket->cq_ev_ctx.data = socket;
     710          25 :         socket->cq_ev_ctx.cb = on_recv_socket_cq_event;
     711             : 
     712             :         log_dbg("Adding recv CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
     713          25 :         add_polled_fid(&socket->ctx->evloop.pfids, socket->lsocket->fabric, &socket->cq->fid, socket->cqfd, socket, on_recv_socket_cq_event);
     714          25 :         add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, URECV, socket);
     715          25 :         netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
     716             : 
     717             :         log_dbg("recv_socket: EQ fd %d CQ fd %d connected", socket->eqfd, socket->cqfd);
     718          25 :         if(socket->lsocket->cb_connection_established) {
     719          25 :             socket->lsocket->cb_connection_established(socket);
     720             :         }
     721             : 
     722          34 :         break;
     723             : 
     724           9 :     case FI_SHUTDOWN:
     725           9 :         if (socket->eqfd < 0 ){
     726           0 :             log_info("Ignoring FI_SHUTDOWN on recv_socket, invalid eqfd (socket already closed)");
     727           0 :             return;
     728             :         }
     729           9 :         log_info("fi_verbs_process_recv_socket_cm_event: FI_SHUTDOWN");
     730           9 :         handle_recv_socket_shutdown(socket);
     731           9 :         if(socket->lsocket->cb_connection_closed) {
     732           6 :             socket->lsocket->cb_connection_closed(socket);
     733             :         }
     734             : 
     735           9 :         if (socket->lsocket->attr.num_buffers > 0){
     736        1032 :             for(int i = 0; i < socket->lsocket->attr.num_buffers; i++){
     737        1026 :                 free(socket->recv_buffers[i].data);
     738             :             }
     739           6 :             free(socket->recv_buffers);
     740             :         }
     741             : 
     742           9 :         int return_value = remove_socket(&socket->lsocket->recv_sockets, socket);
     743           9 :         log_info("Recv socket removed, result: %d", return_value);
     744           9 :         break;
     745             : 
     746             :     case FI_MR_COMPLETE:
     747             :     case FI_AV_COMPLETE:
     748             :     case FI_JOIN_COMPLETE:
     749             :         // Not implemented
     750             :         break;
     751             : 
     752           0 :     case -FI_EAGAIN:
     753           0 :         struct fid* fp = &socket->eq->fid;
     754           0 :         fi_trywait(socket->lsocket->fabric, &fp, 1);
     755             :         break;
     756             : 
     757           0 :     case -FI_EAVAIL:
     758           0 :         log_error("Unhandled error in recv socket EQ code: %d, provider specific code: %d",
     759             :             err_entry.err, err_entry.prov_errno);
     760           0 :         break;
     761             : 
     762           0 :     default:
     763           0 :         log_error("Unexpected event %d in recv socket Event Queue", event);
     764           0 :         exit(2);
     765             :         break;
     766             :     }
     767             : }
     768             : 
     769             : 
     770             : void
     771          56 : on_buffered_recv_socket_libfabric_cm_event(int fd, void* ptr)
     772             : {
     773             :     struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)ptr;
     774             :     log_dbg("buffered recv socket %d: connection event (FD = %d)", socket->recv_socket.eqfd, fd);
     775          56 :     if(socket->recv_socket.eqfd != fd){
     776             :         log_dbg("Buffered recv socket CM event: inconsistend fd. Ignoring event.");
     777           0 :         return;
     778             :     }
     779             :     int ret;
     780             :     struct fi_eq_err_entry err_entry;
     781          56 :     uint32_t event = read_cm_event(socket->recv_socket.eq, NULL, &err_entry);
     782             : 
     783          56 :     switch (event)
     784             :     {
     785          31 :     case FI_CONNECTED:
     786          31 :         socket->recv_socket.cqfd = -1;
     787          31 :         if((ret = fi_control(&socket->recv_socket.cq->fid, FI_GETWAIT, &socket->recv_socket.cqfd)))
     788             :         {
     789           0 :             FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
     790             :         }
     791             : 
     792          31 :         socket->recv_socket.cq_ev_ctx.fd = socket->recv_socket.cqfd;
     793          31 :         socket->recv_socket.cq_ev_ctx.data = &socket->recv_socket;
     794          31 :         socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_cq_event;
     795             : 
     796             :         log_dbg("Adding BUFFERED RECV CQ polled fid %d %p", socket->recv_socket.cqfd, &socket->recv_socket.cq->fid);
     797          31 :         add_open_fd(&socket->recv_socket.ctx->evloop.openfds, socket->recv_socket.cqfd, NETIO_CQ, BRECV, &socket->recv_socket);
     798          31 :         add_polled_fid(&socket->recv_socket.ctx->evloop.pfids,
     799          31 :                     socket->recv_socket.lsocket->fabric,
     800          31 :                     &socket->recv_socket.cq->fid,
     801             :                     socket->recv_socket.cqfd,
     802             :                     &socket->recv_socket,
     803             :                     on_recv_socket_cq_event);
     804             : 
     805          31 :         netio_register_read_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
     806             :         log_dbg("buffered_recv_socket: registering CQ fd %d", socket->recv_socket.cqfd);
     807             : 
     808          31 :         if(socket->lsocket->cb_connection_established) {
     809          31 :             socket->lsocket->cb_connection_established(socket);
     810             :         }
     811             : 
     812          56 :         break;
     813             : 
     814          25 :     case FI_SHUTDOWN:
     815             :         log_dbg("recv socket shutdown");
     816          25 :         if (socket->recv_socket.eqfd < 0 ){
     817             :             log_dbg("Ignoring FI_SHUTDOWN on buffered recv_socket, invalid eqfd (socket already closed)");
     818           0 :             return;
     819             :         }
     820          25 :         handle_recv_socket_shutdown(&socket->recv_socket);
     821        6177 :         for(unsigned int i=0; i<socket->num_pages; i++) {
     822        6152 :             free(socket->pages[i].data);
     823             :         }
     824          25 :         free(socket->pages);
     825          25 :         if(socket->lsocket->cb_connection_closed) {
     826          25 :             socket->lsocket->cb_connection_closed(socket);
     827             :         }
     828             : 
     829          25 :         remove_socket(&socket->lsocket->listen_socket.recv_sockets, socket);
     830          25 :         break;
     831             : 
     832             :     case FI_MR_COMPLETE:
     833             :     case FI_AV_COMPLETE:
     834             :     case FI_JOIN_COMPLETE:
     835             :         // Not implemented
     836             :         break;
     837             : 
     838           0 :     case -FI_EAGAIN:
     839           0 :         struct fid* fp = &socket->recv_socket.eq->fid;
     840           0 :         fi_trywait(socket->lsocket->listen_socket.fabric, &fp, 1);
     841             :         break;
     842             : 
     843           0 :     case -FI_EAVAIL:
     844             :         // error was found
     845           0 :         log_error("Unhandled error in buffered recv socket EQ code: %d, provider specific code: %d",
     846             :             err_entry.err, err_entry.prov_errno);
     847           0 :         break;
     848             : 
     849           0 :     default:
     850           0 :         log_error("Unexpected event %d in buffered recv socket Event Queue", event);
     851           0 :         exit(2);
     852             :         break;
     853             :     }
     854             : }
     855             : 
     856             : 
     857             : // CALLBACKS FOR GARBAGE COLLECTION  //////////////////////////////////////////////////////////
     858             : void
     859           0 : close_send_socket(void* ptr)
     860             : {
     861           0 :     log_info("Close_send_socket. Not supported anymore");
     862             :     struct signal_data* sd = (struct signal_data*)ptr;
     863             : 
     864           0 :     struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
     865           0 :     if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     866           0 :         shutdown(socket->cq_ev_ctx.fd, SHUT_WR);
     867           0 :         netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal);
     868           0 :         epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
     869           0 :         int rc = close(socket->cq_ev_ctx.fd);
     870           0 :         if ( rc ){ log_warn("Cannot close TCP send socket fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     871           0 :         if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
     872           0 :         socket->state=DISCONNECTED;
     873           0 :         if(socket->cb_connection_closed) {
     874           0 :           socket->cb_connection_closed(socket);
     875             :         }
     876           0 :     } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
     877             :         //commented out as closing libfabric sockets relies on exhanges of fi_shutdown
     878             :         //handle_send_socket_shutdown(socket);
     879             :     } else {
     880           0 :         log_error("Closing send socket with unknown netio mode %d", socket->tcp_fi_mode);
     881             :     }
     882             : 
     883             :     //delete the used signal
     884           0 :     netio_signal_close(sd->evloop, sd->signal);
     885           0 :     free(sd->signal);
     886           0 :     free(sd);
     887           0 : }
     888             : 
     889             : void
     890           0 : close_buffered_send_socket(void *ptr)
     891             : {
     892             :     log_dbg("Closing buffered_send_socket %p.", ptr);
     893             :     struct signal_data* sd = (struct signal_data*)ptr;
     894             : 
     895           0 :     struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)sd->ptr;
     896           0 :     struct netio_send_socket* ssocket = &socket->send_socket;
     897           0 :     if(ssocket->tcp_fi_mode == NETIO_MODE_TCP){
     898           0 :         netio_timer_close(&(ssocket->ctx->evloop), &socket->flush_timer);
     899           0 :         netio_signal_close(&(ssocket->ctx->evloop), &socket->signal_buffer_available);
     900           0 :         shutdown(ssocket->eq_ev_ctx.fd, SHUT_WR);
     901           0 :         netio_signal_close(&ssocket->ctx->evloop, &ssocket->tcp_signal);
     902           0 :         epoll_ctl(ssocket->ctx->evloop.epollfd, EPOLL_CTL_DEL, ssocket->eq_ev_ctx.fd, NULL);
     903           0 :         int rc = close(ssocket->eq_ev_ctx.fd);
     904           0 :         if ( !rc ){ remove_open_fd(&ssocket->ctx->evloop, ssocket->eq_ev_ctx.fd); }
     905           0 :         if(ssocket->cb_connection_closed) {
     906           0 :           ssocket->cb_connection_closed(ssocket);
     907             :         }
     908           0 :     } else if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     909             :         //commented out as closing libfabric sockets relies on exhanges of fi_shutdown
     910             :         //handle_send_socket_shutdown(ssocket);
     911             :     } else {
     912           0 :         log_error("Closing send socket with unkown netio mode %d", ssocket->tcp_fi_mode);
     913             :     }
     914           0 :     for(size_t i=0; i < socket->buffers.num_buffers; ++i ){
     915           0 :         free(socket->buffers.buffers[i]);
     916             :     }
     917             : 
     918             :     //delete the used signal
     919           0 :     netio_signal_close(sd->evloop, sd->signal);
     920           0 :     free(sd->signal);
     921           0 :     free(sd);
     922           0 : }
     923             : 
     924             : void
     925           2 : close_recv_socket(void* ptr)
     926             : {
     927             :     log_dbg("Closing recv_socket %p.", ptr);
     928             :     struct signal_data* sd = (struct signal_data*)ptr;
     929           2 :     struct netio_recv_socket* socket = (struct netio_recv_socket*)sd->ptr;
     930           2 :     if(socket->tcp_fi_mode == NETIO_MODE_TCP){
     931           2 :         shutdown(socket->eq_ev_ctx.fd, SHUT_RD);
     932             :         // netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal); // causes bad file descriptor
     933           2 :         epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
     934           2 :         int rc = close(socket->eq_ev_ctx.fd);
     935           2 :         if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
     936           2 :         if(socket->lsocket->cb_connection_closed) {
     937           0 :           socket->lsocket->cb_connection_closed(socket);
     938             :         }
     939           0 :     } else if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     940             :         //commented out as closing libfabric sockets relies on exhanges of fi_shutdown
     941             :         //handle_recv_socket_shutdown(socket);
     942             :     } else {
     943           0 :         log_error("Closing recv socket with unkown netio mode %d", socket->tcp_fi_mode);
     944             :     }
     945             : 
     946             :     //clean up signal
     947           2 :     netio_signal_close(sd->evloop, sd->signal);
     948           2 :     free(sd->signal);
     949           2 :     free(sd);
     950           2 : }
     951             : 
     952             : void
     953           0 : close_buffered_recv_socket(void* ptr)
     954             : {
     955           0 :     log_info("Closing buffered_recv_socket %p. Not supported anymore", ptr);
     956             :     struct signal_data* sd = (struct signal_data*)ptr;
     957             :     /*
     958             :     struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)sd->ptr;
     959             :     struct netio_recv_socket* rsocket = &socket->recv_socket;
     960             :     if(rsocket->tcp_fi_mode == NETIO_MODE_TCP){
     961             :         shutdown(rsocket->eq_ev_ctx.fd, SHUT_RD);
     962             :         netio_signal_close(&rsocket->ctx->evloop, &rsocket->tcp_signal);
     963             :         epoll_ctl(rsocket->ctx->evloop.epollfd, EPOLL_CTL_DEL, rsocket->eq_ev_ctx.fd, NULL);
     964             :         int rc = close(rsocket->eq_ev_ctx.fd);
     965             :         if ( !rc ){ remove_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eq_ev_ctx.fd); }
     966             :     } else if (rsocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     967             :         handle_recv_socket_shutdown(rsocket);
     968             :     } else {
     969             :         log_error("Closing buffered recv socket with unkown netio mode %d", rsocket->tcp_fi_mode);
     970             :     }
     971             :     free(socket->pages);
     972             :     */
     973             :     //clean up signal
     974           0 :     netio_signal_close(sd->evloop, sd->signal);
     975           0 :     free(sd->signal);
     976           0 :     free(sd);
     977           0 : }
     978             : 
     979             : 
     980             : void
     981           0 : close_listen_socket(void* ptr)
     982             : {
     983             :     log_dbg("close_listen_socket");
     984             :     struct signal_data* sd = (struct signal_data*)ptr;
     985           0 :     struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
     986             : 
     987           0 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
     988           0 :         if(socket->recv_sockets != NULL){
     989             :             struct netio_socket_list* entry = socket->recv_sockets;
     990             :             int still_open = 0;
     991           0 :             while(entry != NULL){
     992           0 :                 struct netio_recv_socket* recv_socket = entry->socket;
     993           0 :                 still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->eqfd);
     994           0 :                 entry = entry->next;
     995             :             }
     996           0 :             if(still_open){
     997           0 :                 netio_signal_fire(sd->signal);
     998           0 :                 return;
     999             :             }
    1000           0 :             free_socket_list(&socket->recv_sockets);
    1001             :         }
    1002           0 :         handle_listen_socket_shutdown(socket);
    1003             :     }
    1004           0 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
    1005           0 :         if(socket->recv_sockets != NULL){
    1006             :             struct netio_socket_list* entry = socket->recv_sockets;
    1007           0 :             while(entry != NULL){
    1008           0 :                 struct netio_recv_socket* recv_socket = entry->socket;
    1009           0 :                 shutdown(recv_socket->eq_ev_ctx.fd, SHUT_RDWR);
    1010           0 :                 close(recv_socket->eq_ev_ctx.fd);
    1011           0 :                 remove_open_fd(&socket->ctx->evloop, recv_socket->eq_ev_ctx.fd);
    1012           0 :                 entry = entry->next;
    1013             :             }
    1014           0 :             free_socket_list(&socket->recv_sockets);
    1015             :         }
    1016             :     }
    1017             :     else {
    1018           0 :         log_error("Connection mode unsupported");
    1019             :     }
    1020             : 
    1021             :     //clean up signal
    1022           0 :     netio_signal_close(sd->evloop, sd->signal);
    1023           0 :     free(sd->signal);
    1024           0 :     free(sd);
    1025             : }
    1026             : 
    1027             : 
    1028             : 
    1029             : void
    1030           0 : close_buffered_listen_socket(void* ptr)
    1031             : {
    1032             :     log_dbg("close_buffered_listen_socket");
    1033             :     struct signal_data* sd = (struct signal_data*)ptr;
    1034           0 :     struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
    1035             : 
    1036           0 :     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
    1037           0 :         if(socket->recv_sockets != NULL){
    1038             :             struct netio_socket_list* entry = socket->recv_sockets;
    1039             :             int still_open = 0;
    1040           0 :             while(entry != NULL){
    1041           0 :                 struct netio_buffered_recv_socket* recv_socket = entry->socket;
    1042           0 :                 still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->recv_socket.eqfd);
    1043           0 :                 entry = entry->next;
    1044             :             }
    1045           0 :             if(still_open){
    1046           0 :                 netio_signal_fire(sd->signal);
    1047           0 :                 return;
    1048             :             }
    1049           0 :             free_socket_list(&socket->recv_sockets);
    1050             :         }
    1051           0 :         handle_listen_socket_shutdown(socket);
    1052             :     }
    1053           0 :     else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
    1054           0 :         if(socket->recv_sockets != NULL){
    1055             :             struct netio_socket_list* entry = socket->recv_sockets;
    1056           0 :             while(entry != NULL){
    1057           0 :                 struct netio_buffered_recv_socket* recv_socket = entry->socket;
    1058           0 :                 shutdown(recv_socket->recv_socket.eqfd, SHUT_RDWR);
    1059           0 :                 close(recv_socket->recv_socket.eqfd);
    1060           0 :                 remove_open_fd(&socket->ctx->evloop, recv_socket->recv_socket.eqfd);
    1061           0 :                 entry = entry->next;
    1062             :             }
    1063           0 :             free_socket_list(&socket->recv_sockets);
    1064             :         }
    1065             :     }
    1066             :     else {
    1067           0 :         log_error("Connection mode unsupported");
    1068             :     }
    1069             : 
    1070             :     //clean up signal
    1071           0 :     netio_signal_close(sd->evloop, sd->signal);
    1072           0 :     free(sd->signal);
    1073           0 :     free(sd);
    1074             : }
    1075             : 
    1076             : 
    1077             : void
    1078           6 : handle_listen_socket_shutdown(struct netio_listen_socket* socket)
    1079             : {
    1080           6 :     if(socket->eqfd < 0){return;}//nothing to do
    1081             :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
    1082             :     log_dbg("Handle_listen_socket_shutdown. Lsocket EQ: %d with evloop %d socket %p", socket->eqfd, socket->ctx->evloop.epollfd, socket);
    1083           6 :     int ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
    1084           6 :     if(ret){ log_warn("Cannot deregister listen socket EQ %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd); }
    1085             : 
    1086           6 :     if(socket->pep != NULL){
    1087           6 :        fi_close(&socket->pep->fid);
    1088           6 :        socket->pep = NULL;
    1089             :     }
    1090             : 
    1091             :     log_dbg("netio_listen_socket: removing EQ fd %d from evloop %d, ret %d", socket->eqfd, socket->ctx->evloop.epollfd,  ret);
    1092           6 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
    1093             :     {
    1094           0 :         log_error("Failed to close listen socket %d: %s", ret, fi_strerror(-ret));
    1095           0 :         ret = close(socket->eq_ev_ctx.fd);
    1096           0 :         if ( ret ) {log_warn("Cannot close listen socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
    1097             :     }
    1098           6 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
    1099           6 :     socket->eqfd = -1;
    1100             : 
    1101           6 :     if(socket->fi != NULL){
    1102           6 :         fi_freeinfo(socket->fi);
    1103           6 :         socket->fi = NULL;
    1104             :     }
    1105             :     //fi_close(&socket->fabric->fid);
    1106             : }
    1107             : 
    1108             : 
    1109             : //This function is the TCP equivalent of on_send_socket_cm_event()
    1110             : void
    1111          36 : on_send_socket_tcp_cm_event(int fd, void* ptr)
    1112             : {
    1113             :   int ret, retVal;
    1114             :   socklen_t in_len;
    1115             :   struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
    1116             : 
    1117             :   log_dbg("connection event called with fd = %d", fd);
    1118             : 
    1119          36 :   in_len = sizeof(retVal);
    1120          36 :   ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &retVal, &in_len);
    1121          36 :   if (ret < 0)
    1122             :   {
    1123           0 :     log_error("on_send_socket_tcp_cm_event: getsockopt failed", ret);
    1124             :   }
    1125             : 
    1126          36 :   if (retVal != 0)
    1127             :   {
    1128           0 :     log_error("on_send_socket_tcp_cm_event: connect did not work. retVal = %d, ret = %d, errno = %d", retVal, ret, errno);
    1129             :   }
    1130             : 
    1131             :   //MJ: The log_error above sometimes returns "retVal = 111, ret = 0, errno = 115". This error indicates that the receiving side temporarily
    1132             :   //MJ: cannot acknowledge the connection request.
    1133             :   //MJ: Should we somehow retry the connection request in netio or leave it to the user to redo it. In the latter case we have to give a clear error back to the user
    1134             :   //MJ: Experience shows that just waiting a few seconds and then retrying actually helps.
    1135             : 
    1136             :   log_dbg("Register the callback for on_send_completed. Using TCP socket %d", fd);
    1137          36 :   socket->cq_ev_ctx.fd   = fd;
    1138          36 :   socket->cq_ev_ctx.data = socket;
    1139          36 :   socket->cq_ev_ctx.cb   = on_send_socket_tcp_cq_event;
    1140          36 :   netio_register_write_tcp_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
    1141             : 
    1142             :   //MJ: In case of the TCP instruction chain "socket()->connect()->write()/recv()" one is always using the same FD.
    1143             :   //MJ: therefore we don't know if a connect() or a write()/recv() has completed when we receive a trigger from this FD.
    1144             :   //MJ: Therefore the code below may not work because cb_connection_established() will also get called on write()/recv()
    1145          36 :   socket->state = CONNECTED;
    1146             : 
    1147          36 :   if(socket->cb_connection_established)
    1148             :   {
    1149             :     log_dbg("Calling the callback that is registered for cb_connection_established");
    1150          36 :     socket->cb_connection_established(socket);
    1151             :   }
    1152          36 : }
    1153             : 
    1154             : 
    1155             : //This function is the TCP equivalent of on_listen_socket_cm_event()
    1156             : void
    1157           7 : on_listen_socket_tcp_cm_event(int fd, void* ptr)
    1158             : {
    1159             :   int infd;
    1160             :   socklen_t in_len;
    1161             :   struct sockaddr in_addr;
    1162           7 :   log_info("on_listen_socket_tcp_cm_event: called with fd = %d", fd);
    1163             : 
    1164             :   struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
    1165           7 :   in_len = sizeof in_addr;
    1166           7 :   infd = accept(fd, &in_addr, &in_len);
    1167             : 
    1168             :   log_dbg("on_listen_socket_tcp_cm_event: listen socket: infd = %d", infd);
    1169           7 :   if (infd == -1) {
    1170           0 :     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))      // We have processed all incoming connections.
    1171             :       log_dbg("on_listen_socket_tcp_cm_event: nothing to do?");
    1172             :     else
    1173             :     {
    1174             :       log_dbg("on_listen_socket_tcp_cm_event: ERROR. errno = %d", errno);
    1175           0 :       exit(-1);
    1176             :     }
    1177             :   }
    1178             : 
    1179             : #if defined DEBUG || defined DEBUG_CM
    1180             :   char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    1181             :   int ret = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, 0);
    1182             :   if (ret == 0) {
    1183             :     log_dbg("on_listen_socket_tcp_cm_event: Accepted connection on descriptor %d (host=%s, service=%s)", infd, hbuf, sbuf);
    1184             :   }
    1185             : #endif
    1186             : 
    1187           7 :   make_socket_non_blocking(infd);            // Make the incoming socket non-blocking
    1188             :   log_dbg("Adding TCP/IP recv socket to listen socket");
    1189           7 :   struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
    1190           7 :   struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
    1191           7 :   netio_init_recv_tcp_socket(rsocket, lsocket);
    1192             : 
    1193             :   log_dbg("Adding receive socket %d to epoll", infd);
    1194           7 :   rsocket->eq_ev_ctx.fd = infd;
    1195           7 :   rsocket->eq_ev_ctx.data = rsocket;
    1196           7 :   rsocket->eq_ev_ctx.cb = on_recv_socket_tcp_cm_event;
    1197           7 :   netio_register_read_tcp_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
    1198             : 
    1199           7 :   if(lsocket->recv_sub_msg == 1){
    1200           2 :     rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
    1201          66 :     for (int i = 0; i < 32; i++){
    1202          64 :       rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
    1203          64 :       rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
    1204          64 :       rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
    1205          64 :       netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
    1206             :     }
    1207             :     log_dbg("Posted recv for subscription messages");
    1208           5 :   } else if (lsocket->attr.num_buffers > 0) {
    1209             :     log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
    1210           5 :     rsocket->sub_msg_buffers = NULL;
    1211           5 :     rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
    1212        1285 :     for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
    1213             :       log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
    1214        1280 :       rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
    1215        1280 :       rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
    1216        1280 :       netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
    1217             :     }
    1218             :   } else {
    1219           0 :     log_error("Something went wrong. Not allocating any buffers for recv socket.");
    1220             :   }
    1221             : 
    1222           7 :   on_recv_socket_tcp_cm_event(infd, (void*)rsocket);
    1223             :   log_dbg("Done. connection accepted");
    1224           7 : }
    1225             : 
    1226             : 
    1227             : void
    1228          27 : on_buffered_recv_socket_tcp_cm_event(int fd, void* ptr)
    1229             : {
    1230          27 :   log_info("Connection event on buffered TCP/IP recv  (FD = %d, ptr = %p)", fd, ptr);
    1231             :     struct netio_buffered_recv_socket *socket = (struct netio_buffered_recv_socket*)ptr;
    1232             : 
    1233          27 :   socket->recv_socket.cq_ev_ctx.fd = fd;
    1234          27 :   socket->recv_socket.cq_ev_ctx.data = socket;
    1235          27 :   socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_tcp_cq_event;
    1236          27 :   netio_register_read_tcp_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
    1237             : 
    1238             :   log_dbg("posting %d buffers", socket->num_pages);
    1239        6917 :   for(unsigned int i=0; i<socket->num_pages; i++) {
    1240        6890 :      netio_post_recv(&socket->recv_socket, &socket->pages[i]);
    1241             :   }
    1242             : 
    1243          27 :   if(socket->lsocket->cb_connection_established) {
    1244             :     log_dbg("Calling user callback");
    1245          27 :     socket->lsocket->cb_connection_established(socket);
    1246             :   } else{
    1247           0 :     log_warn("no callback available");
    1248             :   }
    1249          27 : }
    1250             : 
    1251             : 
    1252             : //This function is the TCP equivalent of on_buffered_listen_socket_cm_event()
    1253             : void
    1254          27 : on_buffered_listen_socket_tcp_cm_event(int fd, void* ptr)
    1255             : {
    1256             :   int infd;
    1257             :   socklen_t in_len;
    1258             :   struct sockaddr in_addr;
    1259          27 :   log_info("on_buffered_listen_socket_tcp_cm_event: called with fd = %d", fd);
    1260             : 
    1261             :   struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
    1262          27 :   in_len = sizeof in_addr;
    1263          27 :   infd = accept(fd, &in_addr, &in_len);
    1264             : 
    1265          27 :   log_info("on_buffered_listen_socket_tcp_cm_event: listen socket: infd = %d", infd);
    1266          27 :   if (infd == -1)
    1267             :   {
    1268           0 :     if ((errno == EAGAIN) || (errno == EWOULDBLOCK))      // We have processed all incoming connections.
    1269             :       log_dbg("on_buffered_listen_socket_tcp_cm_event: nothing to do?");
    1270             :     else
    1271             :     {
    1272             :       log_dbg("on_buffered_listen_socket_tcp_cm_event: ERROR. errno = %d", errno);
    1273           0 :       exit(-1);
    1274             :     }
    1275             :   }
    1276             : 
    1277             : #if defined DEBUG || defined DEBUG_CM
    1278             :   char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
    1279             :   int ret = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, 0);
    1280             :   if (ret == 0) {
    1281             :     log_dbg("on_buffered_listen_socket_tcp_cm_event: Accepted connection on descriptor %d (host=%s, service=%s)", infd, hbuf, sbuf);
    1282             :   }
    1283             : #endif
    1284             : 
    1285          27 :   make_socket_non_blocking(infd);            // Make the incoming socket non-blocking
    1286             :   log_dbg("Adding TCP/IP recv socket to listen socket");
    1287          27 :   struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
    1288          27 :   struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
    1289          27 :   netio_buffered_recv_tcp_socket_init(rsocket, lsocket);
    1290             : 
    1291             :   log_dbg("Adding receive socket %d to epoll", infd);
    1292          27 :   rsocket->recv_socket.eq_ev_ctx.fd = infd;
    1293          27 :   rsocket->recv_socket.eq_ev_ctx.data = rsocket;
    1294          27 :   rsocket->recv_socket.eq_ev_ctx.cb = on_buffered_recv_socket_tcp_cm_event;
    1295          27 :   netio_register_read_tcp_fd(&rsocket->recv_socket.ctx->evloop, &rsocket->recv_socket.eq_ev_ctx);
    1296             : 
    1297        6892 :   for(unsigned int i=0; i<rsocket->num_pages; i++) {
    1298        6865 :     netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
    1299             :   }
    1300             : 
    1301          27 :   on_buffered_recv_socket_tcp_cm_event(infd, (void*)rsocket);
    1302             :   log_dbg("Connection accepted");
    1303          27 : }
    1304             : 
    1305             : 
    1306             : //This is the equivalent of on_recv_socket_cm_event
    1307             : void
    1308           7 : on_recv_socket_tcp_cm_event(int fd, void *ptr)
    1309             : {
    1310             :   log_dbg("on_recv_socket_tcp_cm_event: called with fd = %d and ptr = %p", fd, ptr);
    1311             :   struct netio_recv_socket *socket = (struct netio_recv_socket*)ptr;
    1312           7 :   socket->cq_ev_ctx.fd = fd;
    1313           7 :   socket->cq_ev_ctx.data = socket;
    1314           7 :   socket->cq_ev_ctx.cb = on_recv_socket_tcp_cq_event;
    1315           7 :   netio_register_read_tcp_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
    1316           7 :   if(socket->lsocket->cb_connection_established)
    1317             :   {
    1318             :     log_dbg("Calling user callback");
    1319           7 :     socket->lsocket->cb_connection_established(socket);
    1320             :   }  else {
    1321           0 :     log_warn("no callback available");
    1322             :   }
    1323           7 : }

Generated by: LCOV version 1.0