Line data Source code
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include "log.h"
#include "netio/netio.h"
#include "completion_event.h"
5 :
/*
 * Debug/trace logging for this file.
 *
 * Compiled in only when DEBUG or DEBUG_CQ is defined; the calls then forward
 * to log_log() with the base file name (directory part of __FILE__ stripped
 * via strrchr) and the line number. Otherwise both macros expand to a no-op
 * expression and their arguments are not evaluated.
 */
#if defined DEBUG || defined DEBUG_CQ
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
/* Expand to a real (void) expression instead of nothing, so a call is still a
 * well-formed statement/expression and does not produce an empty-body warning. */
#define log_dbg(...) ((void)0)
#define log_trc(...) ((void)0)
#endif
14 :
15 : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
16 :
17 : void
18 41188814 : on_recv_socket_cq_event(int fd, void* ptr)
19 : {
20 41188814 : struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
21 :
22 41188814 : if(socket->cqfd < 0) {
23 0 : log_info("on_recv_socket_cq_event called for unconnected socket.");
24 13615397 : return;
25 : }
26 :
27 41188814 : log_trc("recv socket CQ max %lu", socket->cq_size);
28 41188814 : struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
29 41188814 : int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
30 41188799 : log_trc("recv socket fd %d: %d completion events", fd, ret);
31 :
32 41188799 : if(ret < 0)
33 : {
34 13615397 : if(ret == -FI_EAGAIN){
35 13615397 : struct fid* fp = &socket->cq->fid;
36 13615397 : fi_trywait(socket->lsocket->fabric, &fp, 1);
37 13615397 : return;
38 : }
39 0 : else if(ret == -FI_EAVAIL){
40 0 : int r;
41 0 : struct fi_cq_err_entry err_entry;
42 0 : if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
43 : {
44 0 : log_error("Failed to retrieve details on Completion Queue error of recv socket, error %d: %s", r, fi_strerror(-r));
45 : }
46 0 : log_error("code %d reading Completion Queue of recv socket: %s", err_entry.err, fi_strerror(err_entry.err));
47 0 : log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
48 0 : if(err_entry.err == FI_EIO) {
49 : // I/O error, the CM event code can handle this
50 : log_dbg("Send socket CQ I/O error, connection possibly closed: ignored");
51 0 : return;
52 : }
53 0 : if(err_entry.err == FI_ECANCELED) {
54 : // Operation Cancelled
55 : log_dbg("Send socket CQ operation cancelled.");
56 : return;
57 : }
58 : }
59 : else{
60 0 : log_error("Recv socket unhandled Completion Queue error %d: %s", ret, fi_strerror(-ret));
61 : }
62 : }
63 : else{
64 93525495 : for(unsigned int i = 0; i < ret; ++i){
65 65952151 : struct netio_buffer* buffer = (struct netio_buffer*)completion_entry[i].op_context;
66 65952151 : void* data = buffer->data; //completion_entry.buf;
67 65952151 : size_t size = completion_entry[i].len;
68 65952151 : if(completion_entry[i].flags & FI_REMOTE_CQ_DATA) {
69 48701085 : log_trc("Completion has remote CQ data");
70 48701085 : uint64_t imm = completion_entry[i].data;
71 48701085 : log_trc("recv completion immediate data: 0x%lx", imm);
72 48701085 : if(socket->lsocket->cb_msg_imm_received) {
73 20000 : socket->lsocket->cb_msg_imm_received(socket, buffer, data, size, imm);
74 48681085 : } else if(socket->lsocket->cb_msg_received) {
75 48681085 : socket->lsocket->cb_msg_received(socket, buffer, data, size);
76 : }
77 : } else {
78 17251066 : log_trc("Completion has NO remote CQ data");
79 17251066 : if(socket->lsocket->cb_msg_received) {
80 17251066 : socket->lsocket->cb_msg_received(socket, buffer, data, size);
81 : }
82 : }
83 : /*if(completion_entry.flags & FI_MULTI_RECV) {
84 : DEBUG_LOG("FI_MULTI_RECV was set, buffer has to be reposted");
85 : }*/
86 : // FLX-1194, posting buffers to be done by USER !!
87 : // netio_post_recv(socket, buffer);
88 : }
89 : }
90 : }
91 :
92 : void
93 5703311 : on_send_socket_cq_event(int fd, void* ptr)
94 : {
95 5703311 : struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
96 5703311 : if(socket->state != CONNECTED || socket->cqfd < 0) {
97 : log_dbg("on_send_socket_cq_event called for unconnected socket.");
98 799696 : return;
99 : }
100 5703311 : log_trc("send socket CQ max %lu", socket->cq_size);
101 5703311 : struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
102 5703311 : int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
103 5703311 : log_trc("send socket fd %d: %d completion events", fd, ret);
104 :
105 5703311 : if(ret < 0)
106 : {
107 799696 : if(ret == -FI_EAGAIN){ //If no completions are available to return from the CQ, -FI_EAGAIN will be returned.
108 799521 : struct fid* fp = &socket->cq->fid;
109 799521 : fi_trywait(socket->domain->fabric, &fp, 1);
110 799521 : return;
111 : }
112 175 : else if(ret == -FI_EAVAIL)
113 : {
114 175 : int r;
115 175 : struct fi_cq_err_entry err_entry;
116 175 : if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
117 : {
118 0 : log_error("Failed to retrieve details on Completion Queue error of send socket, error %d: %s", r, fi_strerror(-r));
119 : }
120 175 : log_error("Completion Queue read error %d of send socket: %s", err_entry.err, fi_strerror(err_entry.err));
121 175 : log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
122 175 : if(err_entry.err == FI_EIO) {
123 : // I/O error, the CM event code can handle this
124 0 : log_error("Send socket CQ I/O error, connection possibly closed: ignored");
125 175 : return;
126 : }
127 175 : if(err_entry.err == FI_ECANCELED) {
128 : // Operation Cancelled
129 175 : log_error("Send socket CQ operation cancelled.");
130 175 : return;
131 : }
132 0 : log_error("Send socket Completion Queue unhandled specific read error %d: %s", err_entry.err, fi_strerror(err_entry.err));
133 : }
134 : else{
135 0 : log_error("Send socket Completion Queue unhandled read error %d: %s", ret, fi_strerror(-ret));
136 : }
137 : }
138 : else{
139 42818124 : for(unsigned int i=0; i < ret; ++i){
140 37914509 : uint64_t key = (uint64_t)completion_entry[i].op_context;
141 37914509 : log_trc("Send completed. Immediate data 0x%lx key 0x%lx", (uint64_t)completion_entry[i].data, key);
142 37914509 : if(socket->cb_send_completed) {
143 37894507 : socket->cb_send_completed(socket, key);
144 : }
145 : }
146 : }
147 : }
|