Line data Source code
1 : #include <stdio.h>
2 : #include "log.h"
3 : #include "netio/netio.h"
4 : #include "netio/netio_tcp.h"
5 :
6 : #include "completion_event.h"
7 : #include "log.h"
8 :
9 : #if defined DEBUG || defined DEBUG_CQ
10 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
11 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
12 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
13 : #else
14 : #define log_dbg(...)
15 : #define log_trc(...)
16 : #endif
17 :
18 : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
19 :
20 : void
21 11916406 : on_recv_socket_cq_event(int fd, void* ptr)
22 : {
23 11916406 : struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
24 :
25 11916406 : if(socket->cqfd < 0) {
26 0 : log_info("on_recv_socket_cq_event called for unconnected socket.");
27 4231967 : return;
28 : }
29 :
30 11916406 : log_trc("recv socket CQ max %lu", socket->cq_size);
31 11916406 : struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
32 11916406 : int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
33 11916378 : log_trc("recv socket fd %d: %d completion events", fd, ret);
34 :
35 11916378 : if(ret < 0)
36 : {
37 4232008 : if(ret == -FI_EAGAIN){
38 4232008 : struct fid* fp = &socket->cq->fid;
39 4232008 : fi_trywait(socket->lsocket->fabric, &fp, 1);
40 4231967 : return;
41 : }
42 0 : else if(ret == -FI_EAVAIL){
43 0 : int r;
44 0 : struct fi_cq_err_entry err_entry;
45 0 : if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
46 : {
47 0 : log_error("Failed to retrieve details on Completion Queue error of recv socket, error %d: %s", r, fi_strerror(-r));
48 : }
49 0 : log_error("code %d reading Completion Queue of recv socket: %s", err_entry.err, fi_strerror(err_entry.err));
50 0 : log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
51 0 : if(err_entry.err == FI_EIO) {
52 : // I/O error, the CM event code can handle this
53 0 : log_error("Send socket CQ I/O error, connection possibly closed: ignored");
54 0 : return;
55 : }
56 0 : if(err_entry.err == FI_ECANCELED) {
57 : // Operation Cancelled
58 0 : log_error("Send socket CQ operation cancelled.");
59 0 : return;
60 : }
61 : }
62 : else{
63 0 : log_error("Recv socket unhandled Completion Queue error %d: %s", ret, fi_strerror(-ret));
64 : }
65 : }
66 : else{
67 29286869 : for(unsigned int i = 0; i < ret; ++i){
68 21602511 : struct netio_buffer* buffer = (struct netio_buffer*)completion_entry[i].op_context;
69 21602511 : void* data = buffer->data; //completion_entry.buf;
70 21602511 : size_t size = completion_entry[i].len;
71 21602511 : if(completion_entry[i].flags & FI_REMOTE_CQ_DATA) {
72 16576518 : log_trc("Completion has remote CQ data");
73 16576518 : uint64_t imm = completion_entry[i].data;
74 16576518 : log_trc("recv completion immediate data: 0x%lx", imm);
75 16576518 : if(socket->lsocket->cb_msg_imm_received) {
76 0 : socket->lsocket->cb_msg_imm_received(socket, buffer, data, size, imm);
77 16576518 : } else if(socket->lsocket->cb_msg_received) {
78 16576518 : socket->lsocket->cb_msg_received(socket, buffer, data, size);
79 : }
80 : } else {
81 5025993 : log_trc("Completion has NO remote CQ data");
82 5025993 : if(socket->lsocket->cb_msg_received) {
83 5025993 : socket->lsocket->cb_msg_received(socket, buffer, data, size);
84 : }
85 : }
86 : /*if(completion_entry.flags & FI_MULTI_RECV) {
87 : DEBUG_LOG("FI_MULTI_RECV was set, buffer has to be reposted");
88 : }*/
89 : // FLX-1194, posting buffers to be done by USER !!
90 : // netio_post_recv(socket, buffer);
91 : }
92 : }
93 : }
94 :
95 : void
96 1969749 : on_send_socket_cq_event(int fd, void* ptr)
97 : {
98 1969749 : struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
99 1969749 : if(socket->state != CONNECTED || socket->cqfd < 0) {
100 : log_dbg("on_send_socket_cq_event called for unconnected socket.");
101 322100 : return;
102 : }
103 1969749 : log_trc("send socket CQ max %lu", socket->cq_size);
104 1969749 : struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
105 1969749 : int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
106 1969784 : log_trc("send socket fd %d: %d completion events", fd, ret);
107 :
108 1969784 : if(ret < 0)
109 : {
110 322100 : if(ret == -FI_EAGAIN){ //If no completions are available to return from the CQ, -FI_EAGAIN will be returned.
111 322087 : struct fid* fp = &socket->cq->fid;
112 322087 : fi_trywait(socket->domain->fabric, &fp, 1);
113 322087 : return;
114 : }
115 13 : else if(ret == -FI_EAVAIL)
116 : {
117 13 : int r;
118 13 : struct fi_cq_err_entry err_entry;
119 13 : if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
120 : {
121 0 : log_error("Failed to retrieve details on Completion Queue error of send socket, error %d: %s", r, fi_strerror(-r));
122 : }
123 13 : log_error("Completion Queue read error %d of send socket: %s", err_entry.err, fi_strerror(err_entry.err));
124 13 : log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
125 13 : if(err_entry.err == FI_EIO) {
126 : // I/O error, the CM event code can handle this
127 0 : log_error("Send socket CQ I/O error, connection possibly closed: ignored");
128 13 : return;
129 : }
130 13 : if(err_entry.err == FI_ECANCELED) {
131 : // Operation Cancelled
132 13 : log_error("Send socket CQ operation cancelled.");
133 13 : return;
134 : }
135 0 : log_error("Send socket Completion Queue unhandled specific read error %d: %s", err_entry.err, fi_strerror(err_entry.err));
136 : }
137 : else{
138 0 : log_error("Send socket Completion Queue unhandled read error %d: %s", ret, fi_strerror(-ret));
139 : }
140 : }
141 : else{
142 14068924 : for(unsigned int i=0; i < ret; ++i){
143 12421295 : uint64_t key = (uint64_t)completion_entry[i].op_context;
144 12421295 : log_trc("Send completed. Immediate data 0x%lx key 0x%lx", (uint64_t)completion_entry[i].data, key);
145 12421295 : if(socket->cb_send_completed) {
146 12421294 : socket->cb_send_completed(socket, key);
147 : }
148 : }
149 : }
150 : }
151 :
152 :
153 : // TCP equivalent of on_send_socket_cq_event()
154 : // Called when epoll_wait is unblocked because the TCP socket has something to say
155 : void
156 136 : on_send_socket_tcp_cq_event(int fd, void *ptr)
157 : {
158 136 : log_dbg("signal from the POSIX FD %d received, ptr 0x%08x. Calling netio_tcp_send_on_signal", fd, ptr);
159 136 : netio_tcp_send_on_signal(ptr);
160 :
161 136 : log_trc("done");
162 136 : }
163 :
164 :
165 : // TCP equivalent of on_recv_socket_cq_event
166 : void
167 15760583 : on_recv_socket_tcp_cq_event(int fd, void* ptr)
168 : {
169 15760583 : log_dbg("signal from the POSIX FD %d received. Calling netio_tcp_recv_on_signal", fd);
170 15760583 : netio_tcp_recv_on_signal(ptr);
171 15760585 : log_trc("done");
172 15760585 : }
|