LCOV - code coverage report
Current view: top level - netio-next/src - completion_event.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 66 86 76.7 %
Date: 2025-08-12 04:15:35 Functions: 4 4 100.0 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include "log.h"
       3             : #include "netio/netio.h"
       4             : #include "netio/netio_tcp.h"
       5             : 
       6             : #include "completion_event.h"
       7             : #include "log.h"
       8             : 
       9             : #if defined DEBUG || defined DEBUG_CQ
      10             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      11             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      12             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      13             : #else
      14             : #define log_dbg(...)
      15             : #define log_trc(...)
      16             : #endif
      17             : 
      18             : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
      19             : 
      20             : void
      21    11916406 : on_recv_socket_cq_event(int fd, void* ptr)
      22             : {
      23    11916406 :   struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
      24             : 
      25    11916406 :   if(socket->cqfd < 0) {
      26           0 :     log_info("on_recv_socket_cq_event called for unconnected socket.");
      27     4231967 :     return;
      28             :   }
      29             : 
      30    11916406 :   log_trc("recv socket CQ max %lu", socket->cq_size);
      31    11916406 :   struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
      32    11916406 :   int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
      33    11916378 :   log_trc("recv socket fd %d: %d completion events", fd, ret);
      34             : 
      35    11916378 :   if(ret < 0)
      36             :   {
      37     4232008 :     if(ret == -FI_EAGAIN){
      38     4232008 :       struct fid* fp = &socket->cq->fid;
      39     4232008 :       fi_trywait(socket->lsocket->fabric, &fp, 1);
      40     4231967 :       return;
      41             :     }
      42           0 :     else if(ret == -FI_EAVAIL){
      43           0 :       int r;
      44           0 :       struct fi_cq_err_entry err_entry;
      45           0 :       if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
      46             :       {
      47           0 :         log_error("Failed to retrieve details on Completion Queue error of recv socket, error %d: %s", r, fi_strerror(-r));
      48             :       }
      49           0 :       log_error("code %d reading Completion Queue of recv socket: %s", err_entry.err, fi_strerror(err_entry.err));
      50           0 :       log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
      51           0 :       if(err_entry.err == FI_EIO) {
      52             :         // I/O error, the CM event code can handle this
      53           0 :         log_error("Send socket CQ I/O error, connection possibly closed: ignored");
      54           0 :         return;
      55             :       }
      56           0 :       if(err_entry.err == FI_ECANCELED) {
      57             :         // Operation Cancelled
      58           0 :         log_error("Send socket CQ operation cancelled.");
      59           0 :         return;
      60             :       }
      61             :     }
      62             :     else{
      63           0 :       log_error("Recv socket unhandled Completion Queue error %d: %s", ret, fi_strerror(-ret));
      64             :     }
      65             :   }
      66             :   else{
      67    29286869 :     for(unsigned int i = 0; i < ret; ++i){
      68    21602511 :       struct netio_buffer* buffer = (struct netio_buffer*)completion_entry[i].op_context;
      69    21602511 :       void* data = buffer->data; //completion_entry.buf;
      70    21602511 :       size_t size = completion_entry[i].len;
      71    21602511 :       if(completion_entry[i].flags & FI_REMOTE_CQ_DATA) {
      72    16576518 :         log_trc("Completion has remote CQ data");
      73    16576518 :         uint64_t imm = completion_entry[i].data;
      74    16576518 :         log_trc("recv completion immediate data: 0x%lx", imm);
      75    16576518 :         if(socket->lsocket->cb_msg_imm_received) {
      76           0 :           socket->lsocket->cb_msg_imm_received(socket, buffer, data, size, imm);
      77    16576518 :         } else if(socket->lsocket->cb_msg_received) {
      78    16576518 :           socket->lsocket->cb_msg_received(socket, buffer, data, size);
      79             :         }
      80             :       } else {
      81     5025993 :         log_trc("Completion has NO remote CQ data");
      82     5025993 :         if(socket->lsocket->cb_msg_received) {
      83     5025993 :           socket->lsocket->cb_msg_received(socket, buffer, data, size);
      84             :         }
      85             :       }
      86             :       /*if(completion_entry.flags & FI_MULTI_RECV) {
      87             :         DEBUG_LOG("FI_MULTI_RECV was set, buffer has to be reposted");
      88             :       }*/
      89             :       // FLX-1194, posting buffers to be done by USER !!
      90             :       // netio_post_recv(socket, buffer);
      91             :     }
      92             :   }
      93             : }
      94             : 
      95             : void
      96     1969749 : on_send_socket_cq_event(int fd, void* ptr)
      97             : {
      98     1969749 :     struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
      99     1969749 :     if(socket->state != CONNECTED || socket->cqfd < 0) {
     100             :       log_dbg("on_send_socket_cq_event called for unconnected socket.");
     101      322100 :       return;
     102             :     }
     103     1969749 :     log_trc("send socket CQ max %lu", socket->cq_size);
     104     1969749 :     struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
     105     1969749 :     int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
     106     1969784 :     log_trc("send socket fd %d: %d completion events", fd, ret);
     107             : 
     108     1969784 :     if(ret < 0)
     109             :     {
     110      322100 :       if(ret == -FI_EAGAIN){ //If no completions are available to return from the CQ, -FI_EAGAIN will be returned.
     111      322087 :         struct fid* fp = &socket->cq->fid;
     112      322087 :         fi_trywait(socket->domain->fabric, &fp, 1);
     113      322087 :         return;
     114             :       }
     115          13 :       else if(ret == -FI_EAVAIL)
     116             :       {
     117          13 :         int r;
     118          13 :         struct fi_cq_err_entry err_entry;
     119          13 :         if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
     120             :         {
     121           0 :             log_error("Failed to retrieve details on Completion Queue error of send socket, error %d: %s", r, fi_strerror(-r));
     122             :         }
     123          13 :         log_error("Completion Queue read error %d of send socket: %s", err_entry.err, fi_strerror(err_entry.err));
     124          13 :         log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
     125          13 :         if(err_entry.err == FI_EIO) {
     126             :           // I/O error, the CM event code can handle this
     127           0 :           log_error("Send socket CQ I/O error, connection possibly closed: ignored");
     128          13 :           return;
     129             :         }
     130          13 :         if(err_entry.err == FI_ECANCELED) {
     131             :           // Operation Cancelled
     132          13 :           log_error("Send socket CQ operation cancelled.");
     133          13 :           return;
     134             :         }
     135           0 :         log_error("Send socket Completion Queue unhandled specific read error %d: %s", err_entry.err, fi_strerror(err_entry.err));
     136             :       }
     137             :       else{
     138           0 :         log_error("Send socket Completion Queue unhandled read error %d: %s", ret, fi_strerror(-ret));
     139             :       }
     140             :     }
     141             :     else{
     142    14068924 :       for(unsigned int i=0; i < ret; ++i){
     143    12421295 :         uint64_t key = (uint64_t)completion_entry[i].op_context;
     144    12421295 :         log_trc("Send completed. Immediate data 0x%lx key 0x%lx", (uint64_t)completion_entry[i].data, key);
     145    12421295 :         if(socket->cb_send_completed) {
     146    12421294 :           socket->cb_send_completed(socket, key);
     147             :         }
     148             :       }
     149             :     }
     150             : }
     151             : 
     152             : 
     153             : // TCP equivalent of on_send_socket_cq_event()
     154             : // Called when epoll_wait is unblocked because the TCP socket has something to say
     155             : void
     156         136 : on_send_socket_tcp_cq_event(int fd, void *ptr)
     157             : {  
     158         136 :   log_dbg("signal from the POSIX FD %d received, ptr 0x%08x. Calling netio_tcp_send_on_signal", fd, ptr);
     159         136 :   netio_tcp_send_on_signal(ptr);
     160             : 
     161         136 :   log_trc("done");
     162         136 : }
     163             : 
     164             : 
     165             : // TCP equivalent of on_recv_socket_cq_event
     166             : void
     167    15760583 : on_recv_socket_tcp_cq_event(int fd, void* ptr)
     168             : {
     169    15760583 :   log_dbg("signal from the POSIX FD %d received. Calling netio_tcp_recv_on_signal", fd);
     170    15760583 :   netio_tcp_recv_on_signal(ptr);
     171    15760585 :   log_trc("done");
     172    15760585 : }

Generated by: LCOV version 1.0