LCOV - code coverage report
Current view: top level - netio-next/src - connection_event.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 354 520 68.1 %
Date: 2025-06-10 03:23:28 Functions: 13 17 76.5 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <unistd.h>
       3             : #include "log.h"
       4             : #include "netio/netio.h"
       5             : #include "connection_event.h"
       6             : #include "completion_event.h"
       7             : 
       8             : #if defined DEBUG || defined DEBUG_CM
       9             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      10             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      11             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #else
      13             : #define log_dbg(...)
      14             : #define log_trc(...)
      15             : #endif
      16             : 
      17             : #define FATAL(msg, c) \
      18             : do { \
      19             :     log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
      20             :     exit(2); \
      21             : } while(0);
      22             : 
      23             : 
      24             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      25             : 
      26             : static int
      27        4346 : read_cm_event(struct fid_eq* eq, struct fi_info** info, struct fi_eq_err_entry* err_entry)
      28             : {
      29        4346 :     uint32_t event;
      30        4346 :     struct fi_eq_cm_entry entry;
      31             : 
      32        4346 :     ssize_t rd = fi_eq_sread(eq, &event, &entry, sizeof entry, 0, 0);
      33        4346 :     if(rd < 0)
      34             :     {
      35        1333 :         if(rd == -FI_EAGAIN)
      36             :         {
      37             :             return rd;
      38             :         }
      39         215 :         if(rd == -FI_EAVAIL)
      40             :         {
      41         215 :             int r;
      42         215 :             if((r = fi_eq_readerr(eq, err_entry, 0)) < 0)
      43             :             {
      44           0 :                 FATAL("Failed to retrieve details on Event Queue error", r);
      45             :             }
      46         215 :             log_error("Event Queue error: %s (code: %d), provider specific: %s (code: %d)",
      47             :                 fi_strerror(err_entry->err), err_entry->err,
      48             :                 fi_eq_strerror(eq, err_entry->prov_errno, err_entry->err_data, NULL, 0),
      49             :                 err_entry->prov_errno);
      50         215 :             return rd;
      51             :         }
      52             :     }
      53        3013 :     if (rd != sizeof entry)
      54             :     {
      55           0 :         FATAL("Failed to read from Event Queue: %d", (int)rd);
      56             :     }
      57             : 
      58        3013 :     if(info != NULL)
      59         463 :         *info = entry.info;
      60             : 
      61        3013 :     return event;
      62             : }
      63             : 
      64             : 
      65             : static void
      66         463 : handle_connreq(struct netio_recv_socket* rsocket, struct netio_listen_socket* lsocket, struct fi_info *info, void (*cb)(int,void*), void* cbdata)
      67             : {
      68         463 :     int ret;
      69         463 :     struct fi_eq_attr eq_attr;
      70         463 :     eq_attr.wait_obj = FI_WAIT_FD;
      71             : 
      72         463 :     if((ret = fi_domain(lsocket->fabric, info, &rsocket->domain, NULL)))
      73             :     {
      74           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      75           0 :         FATAL("Listen socket, cannot open fabric, error ", ret);
      76             :     }
      77             : 
      78         463 :     if((ret = fi_endpoint(rsocket->domain, info, &rsocket->ep, NULL)))
      79             :     {
      80           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      81           0 :         FATAL("Listen socket, cannot open endpoint, error ", ret);
      82             :     }
      83             : 
      84             :     /* Create a new event queue for the new active socket */
      85         463 :     if((ret = fi_eq_open(lsocket->fabric, &eq_attr, &rsocket->eq, NULL)))
      86             :     {
      87           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      88           0 :         FATAL("Listen socket, cannot open Event Queue, error ", ret);
      89             :     }
      90             : 
      91         463 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->eq->fid, 0)))
      92             :     {
      93           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
      94           0 :         FATAL("Listen socket, cannot bind Event Queue to endpoint, error ", ret);
      95             :     }
      96             : 
      97         463 :     if((ret = fi_control(&rsocket->eq->fid, FI_GETWAIT, &rsocket->eqfd)))
      98             :     {
      99           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     100           0 :         FATAL("Listen socket failed to obtain Event Queue wait object", ret);
     101             :     }
     102         463 :     rsocket->eq_ev_ctx.fd = rsocket->eqfd;
     103         463 :     rsocket->eq_ev_ctx.data = cbdata;
     104         463 :     rsocket->eq_ev_ctx.cb = cb;
     105             : 
     106         463 :     log_dbg("Adding RECV EQ polled fid %d %p", rsocket->eqfd, &rsocket->eq->fid);
     107         463 :     add_polled_fid(&rsocket->ctx->evloop.pfids, lsocket->fabric, &rsocket->eq->fid, rsocket->eqfd, &rsocket, cb);
     108         463 :     add_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eqfd, NETIO_EQ, URECV, &rsocket);
     109         463 :     netio_register_read_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
     110         463 :     log_dbg("recv_socket: EQ fd %d waiting for connection", rsocket->eqfd);
     111             : 
     112         463 :     struct fi_cq_attr cq_attr;
     113         463 :     cq_attr.size = NETIO_MAX_CQ_ENTRIES;      /* # entries for CQ */
     114         463 :     cq_attr.flags = 0;     /* operation flags */
     115         463 :     cq_attr.format = FI_CQ_FORMAT_DATA;    /* completion format */
     116         463 :     cq_attr.wait_obj= FI_WAIT_FD;  /* requested wait object */
     117         463 :     cq_attr.signaling_vector = 0; /* interrupt affinity */
     118         463 :     cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */
     119         463 :     cq_attr.wait_set = NULL;  /* optional wait set */
     120             : 
     121             :     //FI_RECV CQ
     122         463 :     if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->cq, NULL)))
     123             :     {
     124           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     125           0 :         FATAL("Listen socket, cannot open Completion Queue, error ", ret);
     126             :     }
     127             : 
     128         463 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->cq->fid, FI_RECV)))
     129             :     {
     130           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     131           0 :         FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
     132             :     }
     133             : 
     134             :     //FI_TRANSMIT CQ - also necessary
     135         463 :     cq_attr.format = FI_CQ_FORMAT_UNSPEC;
     136         463 :     cq_attr.wait_obj= FI_WAIT_NONE;
     137         463 :     if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->tcq, NULL)))
     138             :     {
     139           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     140           0 :         FATAL("Listen socket, cannot open Completion Queue, error ", ret);
     141             :     }
     142             : 
     143         463 :     if((ret = fi_ep_bind((rsocket->ep), &rsocket->tcq->fid, FI_TRANSMIT)))
     144             :     {
     145           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     146           0 :         FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
     147             :     }
     148             : 
     149         463 :     if((ret = fi_enable(rsocket->ep)))
     150             :     {
     151           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     152           0 :         FATAL("Listen socket, cannot enable recv socket endpoint, error ", ret);
     153             :     }   
     154             : 
     155         463 :     if((ret = fi_accept(rsocket->ep, NULL, 0)))
     156             :     {
     157           0 :         fi_reject(lsocket->pep, info->handle, NULL, 0);
     158           0 :         FATAL("Listen socket, connection rejected, error ", ret);;
     159             :     }
     160         463 :     log_dbg("connection accepted. Lsocket EQ: %d with evloop %d, rsocket EQ %d with evloop %d", lsocket->eqfd, lsocket->ctx->evloop.epollfd, rsocket->eqfd, rsocket->ctx->evloop.epollfd);
     161         463 : }
     162             : 
     163             : 
     164             : void
     165         350 : handle_recv_socket_shutdown(struct netio_recv_socket* socket)
     166             : {
     167         350 :     if(socket->eqfd < 0){
     168           0 :         log_info("handle_recv_socket_shutdown on closed socket (eqfd %d)", socket->eqfd);
     169           0 :         return;
     170             :     }
     171         350 :     int ret = 0;
     172         350 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     173         350 :     log_dbg("Handle_recv_socket_shutdown for socket %p, evloop: %d", socket, socket->ctx->evloop.epollfd);
     174             : 
     175         350 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     176             :     {
     177           0 :         log_error("Failed to close recv socket Endpoint %d: %s", ret, fi_strerror(-ret));
     178             :     }
     179         350 :     log_dbg("fi_close done for endpoint.");
     180             : 
     181             :     //EQ
     182         350 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eq_ev_ctx.fd);
     183         350 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, &ep_event);
     184         350 :     if ( ret ){ log_warn("netio_recv_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
     185         350 :     log_dbg("netio_recv_socket: removing EQ fd %d from evloop %d", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     186         350 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     187             :     {
     188           0 :         log_error("Failed to fi_close recv socket Event Queue %d: %s", ret, fi_strerror(-ret));
     189           0 :         ret = close(socket->eq_ev_ctx.fd);
     190           0 :         if ( ret ) {log_warn("Cannot close recv socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
     191             :     }
     192         350 :     remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd);
     193         350 :     socket->eqfd = -1;
     194             : 
     195             :     //FI_RECV CQ
     196         350 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->cq_ev_ctx.fd);
     197         350 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cq_ev_ctx.fd, &ep_event);
     198         350 :     if ( ret ){ log_warn("netio_recv_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
     199         350 :     log_dbg("netio_recv_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     200         350 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     201             :     {
     202           0 :         log_error("Failed to close recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     203           0 :         ret = close(socket->cq_ev_ctx.fd);
     204           0 :         if ( ret ) {log_warn("Cannot close recv socket CQ fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
     205             :     }
     206         350 :     remove_open_fd(&socket->ctx->evloop, socket->cq_ev_ctx.fd);
     207         350 :     socket->cqfd = -1;
     208             :     //FI_TRANSMIT CQ
     209         350 :     if((ret = fi_close(&socket->tcq->fid)))
     210             :     {
     211           0 :         log_error("Failed to close FI_TRANSMIT recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     212             :     }
     213             : 
     214             :     //MR
     215         350 :     uint16_t mem_regions = socket->reg_mr;
     216       77842 :     for(uint16_t i=0; i<mem_regions; ++i)
     217             :     {
     218       77492 :         if ((ret = fi_close(socket->mr[i]))){
     219           0 :             log_warn("Failed to close recv Memory Region %d, error %d.", i, ret);
     220             :         }
     221             :         else{
     222       77492 :             socket->reg_mr-=1;
     223             :         }
     224             :     }
     225         350 :     if(socket->reg_mr==0){free(socket->mr);socket->mr=NULL;}
     226         350 :     if((ret = fi_close(&socket->domain->fid)))
     227             :     {
     228           0 :         log_error("Failed to close recv socket Domain %d: %s", ret, fi_strerror(-ret));
     229             :     }    
     230         350 :     if (socket->sub_msg_buffers != NULL){
     231        1881 :         for(int i = 0; i < 32; i++){
     232        1824 :             free(socket->sub_msg_buffers[i]->data); 
     233        1824 :             free(socket->sub_msg_buffers[i]);
     234             :         }
     235          57 :         free(socket->sub_msg_buffers);
     236          57 :         socket->sub_msg_buffers = NULL;
     237             :     }
     238             : }
     239             : 
     240             : void
     241         786 : handle_send_socket_shutdown(struct netio_send_socket* socket)
     242             : {
     243             :     // if(socket->state != CONNECTED){
     244             :     //     log_dbg("Nothing to do, send socket has already been freed....");
     245             :     //     return;
     246             :     // }
     247         786 :     int ret = 0;
     248         786 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     249         786 :     log_dbg("Handle_send_socket_shutdown. Socket with EQ: %d", socket->eqfd);
     250             : 
     251         786 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     252             :     {
     253           0 :         log_error("Failed to close send socket Endpoint %d: %s", ret, fi_strerror(-ret));
     254             :     }
     255         786 :     socket->ep = NULL;
     256             : 
     257             :     //EQ
     258         786 :     log_dbg("netio_send_socket: removing EQ fd %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd);
     259         786 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
     260         786 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     261         786 :     if ( ret ){ log_warn("netio_send_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     262         786 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     263             :     {
     264           0 :         log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
     265           0 :         ret = close(socket->eq_ev_ctx.fd);
     266           0 :         if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
     267             : 
     268             :     }
     269         786 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     270         786 :     socket->eqfd = -1;
     271             : 
     272             :     //FI_TRANSMIT CQ
     273         786 :     log_dbg("netio_send_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
     274         786 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->cqfd);
     275         786 :     ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cqfd, &ep_event);
     276         786 :     if ( ret ){ log_warn("netio_send_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     277         786 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     278             :     {
     279           0 :         log_error("Failed to close send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     280           0 :         ret = close(socket->cq_ev_ctx.fd);
     281           0 :         if ( ret ){ log_warn("Could not close send socket cq fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));} 
     282             :     }
     283         786 :     remove_open_fd(&socket->ctx->evloop, socket->cqfd);
     284         786 :     socket->cqfd = -1;
     285             :     //FI_RECV CQ
     286         786 :     if((ret = fi_close(&socket->rcq->fid)))
     287             :     {
     288           0 :         log_error("Failed to close FI_RECV senf socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     289             :     }
     290             : 
     291         786 :     socket->domain->nb_sockets -= 1;
     292         786 :     if (socket->domain->nb_sockets == 0) {
     293       79377 :         for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
     294       78592 :             if ((ret = fi_close(socket->domain->mr[i]))) {
     295       78592 :                 log_warn("Failed to close send Memory Region %d, error %d.", i, ret);;
     296             :             }
     297             :         }
     298         785 :         if ((ret = fi_close(&socket->domain->domain->fid))) {
     299           0 :             log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
     300             :         }
     301             : 
     302         785 :         if(socket->ctx->evloop.pfids.count == 0){
     303         641 :             if ((ret = fi_close(&socket->domain->fabric->fid))) {
     304           0 :                 log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
     305             :             }
     306             :         }
     307         785 :         free(socket->domain->mr);
     308         785 :         socket->domain->mr = NULL;
     309         785 :         free(socket->domain);
     310         785 :         socket->domain = NULL; 
     311             :     }
     312         786 :     if(socket->fi != NULL){
     313         786 :         fi_freeinfo(socket->fi);
     314         786 :         socket->fi = NULL;
     315             :     }
     316         786 :     socket->state = DISCONNECTED;
     317         786 : }
     318             : 
     319             : 
     320             : static void
     321         215 : handle_send_socket_shutdown_on_connetion_refused(struct netio_send_socket* socket)
     322             : {
     323         215 :     int ret = 0;
     324             :     //struct fi_eq_err_entry err_entry;
     325         215 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     326         215 :     if(socket->ep && (ret = fi_close(&socket->ep->fid)))
     327             :     {
     328           0 :         log_error("Failed to close send socket endpoint %d: %s", ret, fi_strerror(-ret));
     329             :     }
     330             : 
     331             :     //EQ
     332         215 :     remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
     333         215 :     ret =  epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     334         215 :     if (ret) {log_warn("Conn refused: cannot deregister send_socket EQ fd %d from evloop %d, %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
     335         215 :     if(socket->eq && (ret = fi_close(&socket->eq->fid)))
     336             :     {
     337           0 :         log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
     338           0 :         ret = close(socket->eq_ev_ctx.fd);
     339           0 :         if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
     340             :     }
     341         215 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     342         215 :     socket->eqfd = -1;
     343             : 
     344             :     //FI_TRANSMIT CQ - wait object not retrieved yet
     345         215 :     if(socket->cq && (ret = fi_close(&socket->cq->fid)))
     346             :     {
     347           0 :         log_error("Failed to close FI_TRANSMIT send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     348             :     }
     349             :     //FI_RECV CQ 
     350         215 :     if(fi_close(&socket->rcq->fid))
     351             :     {
     352           0 :         log_error("Failed to close FI_RECV send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
     353             :     }
     354             :     
     355         215 :     socket->domain->nb_sockets -= 1;
     356         215 :     if (socket->domain->nb_sockets == 0) {
     357         261 :         for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
     358          46 :             if ((ret = fi_close(socket->domain->mr[i]))) {
     359           0 :                 log_warn("Failed to close send Memory Region %d, error %d.", i, ret);
     360             :             }
     361             :         }
     362         215 :         if ((ret = fi_close(&socket->domain->domain->fid))) {
     363           0 :             log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
     364             :         }
     365         215 :         if(socket->ctx->evloop.pfids.count == 0){
     366         157 :             if ((ret = fi_close(&socket->domain->fabric->fid))) {
     367           0 :                 log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
     368             :             }
     369             :         }
     370         215 :         free(socket->domain->mr);
     371         215 :         socket->domain->mr = NULL;
     372         215 :         free(socket->domain);
     373         215 :         socket->domain = NULL;
     374             : 
     375             :     }
     376         215 :     socket->state = UNCONNECTED;
     377         215 : }
     378             : 
     379             : // WAIT OBJECT CALLBACKS ///////////////////////////////////////////////////////////
     380             : 
     381             : void
     382        3070 : on_send_socket_cm_event(int fd, void* ptr)
     383             : {
     384        3070 :     struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
     385        3070 :     struct fi_eq_err_entry err_entry;
     386        3070 :     log_dbg("send socket %p fd %d: connection event, evloop: %d", socket, fd, socket->ctx->evloop.epollfd);
     387             : 
     388        5022 :     if(socket->state == DISCONNECTED){return;} //Check if event queue was already closed. TODO: Need to fix receiving messages after closing
     389        3070 :     int event = read_cm_event(socket->eq, NULL, &err_entry);
     390        3070 :     int ret = 0;
     391             : 
     392        3070 :     log_dbg("event %d", event);
     393             : 
     394        3070 :     switch(event)
     395             :     {
     396         788 :     case FI_SHUTDOWN:
     397         788 :         if (socket->eqfd < 0 ){
     398           0 :             log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
     399        1952 :             return;
     400             :         }
     401         788 :         log_info("fi_verbs_process_send_socket_cm_event: FI_SHUTDOWN");
     402         788 :         if(socket->cb_internal_connection_closed){
     403         318 :             socket->cb_internal_connection_closed(socket);
     404             :         }
     405         788 :         if(socket->cb_connection_closed) {
     406         788 :             socket->cb_connection_closed(socket);
     407             :         }
     408             :         return;
     409             : 
     410         949 :     case FI_CONNECTED:
     411         949 :         socket->cqfd = -1;
     412         949 :         if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
     413             :         {
     414           0 :             FATAL("Failed to retrieve wait object for send socket Completion Queue", ret);
     415             :         }
     416             : 
     417         949 :         socket->cq_ev_ctx.fd = socket->cqfd;
     418         949 :         socket->cq_ev_ctx.data = socket;
     419         949 :         socket->cq_ev_ctx.cb = on_send_socket_cq_event;
     420             : 
     421         949 :         log_dbg("send_socket: EQ fd %d connected, CQ fd %d", socket->eqfd, socket->cqfd);
     422         949 :         log_dbg("Adding SEND CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
     423         949 :         add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->cq->fid, socket->cqfd, socket, on_send_socket_cq_event);
     424         949 :         add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, USEND, socket);
     425         949 :         netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
     426             : 
     427         949 :         socket->state = CONNECTED;
     428             : 
     429         949 :         if(socket->cb_connection_established) {
     430         949 :           socket->cb_connection_established(socket);
     431             :         }
     432             : 
     433             :         return;
     434             : 
     435             :     case FI_MR_COMPLETE:
     436             :     case FI_AV_COMPLETE:
     437             :     case FI_JOIN_COMPLETE:
     438             :         // Not implemented
     439        1118 :         break;
     440             : 
     441             : 
     442         215 :     case -FI_EAVAIL:
     443             : 
     444         215 :         switch(err_entry.err) {
     445             : 
     446         215 :             case FI_ECONNREFUSED:
     447             : 
     448         215 :                 log_dbg("Connection refused (FI_ECONNREFUSED). Deallocating send_socket resources.");
     449             : 
     450         215 :                 if(socket->eqfd < 0){
     451             :                     log_dbg("FI_ECONNREFUSED on send socket with EQ fd %d. Not clearing it.", socket->eqfd);
     452             :                     return;
     453             :                 }
     454             : 
     455         215 :                 handle_send_socket_shutdown_on_connetion_refused(socket);
     456             : 
     457         215 :                 if(socket->cb_error_connection_refused) {
     458         215 :                     socket->cb_error_connection_refused(socket);
     459             :                 }
     460             : 
     461             :             case FI_ETIMEDOUT:
     462         215 :                 log_info("fi_verbs_process_send_socket_cm_event: FI_ETIMEDOUT");
     463         215 :                 if (socket->eqfd < 0 ){
     464         215 :                     log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
     465         215 :                     break;
     466             :                 }
     467             : 
     468             :                 // Need to take care of receive socket as well
     469           0 :                 if (socket->recv_socket != NULL){
     470           0 :                     if (socket->recv_socket->eqfd < 0 ){
     471           0 :                         log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
     472           0 :                         return;
     473             :                     }
     474           0 :                     log_info("Shutting down receive socket on FI_ETIMEDOUT");
     475           0 :                     handle_recv_socket_shutdown(socket->recv_socket);
     476           0 :                     if(socket->recv_socket->lsocket->cb_connection_closed) {
     477           0 :                         socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
     478             :                     }
     479             :                 }
     480             : 
     481           0 :                 if(socket->cqfd < 0){ //cq not initalized yet
     482           0 :                     handle_send_socket_shutdown_on_connetion_refused(socket);
     483             :                 } else {
     484           0 :                     if(socket->cb_internal_connection_closed){
     485           0 :                         socket->cb_internal_connection_closed(socket);
     486             :                     }
     487           0 :                     if(socket->cb_connection_closed) {
     488           0 :                         socket->cb_connection_closed(socket);
     489             :                     }
     490             :                 }
     491             :                 break;
     492             : 
     493           0 :             default:
     494             : 
     495           0 :                 log_error("Unhandled error in the Event Queue: %s (code: %d), provider specific: %s (code: %d)",
     496             :                         fi_strerror(err_entry.err), err_entry.err,
     497             :                         fi_eq_strerror(socket->eq, err_entry.prov_errno, err_entry.err_data, NULL, 0),
     498             :                         err_entry.prov_errno);
     499             :         }
     500             :         return;
     501             : 
     502        1118 :     case -FI_EAGAIN:
     503        1118 :         struct fid* fp = &socket->eq->fid;
     504        1118 :         fi_trywait(socket->domain->fabric, &fp, 1);
     505        1118 :         break;
     506             : 
     507           0 :     default:
     508           0 :         log_error("Unexpected event %d in send socket Event Queue", event);
     509           0 :         exit(2);
     510             :     }
     511             : }
     512             : // Event-loop callback for connection-management (CM) events on a listen socket's event queue (EQ).
     513             : // fd is the descriptor that fired; ptr is cast to struct netio_listen_socket*. One event is read and dispatched per call.
     514             : void
     515         211 : on_listen_socket_cm_event(int fd, void* ptr)
     516             : {
     517         211 :     struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
     518         211 :     log_dbg("listen socket: connection event");
     519             :     // Ignore the event when the firing fd is not this socket's EQ fd.
     520         211 :     if(lsocket->eqfd != fd){
     521           0 :         log_dbg("Listen socket CM event: inconsistend fd. Ignoring event.");
     522           0 :         return;
     523             :     }
     524             : 
     525             :     // info starts as NULL, is passed to read_cm_event by address, and is handed to fi_freeinfo() after the switch.
     526         211 :     struct fi_info *info = NULL;
     527         211 :     struct fi_eq_err_entry err_entry;
     528         211 :     int event = read_cm_event(lsocket->eq, &info, &err_entry);
     529             : 
     530             :     // NOTE(review): there is no default case, so an event value not listed below is silently ignored.
     531         211 :     switch (event)
     532             :     {
     533         211 :         case FI_CONNREQ:
     534         211 :             log_dbg("fi_verbs_process_listen_socket_cm_event: FI_CONNREQ");
     535         211 :             struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV); // new URECV entry in the recv_sockets list
     536         211 :             struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
     537         211 :             netio_init_recv_socket(rsocket, lsocket);
     538         211 :             handle_connreq(rsocket, lsocket, info, on_recv_socket_cm_event, rsocket); // presumably installs on_recv_socket_cm_event for the new socket - confirm in handle_connreq
     539         211 :             if(lsocket->recv_sub_msg == 1){ // subscription mode: post 32 buffers of sizeof(struct netio_subscription_message) bytes each
     540          77 :                 rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*)); // NOTE(review): none of the malloc results in this function are checked
     541        2541 :                 for (int i = 0; i < 32; i++){
     542        2464 :                     rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
     543        2464 :                     rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
     544        2464 :                     rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
     545        2464 :                     netio_register_recv_buffer(rsocket, rsocket->sub_msg_buffers[i], 0);
     546        2464 :                     netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
     547             :                 }
     548             :                 log_dbg("Posted recv for subscription messages"); // NOTE(review): rsocket->recv_buffers is not assigned on this path, yet on_recv_socket_cm_event frees it on FI_SHUTDOWN whenever attr.num_buffers > 0 - confirm netio_init_recv_socket initialises it
     549         134 :             } else if (lsocket->attr.num_buffers > 0) { // otherwise post attr.num_buffers buffers of attr.buffer_size bytes each
     550         134 :                 log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
     551         134 :                 rsocket->sub_msg_buffers = NULL;
     552         134 :                 rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
     553       30358 :                 for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
     554       30224 :                     log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
     555       30224 :                     rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
     556       30224 :                     rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
     557       30224 :                     netio_register_recv_buffer(rsocket, &rsocket->recv_buffers[i], 0);
     558       30224 :                     netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
     559             :                 }
     560             :             } else { // recv_sub_msg != 1 and attr.num_buffers not > 0: no receive buffers are posted
     561           0 :                 log_error("Something went wrong. Not allocating any buffers for recv socket.");
     562             :             }
     563             :             break;
     564             :         // FI_CONNECTED and FI_SHUTDOWN are treated as fatal on a listen socket: both log and exit(2).
     565           0 :         case FI_CONNECTED:
     566           0 :             log_fatal("FI_CONNECTED received on listen socket");
     567           0 :             exit(2);
     568             : 
     569           0 :         case FI_SHUTDOWN:
     570           0 :             log_fatal("FI_SHUTDOWN received on listen socket");
     571           0 :             exit(2);
     572             :         // -FI_EAGAIN: fi_trywait() is called on the EQ fid; its return value is not checked. NOTE(review): a declaration directly after a case label needs C23 or a compiler extension.
     573           0 :         case -FI_EAGAIN:
     574           0 :             struct fid* fp = &lsocket->eq->fid;
     575           0 :             fi_trywait(lsocket->fabric, &fp, 1);
     576           0 :             break;
     577             :         // -FI_EAVAIL: the error codes in err_entry are logged and nothing else is done.
     578           0 :         case -FI_EAVAIL:
     579           0 :             log_error("Unhandled error in listen socket EQ code: %d, provider specific code: %d",
     580             :                 err_entry.err, err_entry.prov_errno);
     581           0 :             break;
     582             :     }
     583         211 :     fi_freeinfo(info); // info is still NULL here unless read_cm_event stored something in it
     584             : }
     585             : // Event-loop callback for CM events on the event queue of a buffered listen socket.
     586             : // fd is the descriptor that fired; ptr is cast to struct netio_buffered_listen_socket*. One event is read and dispatched per call.
     587             : void
     588         252 : on_buffered_listen_socket_cm_event(int fd, void* ptr)
     589             : {
     590         252 :     struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
     591         252 :     log_dbg("buffered listen socket: connection event");
     592             :     // Ignore the event when the firing fd is not the EQ fd of the embedded listen_socket.
     593         252 :     if(lsocket->listen_socket.eqfd != fd){
     594           0 :         log_dbg("Buffered listen socket CM event: inconsistend fd. Ignoring event.");
     595           0 :         return;
     596             :     }
     597             : 
     598             :     // info starts as NULL, is passed to read_cm_event by address, and is handed to fi_freeinfo() after the switch.
     599         252 :     struct fi_info *info = NULL;
     600         252 :     struct fi_eq_err_entry err_entry;
     601         252 :     int event = read_cm_event(lsocket->listen_socket.eq, &info, &err_entry);
     602             :     // NOTE(review): there is no default case, so an event value not listed below is silently ignored.
     603         252 :     switch (event)
     604             :     {
     605         252 :         case FI_CONNREQ:
     606         252 :             log_dbg("FI_CONNREQ");
     607             :             // Add a BRECV entry to the recv_sockets list, initialise it, and pass the request to handle_connreq.
     608         252 :             struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
     609         252 :             struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
     610         252 :             netio_buffered_recv_socket_init(rsocket, lsocket);
     611         252 :             handle_connreq(&rsocket->recv_socket, &lsocket->listen_socket, info, on_buffered_recv_socket_cm_event, rsocket);
     612         252 :             log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED, posting %d buffers", rsocket->num_pages); // NOTE(review): the message text says FI_CONNECTED although this is the FI_CONNREQ path
     613       71084 :             for(unsigned int i=0; i<rsocket->num_pages; i++) { // register and post every page; pages are presumably set up by netio_buffered_recv_socket_init - confirm
     614       70832 :                 netio_register_recv_buffer(&rsocket->recv_socket, &rsocket->pages[i], 0);
     615       70832 :                 netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
     616             :             }
     617             :             break;
     618             :         // FI_CONNECTED and FI_SHUTDOWN are treated as fatal on a buffered listen socket: both log and exit(2).
     619           0 :         case FI_CONNECTED:
     620           0 :             log_fatal("FI_CONNECTED received on buffered listen socket");
     621           0 :             exit(2);
     622             : 
     623           0 :         case FI_SHUTDOWN:
     624           0 :             log_fatal("FI_SHUTDOWN received on buffered listen socket");
     625           0 :             exit(2);
     626             :         // -FI_EAGAIN: fi_trywait() is called on the EQ fid; its return value is not checked.
     627           0 :         case -FI_EAGAIN:
     628           0 :             struct fid* fp = &lsocket->listen_socket.eq->fid;
     629           0 :             fi_trywait(lsocket->listen_socket.fabric, &fp, 1);
     630           0 :             break;
     631             :         // -FI_EAVAIL: the error codes in err_entry are logged and nothing else is done.
     632           0 :         case -FI_EAVAIL:
     633           0 :             log_error("Unhandled error in buffer listen socket EQ code: %d, provider specific code: %d",
     634             :                 err_entry.err, err_entry.prov_errno);
     635           0 :             break;
     636             :     }
     637         252 :     fi_freeinfo(info); // info is still NULL here unless read_cm_event stored something in it
     638             : }
     639             : // Event-loop callback for CM events on the event queue of an unbuffered receive socket.
     640             : // fd is the descriptor that fired; ptr is cast to struct netio_recv_socket*. Handles FI_CONNECTED and FI_SHUTDOWN; an unlisted event ends the process with exit(2).
     641             : void
     642         374 : on_recv_socket_cm_event(int fd, void* ptr)
     643             : {
     644         374 :     struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
     645         374 :     log_info("recv socket %d: connection event", socket->eqfd);
     646         374 :     if(socket->eqfd != fd){ // ignore events that fired on a different fd
     647             :         log_dbg("Recv socket CM event: inconsistend fd. Ignoring event.");
     648           0 :         return;
     649             :     }
     650         374 :     int ret;
     651         374 :     struct fi_eq_err_entry err_entry;
     652         374 :     uint32_t event = read_cm_event(socket->eq, NULL, &err_entry); // NOTE(review): read_cm_event returns int; the negative case labels below only match through conversion to uint32_t
     653             : 
     654         374 :     switch (event)
     655             :     {
     656         211 :     case FI_CONNECTED:
     657         211 :         log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED");
     658         211 :         socket->cqfd = -1;
     659         211 :         if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd))) // fetch the CQ wait object into socket->cqfd
     660             :         {
     661           0 :             FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
     662             :         }
     663             :         // Fill cq_ev_ctx (fd, data, callback) before it is registered with the event loop below.
     664         211 :         socket->cq_ev_ctx.fd = socket->cqfd;
     665         211 :         socket->cq_ev_ctx.data = socket;
     666         211 :         socket->cq_ev_ctx.cb = on_recv_socket_cq_event;
     667             : 
     668         211 :         log_dbg("Adding recv CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
     669         211 :         add_polled_fid(&socket->ctx->evloop.pfids, socket->lsocket->fabric, &socket->cq->fid, socket->cqfd, socket, on_recv_socket_cq_event);
     670         211 :         add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, URECV, socket);
     671         211 :         netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
     672             :         // Notify the user through the listen socket's callback, if one is set.
     673         211 :         log_dbg("recv_socket: EQ fd %d CQ fd %d connected", socket->eqfd, socket->cqfd);
     674         211 :         if(socket->lsocket->cb_connection_established) {
     675         211 :             socket->lsocket->cb_connection_established(socket);
     676             :         }
     677             : 
     678         374 :         break;
     679             : 
     680         163 :     case FI_SHUTDOWN:
     681         163 :         if (socket->eqfd < 0 ){ // NOTE(review): looks unreachable for a non-negative fd, since the eqfd != fd guard above has already returned
     682           0 :             log_info("Ignoring FI_SHUTDOWN on recv_socket, invalid eqfd (socket already closed)");
     683           0 :             return;
     684             :         }
     685         163 :         log_info("fi_verbs_process_recv_socket_cm_event: FI_SHUTDOWN");
     686         163 :         handle_recv_socket_shutdown(socket);
     687         163 :         if(socket->lsocket->cb_connection_closed) {
     688         106 :             socket->lsocket->cb_connection_closed(socket);
     689             :         }
     690             :         // Release the receive buffers that on_listen_socket_cm_event allocates when attr.num_buffers > 0.
     691         163 :         if (socket->lsocket->attr.num_buffers > 0){
     692       26222 :             for(int i = 0; i < socket->lsocket->attr.num_buffers; i++){ // NOTE(review): int i is compared against attr.num_buffers, which is logged with %lu elsewhere in this file
     693       26116 :                 free(socket->recv_buffers[i].data);
     694             :             }
     695         106 :             free(socket->recv_buffers);
     696             :         }
     697             :         // NOTE(review): sub_msg_buffers allocated in on_listen_socket_cm_event are not freed in this function.
     698         163 :         int return_value = remove_socket(&socket->lsocket->recv_sockets, socket);
     699         163 :         log_info("Recv socket removed, result: %d", return_value);
     700         163 :         break;
     701             : 
     702             :     case FI_MR_COMPLETE:
     703             :     case FI_AV_COMPLETE:
     704             :     case FI_JOIN_COMPLETE:
     705             :         // Not implemented
     706             :         break;
     707             :     // -FI_EAGAIN: fi_trywait() is called on the EQ fid; its return value is not checked.
     708           0 :     case -FI_EAGAIN:
     709           0 :         struct fid* fp = &socket->eq->fid;
     710           0 :         fi_trywait(socket->lsocket->fabric, &fp, 1);
     711           0 :         break;
     712             :     // -FI_EAVAIL: the error codes in err_entry are logged and nothing else is done.
     713           0 :     case -FI_EAVAIL:
     714           0 :         log_error("Unhandled error in recv socket EQ code: %d, provider specific code: %d",
     715             :             err_entry.err, err_entry.prov_errno);
     716           0 :         break;
     717             :     // Any other event value is logged and ends the process; the break after exit(2) is never reached.
     718           0 :     default:
     719           0 :         log_error("Unexpected event %d in recv socket Event Queue", event);
     720           0 :         exit(2);
     721         374 :         break;
     722             :     }
     723             : }
     724             : // Event-loop callback for CM events on the event queue of a buffered receive socket.
     725             : // fd is the descriptor that fired; ptr is cast to struct netio_buffered_recv_socket*. Handles FI_CONNECTED and FI_SHUTDOWN; an unlisted event ends the process with exit(2).
     726             : void
     727         439 : on_buffered_recv_socket_cm_event(int fd, void* ptr)
     728             : {
     729         439 :     struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)ptr;
     730         439 :     log_dbg("buffered recv socket %d: connection event (FD = %d)", socket->recv_socket.eqfd, fd);
     731         439 :     if(socket->recv_socket.eqfd != fd){ // ignore events that fired on a different fd
     732             :         log_dbg("Buffered recv socket CM event: inconsistend fd. Ignoring event.");
     733           0 :         return;
     734             :     }
     735         439 :     int ret;
     736         439 :     struct fi_eq_err_entry err_entry;
     737         439 :     uint32_t event = read_cm_event(socket->recv_socket.eq, NULL, &err_entry); // NOTE(review): read_cm_event returns int; the negative case labels below only match through conversion to uint32_t
     738             : 
     739         439 :     switch (event)
     740             :     {
     741         252 :     case FI_CONNECTED:
     742         252 :         socket->recv_socket.cqfd = -1;
     743         252 :         if((ret = fi_control(&socket->recv_socket.cq->fid, FI_GETWAIT, &socket->recv_socket.cqfd))) // fetch the CQ wait object into recv_socket.cqfd
     744             :         {
     745           0 :             FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
     746             :         }
     747             :         // Fill cq_ev_ctx (fd, data, callback); data is the embedded recv_socket, not the buffered wrapper.
     748         252 :         socket->recv_socket.cq_ev_ctx.fd = socket->recv_socket.cqfd;
     749         252 :         socket->recv_socket.cq_ev_ctx.data = &socket->recv_socket;
     750         252 :         socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_cq_event;
     751             : 
     752         252 :         log_dbg("Adding BUFFERED RECV CQ polled fid %d %p", socket->recv_socket.cqfd, &socket->recv_socket.cq->fid);
     753         252 :         add_open_fd(&socket->recv_socket.ctx->evloop.openfds, socket->recv_socket.cqfd, NETIO_CQ, BRECV, &socket->recv_socket);
     754         252 :         add_polled_fid(&socket->recv_socket.ctx->evloop.pfids,
     755         252 :                     socket->recv_socket.lsocket->fabric,
     756         252 :                     &socket->recv_socket.cq->fid,
     757             :                     socket->recv_socket.cqfd,
     758             :                     &socket->recv_socket,
     759             :                     on_recv_socket_cq_event);
     760             : 
     761         252 :         netio_register_read_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
     762         252 :         log_dbg("buffered_recv_socket: registering CQ fd %d", socket->recv_socket.cqfd);
     763             :         // Notify the user through the buffered listen socket's callback, if one is set; it receives the buffered socket.
     764         252 :         if(socket->lsocket->cb_connection_established) {
     765         252 :             socket->lsocket->cb_connection_established(socket);
     766             :         }
     767             : 
     768         439 :         break;
     769             : 
     770         187 :     case FI_SHUTDOWN:
     771         187 :         log_dbg("recv socket shutdown");
     772         187 :         if (socket->recv_socket.eqfd < 0 ){ // NOTE(review): looks unreachable for a non-negative fd, since the eqfd != fd guard above has already returned
     773           0 :             log_dbg("Ignoring FI_SHUTDOWN on buffered recv_socket, invalid eqfd (socket already closed)");
     774           0 :             return;
     775             :         }
     776         187 :         handle_recv_socket_shutdown(&socket->recv_socket);
     777       49739 :         for(unsigned int i=0; i<socket->num_pages; i++) { // free each page's data, then the pages array itself
     778       49552 :             free(socket->pages[i].data);
     779             :         }
     780         187 :         free(socket->pages);
     781         187 :         if(socket->lsocket->cb_connection_closed) { // NOTE(review): the callback runs after socket->pages has been freed
     782         187 :             socket->lsocket->cb_connection_closed(socket);
     783             :         }
     784             :         // The return value of remove_socket is not checked here.
     785         187 :         remove_socket(&socket->lsocket->listen_socket.recv_sockets, socket);
     786         187 :         break;
     787             : 
     788             :     case FI_MR_COMPLETE:
     789             :     case FI_AV_COMPLETE:
     790             :     case FI_JOIN_COMPLETE:
     791             :         // Not implemented
     792             :         break;
     793             :     // -FI_EAGAIN: fi_trywait() is called on the EQ fid; its return value is not checked.
     794           0 :     case -FI_EAGAIN:
     795           0 :         struct fid* fp = &socket->recv_socket.eq->fid;
     796           0 :         fi_trywait(socket->lsocket->listen_socket.fabric, &fp, 1);
     797           0 :         break;
     798             : 
     799           0 :     case -FI_EAVAIL:
     800             :         // error was found
     801           0 :         log_error("Unhandled error in buffered recv socket EQ code: %d, provider specific code: %d",
     802             :             err_entry.err, err_entry.prov_errno);
     803           0 :         break;
     804             :     // Any other event value is logged and ends the process; the break after exit(2) is never reached.
     805           0 :     default:
     806           0 :         log_error("Unexpected event %d in buffered recv socket Event Queue", event);
     807           0 :         exit(2);
     808         439 :         break;
     809             :     }
     810             : }
     811             : 
     812             : 
     813             : // CALLBACKS FOR GARBAGE COLLECTION  //////////////////////////////////////////////////////////
     814             : void
     815           0 : close_send_socket(void* ptr)
     816             : { // Stub: ptr is not used; the only effect is the log message below. The commented-out lines appear to be the former implementation.
     817           0 :     log_info("Close_send_socket. Not supported anymore");
     818             :     // struct signal_data* sd = (struct signal_data*)ptr;
     819             :     // struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
     820             :     // handle_send_socket_shutdown(socket);
     821             :     // //delete the used signal
     822             :     // netio_signal_close(sd->evloop, sd->signal);
     823             :     // free(sd->signal);
     824             :     // free(sd);
     825           0 : }
     826             : // Stub: logs ptr and that this close path is not supported anymore; nothing is closed or freed. The commented-out lines appear to be the former implementation.
     827             : void
     828           0 : close_buffered_send_socket(void *ptr)
     829             : {
     830           0 :     log_info("Closing buffered_send_socket %p. Not supported anymore", ptr);
     831             : //     struct signal_data* sd = (struct signal_data*)ptr;
     832             : //     struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)sd->ptr;
     833             : //     struct netio_send_socket* ssocket = &socket->send_socket;
     834             : //     netio_timer_close(&(ssocket->ctx->evloop), &socket->flush_timer);
     835             : //     netio_signal_close(&(ssocket->ctx->evloop), &socket->signal_buffer_available);
     836             : //     handle_send_socket_shutdown(ssocket);
     837             : //     for(size_t i=0; i < socket->buffers.num_buffers; ++i ){
     838             : //         free(socket->buffers.buffers[i]);
     839             : //     }
     840             : //     //delete the used signal
     841             : //     netio_signal_close(sd->evloop, sd->signal);
     842             : //     free(sd->signal);
     843             : //     free(sd);
     844           0 : }
     845             : // Stub: logs ptr and that this close path is not supported anymore; nothing is closed or freed. The commented-out lines appear to be the former implementation.
     846             : void
     847           0 : close_recv_socket(void* ptr)
     848             : {
     849           0 :     log_info("Closing recv_socket %p. Not supported anymore", ptr);
     850             : //     struct signal_data* sd = (struct signal_data*)ptr;
     851             : //     struct netio_recv_socket* socket = (struct netio_recv_socket*)sd->ptr;
     852             : //     handle_recv_socket_shutdown(socket);
     853             : //     //clean up signal
     854             : //     netio_signal_close(sd->evloop, sd->signal);
     855             : //     free(sd->signal);
     856             : //     free(sd);
     857           0 : }
     858             : // Stub: logs ptr and that this close path is not supported anymore; nothing is closed or freed. The commented-out lines appear to be the former implementation.
     859             : void
     860           0 : close_buffered_recv_socket(void* ptr)
     861             : {
     862           0 :     log_info("Closing buffered_recv_socket %p. Not supported anymore", ptr);
     863             :     // struct signal_data* sd = (struct signal_data*)ptr;
     864             :     // struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)sd->ptr;
     865             :     // handle_recv_socket_shutdown(&socket->recv_socket);
     866             :     // for(unsigned int i=0; i<socket->num_pages; i++) {
     867             :     //     free(socket->pages[i].data);
     868             :     // }
     869             :     // free(socket->pages);
     870             :     // //clean up signal
     871             :     // netio_signal_close(sd->evloop, sd->signal);
     872             :     // free(sd->signal);
     873             :     // free(sd);
     874           0 : }
     875             : // Close callback for an unbuffered listen socket. ptr is cast to struct signal_data*, whose ptr field is the struct netio_listen_socket*.
     876             : // If a listed receive socket still has its EQ fd among the open fds, the signal is fired again and nothing is closed; otherwise the socket is shut down and sd and sd->signal are freed.
     877             : void
     878          82 : close_listen_socket(void* ptr)
     879             : {
     880          82 :     log_dbg("close_listen_socket");
     881          82 :     struct signal_data* sd = (struct signal_data*)ptr;
     882          82 :     struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
     883          82 :     if(socket->recv_sockets != NULL){ // the recv_sockets list is not empty
     884             :         struct netio_socket_list* entry = socket->recv_sockets;
     885             :         int still_open = 0; // sum of the check_open_fd_exists() results over the list
     886           0 :         while(entry != NULL){
     887           0 :             struct netio_recv_socket* recv_socket = entry->socket;
     888           0 :             still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->eqfd);
     889           0 :             entry = entry->next;
     890             :         }
     891           0 :         if(still_open){ // fire the signal again and return; presumably this re-runs the callback later - confirm against the signal setup
     892           0 :             netio_signal_fire(sd->signal);
     893           0 :             return;
     894             :         }
     895           0 :         free_socket_list(&socket->recv_sockets);
     896             :     }
     897          82 :     handle_listen_socket_shutdown(socket);      
     898             :     //clean up signal
     899          82 :     netio_signal_close(sd->evloop, sd->signal);
     900          82 :     free(sd->signal);
     901          82 :     free(sd);
     902             : }
     903             : // Close callback for a buffered listen socket. ptr is cast to struct signal_data*; list entries are treated as struct netio_buffered_recv_socket*.
     904             : // If a listed receive socket still has its EQ fd among the open fds, the signal is fired again and nothing is closed; otherwise the socket is shut down and sd and sd->signal are freed.
     905             : void
     906         136 : close_buffered_listen_socket(void* ptr)
     907             : {
     908         136 :     log_dbg("close_buffered_listen_socket");
     909         136 :     struct signal_data* sd = (struct signal_data*)ptr;
     910         136 :     struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr; // NOTE(review): cast to the plain listen-socket type; presumably sd->ptr is, or begins with, the embedded listen_socket - confirm against the caller
     911         136 :     if(socket->recv_sockets != NULL){ // the recv_sockets list is not empty
     912             :         struct netio_socket_list* entry = socket->recv_sockets;
     913             :         int still_open = 0; // sum of the check_open_fd_exists() results over the list
     914           0 :         while(entry != NULL){
     915           0 :             struct netio_buffered_recv_socket* recv_socket = entry->socket;
     916           0 :             still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->recv_socket.eqfd);
     917           0 :             entry = entry->next;
     918             :         }
     919           0 :         if(still_open){ // fire the signal again and return; presumably this re-runs the callback later - confirm against the signal setup
     920           0 :             netio_signal_fire(sd->signal);
     921           0 :             return;
     922             :         }
     923           0 :         free_socket_list(&socket->recv_sockets);
     924             :     }      
     925         136 :     handle_listen_socket_shutdown(socket);
     926             :     //clean up signal
     927         136 :     netio_signal_close(sd->evloop, sd->signal);
     928         136 :     free(sd->signal);
     929         136 :     free(sd);
     930             : }
     931             : // Tears down a listen socket: removes its EQ fd from the epoll set, closes the passive endpoint and the event queue,
     932             : // drops the fd from the event loop's open-fd bookkeeping, sets eqfd to -1 and frees socket->fi. Returns at once when eqfd is already negative.
     933             : void
     934         327 : handle_listen_socket_shutdown(struct netio_listen_socket* socket)
     935             : {
     936         327 :     if(socket->eqfd < 0){return;}//nothing to do
     937         327 :     struct epoll_event ep_event; /* needed only for kernel <2.6.9  */
     938         327 :     log_dbg("Handle_listen_socket_shutdown. Lsocket EQ: %d with evloop %d socket %p", socket->eqfd, socket->ctx->evloop.epollfd, socket);
     939         327 :     int ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
     940         327 :     if(ret){ log_warn("Cannot deregister listen socket EQ %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd); }
     941             :     // Close the passive endpoint, if there is one, and clear the pointer. The fi_close() result is not checked.
     942         327 :     if(socket->pep != NULL){
     943         327 :        fi_close(&socket->pep->fid);
     944         327 :        socket->pep = NULL;
     945             :     }
     946             : 
     947         327 :     log_dbg("netio_listen_socket: removing EQ fd %d from evloop %d, ret %d", socket->eqfd, socket->ctx->evloop.epollfd,  ret);
     948         327 :     if(socket->eq && (ret = fi_close(&socket->eq->fid))) // the body runs only when fi_close() on the EQ returns non-zero
     949             :     {
     950           0 :         log_error("Failed to close listen socket %d: %s", ret, fi_strerror(-ret));
     951           0 :         ret = close(socket->eq_ev_ctx.fd); // fall back to closing the fd stored in eq_ev_ctx directly
     952           0 :         if ( ret ) {log_warn("Cannot close listen socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
     953             :     }
     954         327 :     remove_open_fd(&socket->ctx->evloop, socket->eqfd);
     955         327 :     socket->eqfd = -1; // later calls return at the eqfd < 0 guard at the top
     956             :     // NOTE(review): socket->eq is not reset after fi_close(); only eqfd marks the socket as shut down.
     957         327 :     if(socket->fi != NULL){
     958         327 :         fi_freeinfo(socket->fi);
     959         327 :         socket->fi = NULL;
     960             :     }
     961             :     //fi_close(&socket->fabric->fid);
     962             : }

Generated by: LCOV version 1.0