Line data Source code
1 : #include <stdio.h>
2 : #include "log.h"
3 : #include "netio/netio.h"
4 : #include "netio/netio_tcp.h"
5 :
6 : #include "completion_event.h"
7 : #include "log.h"
8 :
// Debug and trace logging for this file is compiled in only when DEBUG or
// DEBUG_CQ is defined. Otherwise both macros expand to nothing, so their
// arguments are never evaluated.
#if !defined(DEBUG) && !defined(DEBUG_CQ)
#define log_dbg(...)
#define log_trc(...)
#else
// Part of __FILE__ that follows the last '/', or all of __FILE__ if there is none.
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#endif
17 :
18 : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
19 :
20 : void
21 9675206 : on_recv_socket_cq_event(int fd, void* ptr)
22 : {
23 : struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
24 :
25 9675206 : if(socket->cqfd < 0) {
26 0 : log_info("on_recv_socket_cq_event called for unconnected socket.");
27 3687355 : return;
28 : }
29 :
30 : log_trc("recv socket CQ max %lu", socket->cq_size);
31 : struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
32 9675206 : int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
33 : log_trc("recv socket fd %d: %d completion events", fd, ret);
34 :
35 9675213 : if(ret < 0)
36 : {
37 3687357 : if(ret == -FI_EAGAIN){
38 3687357 : struct fid* fp = &socket->cq->fid;
39 3687357 : fi_trywait(socket->lsocket->fabric, &fp, 1);
40 : return;
41 : }
42 0 : else if(ret == -FI_EAVAIL){
43 : int r;
44 : struct fi_cq_err_entry err_entry;
45 0 : if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
46 : {
47 0 : log_error("Failed to retrieve details on Completion Queue error of recv socket, error %d: %s", r, fi_strerror(-r));
48 : }
49 0 : log_error("code %d reading Completion Queue of recv socket: %s", err_entry.err, fi_strerror(err_entry.err));
50 0 : log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
51 0 : if(err_entry.err == FI_EIO) {
52 : // I/O error, the CM event code can handle this
53 0 : log_error("Send socket CQ I/O error, connection possibly closed: ignored");
54 0 : return;
55 : }
56 0 : if(err_entry.err == FI_ECANCELED) {
57 : // Operation Cancelled
58 0 : log_error("Send socket CQ operation cancelled.");
59 0 : return;
60 : }
61 : }
62 : else{
63 0 : log_error("Recv socket unhandled Completion Queue error %d: %s", ret, fi_strerror(-ret));
64 : }
65 : }
66 : else{
67 19516096 : for(unsigned int i = 0; i < ret; ++i){
68 13528233 : struct netio_buffer* buffer = (struct netio_buffer*)completion_entry[i].op_context;
69 13528233 : void* data = buffer->data; //completion_entry.buf;
70 13528233 : size_t size = completion_entry[i].len;
71 13528233 : if(completion_entry[i].flags & FI_REMOTE_CQ_DATA) {
72 : log_trc("Completion has remote CQ data");
73 9311664 : uint64_t imm = completion_entry[i].data;
74 : log_trc("recv completion immediate data: 0x%lx", imm);
75 9311664 : if(socket->lsocket->cb_msg_imm_received) {
76 0 : socket->lsocket->cb_msg_imm_received(socket, buffer, data, size, imm);
77 9311664 : } else if(socket->lsocket->cb_msg_received) {
78 9311664 : socket->lsocket->cb_msg_received(socket, buffer, data, size);
79 : }
80 : } else {
81 : log_trc("Completion has NO remote CQ data");
82 4216569 : if(socket->lsocket->cb_msg_received) {
83 4216569 : socket->lsocket->cb_msg_received(socket, buffer, data, size);
84 : }
85 : }
86 : /*if(completion_entry.flags & FI_MULTI_RECV) {
87 : DEBUG_LOG("FI_MULTI_RECV was set, buffer has to be reposted");
88 : }*/
89 : // FLX-1194, posting buffers to be done by USER !!
90 : // netio_post_recv(socket, buffer);
91 : }
92 : }
93 : }
94 :
95 : void
96 546773 : on_send_socket_cq_event(int fd, void* ptr)
97 : {
98 : struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
99 546773 : if(socket->state != CONNECTED || socket->cqfd < 0) {
100 : log_dbg("on_send_socket_cq_event called for unconnected socket.");
101 265431 : return;
102 : }
103 : log_trc("send socket CQ max %lu", socket->cq_size);
104 : struct fi_cq_data_entry completion_entry[NETIO_MAX_CQ_EVENTS];
105 546773 : int ret = fi_cq_read(socket->cq, &completion_entry, socket->cq_size);
106 : log_trc("send socket fd %d: %d completion events", fd, ret);
107 :
108 546773 : if(ret < 0)
109 : {
110 265431 : if(ret == -FI_EAGAIN){ //If no completions are available to return from the CQ, -FI_EAGAIN will be returned.
111 265431 : struct fid* fp = &socket->cq->fid;
112 265431 : fi_trywait(socket->domain->fabric, &fp, 1);
113 : return;
114 : }
115 0 : else if(ret == -FI_EAVAIL)
116 : {
117 : int r;
118 : struct fi_cq_err_entry err_entry;
119 0 : if((r = fi_cq_readerr(socket->cq, &err_entry, 0)) < 0)
120 : {
121 0 : log_error("Failed to retrieve details on Completion Queue error of send socket, error %d: %s", r, fi_strerror(-r));
122 : }
123 0 : log_error("Completion Queue read error %d of send socket: %s", err_entry.err, fi_strerror(err_entry.err));
124 0 : log_error("Provider-specific error %d: %s", err_entry.prov_errno, fi_cq_strerror(socket->cq, err_entry.prov_errno, err_entry.err_data, NULL, 0));
125 0 : if(err_entry.err == FI_EIO) {
126 : // I/O error, the CM event code can handle this
127 0 : log_error("Send socket CQ I/O error, connection possibly closed: ignored");
128 0 : return;
129 : }
130 0 : if(err_entry.err == FI_ECANCELED) {
131 : // Operation Cancelled
132 0 : log_error("Send socket CQ operation cancelled.");
133 0 : return;
134 : }
135 0 : log_error("Send socket Completion Queue unhandled specific read error %d: %s", err_entry.err, fi_strerror(err_entry.err));
136 : }
137 : else{
138 0 : log_error("Send socket Completion Queue unhandled read error %d: %s", ret, fi_strerror(-ret));
139 : }
140 : }
141 : else{
142 783913 : for(unsigned int i=0; i < ret; ++i){
143 502571 : uint64_t key = (uint64_t)completion_entry[i].op_context;
144 : log_trc("Send completed. Immediate data 0x%lx key 0x%lx", (uint64_t)completion_entry[i].data, key);
145 502571 : if(socket->cb_send_completed) {
146 502570 : socket->cb_send_completed(socket, key);
147 : }
148 : }
149 : }
150 : }
151 :
152 :
/**
 * TCP equivalent of on_send_socket_cq_event().
 * Called when epoll_wait is unblocked because the TCP socket has something to say.
 *
 * Passes ptr unchanged to netio_tcp_send_on_signal().
 *
 * @param fd   POSIX file descriptor that signalled (used for debug logging only).
 * @param ptr  Opaque context handed to netio_tcp_send_on_signal().
 */
void
on_send_socket_tcp_cq_event(int fd, void *ptr)
{
    (void)fd; // only referenced by log_dbg, which may compile to nothing

    // %p is the only valid conversion for a void*; the previous 0x%08x was a
    // format/argument mismatch.
    log_dbg("signal from the POSIX FD %d received, ptr %p. Calling netio_tcp_send_on_signal", fd, ptr);
    netio_tcp_send_on_signal(ptr);

    log_trc("done");
}
163 :
164 :
/**
 * TCP equivalent of on_recv_socket_cq_event().
 *
 * Passes ptr unchanged to netio_tcp_recv_on_signal().
 *
 * @param fd   POSIX file descriptor that signalled (used for debug logging only).
 * @param ptr  Opaque context handed to netio_tcp_recv_on_signal().
 */
void on_recv_socket_tcp_cq_event(int fd, void* ptr)
{
    log_dbg("signal from the POSIX FD %d received. Calling netio_tcp_recv_on_signal", fd);

    netio_tcp_recv_on_signal(ptr);

    log_trc("done");
}
|