LCOV - code coverage report
Current view: top level - netio-next/src - completion_event.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 57 73 78.1 %
Date: 2025-06-10 03:23:28 Functions: 2 2 100.0 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include "log.h"
       3             : #include "netio/netio.h"
       4             : #include "completion_event.h"
       5             : 
       6             : #if defined DEBUG || defined DEBUG_CQ
       7             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
       8             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
       9             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      10             : #else
      11             : #define log_dbg(...)
      12             : #define log_trc(...)
      13             : #endif
      14             : 
      15             : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
      16             : 
      17             : void
      18    41188814 : on_recv_socket_cq_event(int fd, void* ptr)
      19             : {
      20    41188814 :   struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
      21             : 
      22    41188814 :   if(socket->cqfd < 0) {
      23           0 :     log_info("on_recv_socket_cq_event called for unconnected socket.");
      24    13615397 :     return;
      25             :   }
      26             : 
      27    41188814 :   log_trc("recv socket CQ max %lu", socket->cq_size);
      28    41188814 :   struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
      29    41188814 :   int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
      30    41188799 :   log_trc("recv socket fd %d: %d completion events", fd, ret);
      31             : 
      32    41188799 :   if(ret < 0)
      33             :   {
      34    13615397 :     if(ret == -FI_EAGAIN){
      35    13615397 :       struct fid* fp = &socket->cq->fid;
      36    13615397 :       fi_trywait(socket->lsocket->fabric, &fp, 1);
      37    13615397 :       return;
      38             :     }
      39           0 :     else if(ret == -FI_EAVAIL){
      40           0 :       int r;
      41           0 :       struct fi_cq_err_entry err_entry;
      42           0 :       if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
      43             :       {
      44           0 :         log_error("Failed to retrieve details on Completion Queue error of recv socket, error %d: %s", r, fi_strerror(-r));
      45             :       }
      46           0 :       log_error("code %d reading Completion Queue of recv socket: %s", err_entry.err, fi_strerror(err_entry.err));
      47           0 :       log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
      48           0 :       if(err_entry.err == FI_EIO) {
      49             :         // I/O error, the CM event code can handle this
      50             :         log_dbg("Send socket CQ I/O error, connection possibly closed: ignored");
      51           0 :         return;
      52             :       }
      53           0 :       if(err_entry.err == FI_ECANCELED) {
      54             :         // Operation Cancelled
      55             :         log_dbg("Send socket CQ operation cancelled.");
      56             :         return;
      57             :       }
      58             :     }
      59             :     else{
      60           0 :       log_error("Recv socket unhandled Completion Queue error %d: %s", ret, fi_strerror(-ret));
      61             :     }
      62             :   }
      63             :   else{
      64    93525495 :     for(unsigned int i = 0; i < ret; ++i){
      65    65952151 :       struct netio_buffer* buffer = (struct netio_buffer*)completion_entry[i].op_context;
      66    65952151 :       void* data = buffer->data; //completion_entry.buf;
      67    65952151 :       size_t size = completion_entry[i].len;
      68    65952151 :       if(completion_entry[i].flags & FI_REMOTE_CQ_DATA) {
      69    48701085 :         log_trc("Completion has remote CQ data");
      70    48701085 :         uint64_t imm = completion_entry[i].data;
      71    48701085 :         log_trc("recv completion immediate data: 0x%lx", imm);
      72    48701085 :         if(socket->lsocket->cb_msg_imm_received) {
      73       20000 :           socket->lsocket->cb_msg_imm_received(socket, buffer, data, size, imm);
      74    48681085 :         } else if(socket->lsocket->cb_msg_received) {
      75    48681085 :           socket->lsocket->cb_msg_received(socket, buffer, data, size);
      76             :         }
      77             :       } else {
      78    17251066 :         log_trc("Completion has NO remote CQ data");
      79    17251066 :         if(socket->lsocket->cb_msg_received) {
      80    17251066 :           socket->lsocket->cb_msg_received(socket, buffer, data, size);
      81             :         }
      82             :       }
      83             :       /*if(completion_entry.flags & FI_MULTI_RECV) {
      84             :         DEBUG_LOG("FI_MULTI_RECV was set, buffer has to be reposted");
      85             :       }*/
      86             :       // FLX-1194, posting buffers to be done by USER !!
      87             :       // netio_post_recv(socket, buffer);
      88             :     }
      89             :   }
      90             : }
      91             : 
      92             : void
      93     5703311 : on_send_socket_cq_event(int fd, void* ptr)
      94             : {
      95     5703311 :     struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
      96     5703311 :     if(socket->state != CONNECTED || socket->cqfd < 0) {
      97             :       log_dbg("on_send_socket_cq_event called for unconnected socket.");
      98      799696 :       return;
      99             :     }
     100     5703311 :     log_trc("send socket CQ max %lu", socket->cq_size);
     101     5703311 :     struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
     102     5703311 :     int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
     103     5703311 :     log_trc("send socket fd %d: %d completion events", fd, ret);
     104             : 
     105     5703311 :     if(ret < 0)
     106             :     {
     107      799696 :       if(ret == -FI_EAGAIN){ //If no completions are available to return from the CQ, -FI_EAGAIN will be returned.
     108      799521 :         struct fid* fp = &socket->cq->fid;
     109      799521 :         fi_trywait(socket->domain->fabric, &fp, 1);
     110      799521 :         return;
     111             :       }
     112         175 :       else if(ret == -FI_EAVAIL)
     113             :       {
     114         175 :         int r;
     115         175 :         struct fi_cq_err_entry err_entry;
     116         175 :         if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
     117             :         {
     118           0 :             log_error("Failed to retrieve details on Completion Queue error of send socket, error %d: %s", r, fi_strerror(-r));
     119             :         }
     120         175 :         log_error("Completion Queue read error %d of send socket: %s", err_entry.err, fi_strerror(err_entry.err));
     121         175 :         log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
     122         175 :         if(err_entry.err == FI_EIO) {
     123             :           // I/O error, the CM event code can handle this
     124           0 :           log_error("Send socket CQ I/O error, connection possibly closed: ignored");
     125         175 :           return;
     126             :         }
     127         175 :         if(err_entry.err == FI_ECANCELED) {
     128             :           // Operation Cancelled
     129         175 :           log_error("Send socket CQ operation cancelled.");
     130         175 :           return;
     131             :         }
     132           0 :         log_error("Send socket Completion Queue unhandled specific read error %d: %s", err_entry.err, fi_strerror(err_entry.err));
     133             :       }
     134             :       else{
     135           0 :         log_error("Send socket Completion Queue unhandled read error %d: %s", ret, fi_strerror(-ret));
     136             :       }
     137             :     }
     138             :     else{
     139    42818124 :       for(unsigned int i=0; i < ret; ++i){
     140    37914509 :         uint64_t key = (uint64_t)completion_entry[i].op_context;
     141    37914509 :         log_trc("Send completed. Immediate data 0x%lx key 0x%lx", (uint64_t)completion_entry[i].data, key);
     142    37914509 :         if(socket->cb_send_completed) {
     143    37894507 :           socket->cb_send_completed(socket, key);
     144             :         }
     145             :       }
     146             :     }
     147             : }

Generated by: LCOV version 1.0