Line data Source code
1 : #include <stdio.h>
2 : #include <unistd.h>
3 : #include "log.h"
4 : #include "netio/netio.h"
5 : #include "netio/netio_tcp.h"
6 : #include <sys/types.h>
7 : #include <sys/socket.h>
8 : #include <netdb.h>
9 :
10 : #include "connection_event.h"
11 : #include "completion_event.h"
13 :
14 : #if defined DEBUG || defined DEBUG_CM
15 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
16 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
17 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
18 : #else
19 : #define log_dbg(...)
20 : #define log_trc(...)
21 : #endif
22 :
23 : #define FATAL(msg, c) \
24 : do { \
25 : log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
26 : exit(2); \
27 : } while(0)
28 :
29 :
30 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
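// Read a single CM event from a libfabric event queue. Returns the event code,
// -FI_EAGAIN if no event is pending, or -FI_EAVAIL after logging the error
// reported in *err_entry; any other failure is fatal. If info is non-NULL it
// receives the fi_info attached to the event (e.g. for FI_CONNREQ) and the
// caller is responsible for releasing it with fi_freeinfo().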
31 :
32 : static int
33 2104 : read_cm_event(struct fid_eq* eq, struct fi_info** info, struct fi_eq_err_entry* err_entry)
34 : {
35 2104 : uint32_t event;
36 2104 : struct fi_eq_cm_entry entry;
37 :
38 2104 : ssize_t rd = fi_eq_sread(eq, &event, &entry, sizeof entry, 0, 0);
39 2104 : if(rd < 0)
40 : {
41 633 : if(rd == -FI_EAGAIN)
42 : {
43 538 : return rd;
44 : }
45 95 : if(rd == -FI_EAVAIL)
46 : {
47 95 : int r;
48 95 : if((r = fi_eq_readerr(eq, err_entry, 0)) < 0)
49 : {
50 0 : FATAL("Failed to retrieve details on Event Queue error", r);
51 : }
52 95 : log_error("Event Queue error: %s (code: %d), provider specific: %s (code: %d)",
53 : fi_strerror(err_entry->err), err_entry->err,
54 : fi_eq_strerror(eq, err_entry->prov_errno, err_entry->err_data, NULL, 0),
55 : err_entry->prov_errno);
56 95 : return rd;
57 : }
58 : }
59 1471 : if (rd != sizeof entry)
60 : {
61 0 : FATAL("Failed to read from Event Queue", (int)rd);
62 : }
63 :
64 1471 : if(info != NULL)
65 220 : *info = entry.info;
66 :
67 1471 : return event;
68 : }
69 :
70 :
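// Accept an incoming connection request on a libfabric listen socket: open a
// domain, endpoint, event queue and the FI_RECV/FI_TRANSMIT completion queues
// for the new receive socket, register the EQ wait fd with the event loop and
// call fi_accept(). On any failure the request is rejected and the process exits.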
71 : static void
72 220 : handle_connreq(struct netio_recv_socket* rsocket, struct netio_listen_socket* lsocket, struct fi_info *info, void (*cb)(int,void*), void* cbdata)
73 : {
74 220 : int ret;
75 220 : struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
76 :
77 220 : if((ret = fi_domain(lsocket->fabric, info, &rsocket->domain, NULL)))
78 : {
79 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
80 0 : FATAL("Listen socket, cannot open fabric, error ", ret);
81 : }
82 :
83 220 : if((ret = fi_endpoint(rsocket->domain, info, &rsocket->ep, NULL)))
84 : {
85 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
86 0 : FATAL("Listen socket, cannot open endpoint, error ", ret);
87 : }
88 :
89 : /* Create a new event queue for the new active socket */
90 220 : if((ret = fi_eq_open(lsocket->fabric, &eq_attr, &rsocket->eq, NULL)))
91 : {
92 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
93 0 : FATAL("Listen socket, cannot open Event Queue, error ", ret);
94 : }
95 :
96 220 : if((ret = fi_ep_bind((rsocket->ep), &rsocket->eq->fid, 0)))
97 : {
98 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
99 0 : FATAL("Listen socket, cannot bind Event Queue to endpoint, error ", ret);
100 : }
101 :
102 220 : if((ret = fi_control(&rsocket->eq->fid, FI_GETWAIT, &rsocket->eqfd)))
103 : {
104 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
105 0 : FATAL("Listen socket failed to obtain Event Queue wait object", ret);
106 : }
107 220 : rsocket->eq_ev_ctx.fd = rsocket->eqfd;
108 220 : rsocket->eq_ev_ctx.data = cbdata;
109 220 : rsocket->eq_ev_ctx.cb = cb;
110 :
111 220 : log_dbg("Adding RECV EQ polled fid %d %p", rsocket->eqfd, &rsocket->eq->fid);
112 220 : add_polled_fid(&rsocket->ctx->evloop.pfids, lsocket->fabric, &rsocket->eq->fid, rsocket->eqfd, cbdata, cb);
113 220 : add_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eqfd, NETIO_EQ, URECV, rsocket);
114 220 : netio_register_read_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
115 220 : log_dbg("recv_socket: EQ fd %d waiting for connection", rsocket->eqfd);
116 :
117 220 : struct fi_cq_attr cq_attr;
118 220 : cq_attr.size = NETIO_MAX_CQ_ENTRIES; /* # entries for CQ */
119 220 : cq_attr.flags = 0; /* operation flags */
120 220 : cq_attr.format = FI_CQ_FORMAT_DATA; /* completion format */
121 220 : cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
122 220 : cq_attr.signaling_vector = 0; /* interrupt affinity */
123 220 : cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */
124 220 : cq_attr.wait_set = NULL; /* optional wait set */
125 :
126 : //FI_RECV CQ
127 220 : if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->cq, NULL)))
128 : {
129 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
130 0 : FATAL("Listen socket, cannot open Completion Queue, error ", ret);
131 : }
132 :
133 220 : if((ret = fi_ep_bind((rsocket->ep), &rsocket->cq->fid, FI_RECV)))
134 : {
135 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
136 0 : FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);
137 : }
138 :
139 : //FI_TRANSMIT CQ - also necessary
140 220 : cq_attr.format = FI_CQ_FORMAT_UNSPEC;
141 220 : cq_attr.wait_obj= FI_WAIT_NONE;
142 220 : if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->tcq, NULL)))
143 : {
144 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
145 0 : FATAL("Listen socket, cannot open Completion Queue, error ", ret);
146 : }
147 :
148 220 : if((ret = fi_ep_bind((rsocket->ep), &rsocket->tcq->fid, FI_TRANSMIT)))
149 : {
150 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
151 0 : FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);
152 : }
153 :
154 220 : if((ret = fi_enable(rsocket->ep)))
155 : {
156 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
157 0 : FATAL("Listen socket, cannot enable recv socket endpoint, error ", ret);
158 : }
159 :
160 220 : if((ret = fi_accept(rsocket->ep, NULL, 0)))
161 : {
162 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
163 0 : FATAL("Listen socket, connection rejected, error ", ret);
164 : }
165 220 : log_dbg("connection accepted. Lsocket EQ: %d with evloop %d, rsocket EQ %d with evloop %d", lsocket->eqfd, lsocket->ctx->evloop.epollfd, rsocket->eqfd, rsocket->ctx->evloop.epollfd);
166 220 : }
167 :
168 :
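// Tear down a libfabric receive socket: close the endpoint, deregister and close
// the EQ and FI_RECV CQ (removing their fds from the event loop), close the
// FI_TRANSMIT CQ, release the registered memory regions and the domain, and free
// any subscription-message buffers.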
169 : void
170 172 : handle_recv_socket_shutdown(struct netio_recv_socket* socket)
171 : {
172 172 : if(socket->eqfd < 0){
173 0 : log_info("handle_recv_socket_shutdown on closed socket (eqfd %d)", socket->eqfd);
174 0 : return;
175 : }
176 172 : int ret = 0;
177 172 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
178 172 : log_dbg("Handle_recv_socket_shutdown for socket %p, evloop: %d", socket, socket->ctx->evloop.epollfd);
179 :
180 172 : if(socket->ep && (ret = fi_close(&socket->ep->fid)))
181 : {
182 0 : log_error("Failed to close recv socket Endpoint %d: %s", ret, fi_strerror(-ret));
183 : }
184 172 : log_dbg("fi_close done for endpoint.");
185 :
186 : //EQ
187 172 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->eq_ev_ctx.fd);
188 172 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, &ep_event);
189 172 : if ( ret ){ log_warn("netio_recv_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
190 172 : log_dbg("netio_recv_socket: removing EQ fd %d from evloop %d", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd);
191 172 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
192 : {
193 0 : log_error("Failed to fi_close recv socket Event Queue %d: %s", ret, fi_strerror(-ret));
194 0 : ret = close(socket->eq_ev_ctx.fd);
195 0 : if ( ret ) {log_warn("Cannot close recv socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
196 : }
197 172 : remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd);
198 172 : socket->eqfd = -1;
199 :
200 : //FI_RECV CQ
201 172 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->cq_ev_ctx.fd);
202 172 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cq_ev_ctx.fd, &ep_event);
203 172 : if ( ret ){ log_warn("netio_recv_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
204 172 : log_dbg("netio_recv_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
205 172 : if(socket->cq && (ret = fi_close(&socket->cq->fid)))
206 : {
207 0 : log_error("Failed to close recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
208 0 : ret = close(socket->cq_ev_ctx.fd);
209 0 : if ( ret ) {log_warn("Cannot close recv socket CQ fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
210 : }
211 172 : remove_open_fd(&socket->ctx->evloop, socket->cq_ev_ctx.fd);
212 172 : socket->cqfd = -1;
213 : //FI_TRANSMIT CQ
214 172 : if(socket->tcq && (ret = fi_close(&socket->tcq->fid)))
215 : {
216 0 : log_error("Failed to close FI_TRANSMIT recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
217 : }
218 :
219 : //MR
220 172 : uint16_t mem_regions = socket->reg_mr;
221 29142 : for(uint16_t i=0; i<mem_regions; ++i)
222 : {
223 28970 : if ((ret = fi_close(socket->mr[i]))){
224 0 : log_warn("Failed to close recv Memory Region %d, error %d.", i, ret);
225 : }
226 : else{
227 28970 : socket->reg_mr-=1;
228 : }
229 : }
230 172 : if(socket->reg_mr==0){free(socket->mr);socket->mr=NULL;}
231 172 : if((ret = fi_close(&socket->domain->fid)))
232 : {
233 0 : log_error("Failed to close recv socket Domain %d: %s", ret, fi_strerror(-ret));
234 : }
235 172 : if (socket->sub_msg_buffers != NULL){
236 1155 : for(int i = 0; i < 32; i++){
237 1120 : free(socket->sub_msg_buffers[i]->data);
238 1120 : free(socket->sub_msg_buffers[i]);
239 : }
240 35 : free(socket->sub_msg_buffers);
241 35 : socket->sub_msg_buffers = NULL;
242 : }
243 : }
244 :
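// Tear down a libfabric send socket: close the endpoint, deregister and close the
// EQ and CQ fds, close the FI_RECV CQ and, once the last socket using the shared
// domain is gone, release the domain's memory regions, the domain itself and (if
// no fds remain polled) the fabric. Finally free the cached fi_info and mark the
// socket DISCONNECTED.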
245 : void
246 391 : handle_send_socket_shutdown(struct netio_send_socket* socket)
247 : {
248 : // if(socket->state != CONNECTED){
249 : // log_dbg("Nothing to do, send socket has already been freed....");
250 : // return;
251 : // }
252 391 : int ret = 0;
253 391 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
254 391 : log_dbg("Handle_send_socket_shutdown. Socket with EQ: %d", socket->eqfd);
255 :
256 391 : if(socket->ep && (ret = fi_close(&socket->ep->fid)))
257 : {
258 0 : log_error("Failed to close send socket Endpoint %d: %s", ret, fi_strerror(-ret));
259 : }
260 391 : socket->ep = NULL;
261 :
262 : //EQ
263 391 : log_dbg("netio_send_socket: removing EQ fd %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd);
264 391 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
265 391 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
266 391 : if ( ret ){ log_warn("netio_send_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
267 391 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
268 : {
269 0 : log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
270 0 : ret = close(socket->eq_ev_ctx.fd);
271 0 : if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
272 : }
273 391 : remove_open_fd(&socket->ctx->evloop, socket->eqfd);
274 391 : socket->eqfd = -1;
275 :
276 : //FI_TRANSMIT CQ
277 391 : log_dbg("netio_send_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
278 391 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->cqfd);
279 391 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cqfd, &ep_event);
280 391 : if ( ret ){ log_warn("netio_send_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cqfd, socket->ctx->evloop.epollfd, strerror(errno));}
281 391 : if(socket->cq && (ret = fi_close(&socket->cq->fid)))
282 : {
283 0 : log_error("Failed to close send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
284 0 : ret = close(socket->cq_ev_ctx.fd);
285 0 : if ( ret ){ log_warn("Could not close send socket cq fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
286 : }
287 391 : remove_open_fd(&socket->ctx->evloop, socket->cqfd);
288 391 : socket->cqfd = -1;
289 : //FI_RECV CQ
290 391 : if(socket->rcq && (ret = fi_close(&socket->rcq->fid)))
291 : {
292 0 : log_error("Failed to close FI_RECV send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
293 : }
294 :
295 391 : socket->domain->nb_sockets -= 1;
296 391 : if (socket->domain->nb_sockets == 0) {
297 37633 : for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
298 37244 : if ((ret = fi_close(socket->domain->mr[i]))) {
299 37244 : log_warn("Failed to close send Memory Region %d, error %d.", i, ret);
300 : }
301 : }
302 389 : if ((ret = fi_close(&socket->domain->domain->fid))) {
303 0 : log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
304 : }
305 :
306 389 : if(socket->ctx->evloop.pfids.count == 0){
307 318 : if ((ret = fi_close(&socket->domain->fabric->fid))) {
308 0 : log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
309 : }
310 : }
311 389 : free(socket->domain->mr);
312 389 : socket->domain->mr = NULL;
313 389 : free(socket->domain);
314 389 : socket->domain = NULL;
315 : }
316 391 : if(socket->fi != NULL){
317 391 : fi_freeinfo(socket->fi);
318 391 : socket->fi = NULL;
319 : }
320 391 : socket->state = DISCONNECTED;
321 391 : }
322 :
323 :
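// TCP-mode counterpart of handle_send_socket_shutdown(): close the TCP signal,
// remove the socket fd from the event loop and close it.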
324 : void
325 8 : handle_tcp_send_socket_shutdown(struct netio_send_socket* socket)
326 : {
327 8 : netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal);
328 8 : epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
329 8 : int rc = close(socket->eq_ev_ctx.fd);
330 8 : if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
331 8 : }
332 :
333 :
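// Variant of handle_send_socket_shutdown() for a send socket whose connection was
// never established (the CQ wait object has not been retrieved yet). Releases the
// same resources and leaves the socket in the UNCONNECTED state.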
334 : static void
335 95 : handle_send_socket_shutdown_on_connection_refused(struct netio_send_socket* socket)
336 : {
337 95 : int ret = 0;
338 : //struct fi_eq_err_entry err_entry;
339 95 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
340 95 : if(socket->ep && (ret = fi_close(&socket->ep->fid)))
341 : {
342 0 : log_error("Failed to close send socket endpoint %d: %s", ret, fi_strerror(-ret));
343 : }
344 :
345 : //EQ
346 95 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
347 95 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
348 95 : if (ret) {log_warn("Conn refused: cannot deregister send_socket EQ fd %d from evloop %d, %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
349 95 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
350 : {
351 0 : log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
352 0 : ret = close(socket->eq_ev_ctx.fd);
353 0 : if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
354 : }
355 95 : remove_open_fd(&socket->ctx->evloop, socket->eqfd);
356 95 : socket->eqfd = -1;
357 :
358 : //FI_TRANSMIT CQ - wait object not retrieved yet
359 95 : if(socket->cq && (ret = fi_close(&socket->cq->fid)))
360 : {
361 0 : log_error("Failed to close FI_TRANSMIT send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
362 : }
363 : //FI_RECV CQ
364 95 : if(socket->rcq && (ret = fi_close(&socket->rcq->fid)))
365 : {
366 0 : log_error("Failed to close FI_RECV send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
367 : }
368 :
369 95 : socket->domain->nb_sockets -= 1;
370 95 : if (socket->domain->nb_sockets == 0) {
371 118 : for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
372 23 : if ((ret = fi_close(socket->domain->mr[i]))) {
373 0 : log_warn("Failed to close send Memory Region %d, error %d.", i, ret);
374 : }
375 : }
376 95 : if ((ret = fi_close(&socket->domain->domain->fid))) {
377 0 : log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
378 : }
379 95 : if(socket->ctx->evloop.pfids.count == 0){
380 73 : if ((ret = fi_close(&socket->domain->fabric->fid))) {
381 0 : log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
382 : }
383 : }
384 95 : free(socket->domain->mr);
385 95 : socket->domain->mr = NULL;
386 95 : free(socket->domain);
387 95 : socket->domain = NULL;
388 :
389 : }
390 95 : if(socket->fi != NULL){
391 95 : fi_freeinfo(socket->fi);
392 95 : socket->fi = NULL;
393 : }
394 95 : socket->state = UNCONNECTED;
395 95 : }
396 :
397 : // WAIT OBJECT CALLBACKS ///////////////////////////////////////////////////////////
398 :
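// Event-loop callback for CM events on a send socket EQ: on FI_CONNECTED the CQ
// wait fd is retrieved and registered and the user callback is invoked; FI_SHUTDOWN
// triggers the connection-closed callbacks; FI_ECONNREFUSED and FI_ETIMEDOUT are
// reported through the EQ error path and tear the socket down.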
399 : void
400 1492 : on_send_socket_libfabric_cm_event(int fd, void* ptr)
401 : {
402 1492 : struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
403 1492 : struct fi_eq_err_entry err_entry;
404 1492 : log_dbg("send socket %p fd %d: connection event, evloop: %d", socket, fd, socket->ctx->evloop.epollfd);
405 :
406 1492 : if(socket->state == DISCONNECTED || socket->eqfd != fd){
407 : log_dbg("discarding send_socket cm events: state disconnected or fd mismatch eqfd %d fd %d", socket->eqfd, fd);
408 954 : return;
409 : } //Check if event queue was already closed. TODO: Need to fix receiving messages after closing
410 1492 : int event = read_cm_event(socket->eq, NULL, &err_entry);
411 1492 : int ret = 0;
412 :
413 1492 : log_dbg("event %d", event);
414 :
415 1492 : switch(event)
416 : {
417 393 : case FI_SHUTDOWN:
418 393 : if (socket->eqfd < 0 ){
419 0 : log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
420 954 : return;
421 : }
422 393 : log_info("fi_verbs_process_send_socket_cm_event: FI_SHUTDOWN");
423 393 : if(socket->cb_internal_connection_closed){
424 155 : socket->cb_internal_connection_closed(socket);
425 : }
426 393 : if(socket->cb_connection_closed) {
427 393 : socket->cb_connection_closed(socket);
428 : }
429 : return;
430 :
431 466 : case FI_CONNECTED:
432 466 : socket->cqfd = -1;
433 466 : if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
434 : {
435 0 : FATAL("Failed to retrieve wait object for send socket Completion Queue", ret);
436 : }
437 :
438 466 : socket->cq_ev_ctx.fd = socket->cqfd;
439 466 : socket->cq_ev_ctx.data = socket;
440 466 : socket->cq_ev_ctx.cb = on_send_socket_cq_event;
441 :
442 466 : log_dbg("send_socket: EQ fd %d connected, CQ fd %d", socket->eqfd, socket->cqfd);
443 466 : log_dbg("Adding SEND CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
444 466 : add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->cq->fid, socket->cqfd, socket, on_send_socket_cq_event);
445 466 : add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, USEND, socket);
446 466 : netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
447 :
448 466 : socket->state = CONNECTED;
449 :
450 466 : if(socket->cb_connection_established) {
451 466 : socket->cb_connection_established(socket);
452 : }
453 :
454 : return;
455 :
456 : case FI_MR_COMPLETE:
457 : case FI_AV_COMPLETE:
458 : case FI_JOIN_COMPLETE:
459 : // Not implemented
460 538 : break;
461 :
462 :
463 95 : case -FI_EAVAIL:
464 :
465 95 : switch(err_entry.err) {
466 :
467 95 : case FI_ECONNREFUSED:
468 :
469 95 : log_dbg("Connection refused (FI_ECONNREFUSED). Deallocating send_socket resources.");
470 :
471 95 : if(socket->eqfd < 0){
472 : log_dbg("FI_ECONNREFUSED on send socket with EQ fd %d. Not clearing it.", socket->eqfd);
473 : return;
474 : }
475 :
476 95 : handle_send_socket_shutdown_on_connection_refused(socket);
477 :
478 95 : if(socket->cb_error_connection_refused) {
479 95 : socket->cb_error_connection_refused(socket);
480 : }
481 :
482 95 : if (socket->recv_socket != NULL){
483 0 : if (socket->recv_socket->eqfd < 0 ){
484 0 : log_info("Ignoring FI_ECONNREFUSED on recv_socket, invalid eqfd (socket already closed)");
485 0 : return;
486 : }
487 0 : log_info("Shutting down receive socket on FI_ECONNREFUSED");
488 0 : handle_recv_socket_shutdown(socket->recv_socket);
489 0 : if(socket->recv_socket->lsocket->cb_connection_closed) {
490 0 : socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
491 : }
492 0 : if (socket->recv_socket->lsocket->attr.num_buffers > 0){
493 0 : for(int i = 0; i < socket->recv_socket->lsocket->attr.num_buffers; i++){
494 0 : free(socket->recv_socket->recv_buffers[i].data);
495 : }
496 0 : free(socket->recv_socket->recv_buffers);
497 : }
498 : }
499 :
500 : //print_openfds(&socket->ctx->evloop.openfds);
501 : break;
502 :
503 0 : case FI_ETIMEDOUT:
504 0 : log_info("fi_verbs_process_send_socket_cm_event: FI_ETIMEDOUT");
505 0 : if (socket->eqfd < 0 ){
506 0 : log_info("Ignoring FI_ETIMEDOUT on send_socket, invalid eqfd (socket already closed)");
507 0 : break;
508 : }
509 :
510 : // Need to take care of receive socket as well
511 0 : if (socket->recv_socket != NULL){
512 0 : if (socket->recv_socket->eqfd < 0 ){
513 0 : log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
514 0 : return;
515 : }
516 0 : log_info("Shutting down receive socket on FI_ETIMEDOUT");
517 0 : handle_recv_socket_shutdown(socket->recv_socket);
518 0 : if(socket->recv_socket->lsocket->cb_connection_closed) {
519 0 : socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
520 : }
521 0 : if (socket->recv_socket->lsocket->attr.num_buffers > 0){
522 0 : for(int i = 0; i < socket->recv_socket->lsocket->attr.num_buffers; i++){
523 0 : free(socket->recv_socket->recv_buffers[i].data);
524 : }
525 0 : free(socket->recv_socket->recv_buffers);
526 : }
527 : }
528 :
529 0 : if(socket->cqfd < 0){ //cq not initialized yet
530 0 : handle_send_socket_shutdown_on_connection_refused(socket);
531 : } else {
532 0 : if(socket->cb_internal_connection_closed){
533 0 : socket->cb_internal_connection_closed(socket);
534 : }
535 0 : if(socket->cb_connection_closed) {
536 0 : socket->cb_connection_closed(socket);
537 : }
538 : }
539 :
540 : break;
541 :
542 0 : default:
543 :
544 0 : log_error("Unhandled error in the Event Queue: %s (code: %d), provider specific: %s (code: %d)",
545 : fi_strerror(err_entry.err), err_entry.err,
546 : fi_eq_strerror(socket->eq, err_entry.prov_errno, err_entry.err_data, NULL, 0),
547 : err_entry.prov_errno);
548 : }
549 : return;
550 :
551 538 : case -FI_EAGAIN:
552 538 : struct fid* fp = &socket->eq->fid;
553 538 : fi_trywait(socket->domain->fabric, &fp, 1);
554 538 : break;
555 :
556 0 : default:
557 0 : log_error("Unexpected event %d in send socket Event Queue", event);
558 0 : exit(2);
559 : }
560 : }
561 :
562 :
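// Event-loop callback for CM events on an unbuffered listen socket EQ: on FI_CONNREQ
// a new receive socket is created and accepted, and its receive buffers (subscription
// message buffers or plain buffers, depending on the listen socket attributes) are
// registered and posted.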
563 : void
564 107 : on_listen_socket_libfabric_cm_event(int fd, void* ptr)
565 : {
566 107 : struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
567 107 : log_dbg("listen socket: connection event");
568 107 : if(lsocket->eqfd != fd){
569 0 : log_dbg("listen socket is already closed.");
570 0 : return;
571 : }
572 :
573 107 : struct fi_info *info = NULL;
574 107 : struct fi_eq_err_entry err_entry;
575 107 : int event = read_cm_event(lsocket->eq, &info, &err_entry);
576 :
577 :
578 107 : switch (event)
579 : {
580 107 : case FI_CONNREQ:
581 107 : log_dbg("fi_verbs_process_listen_socket_cm_event: FI_CONNREQ");
582 107 : struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
583 107 : struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
584 107 : netio_init_recv_socket(rsocket, lsocket);
585 107 : handle_connreq(rsocket, lsocket, info, on_recv_socket_libfabric_cm_event, rsocket);
586 107 : if(lsocket->recv_sub_msg == 1){
587 44 : rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
588 1452 : for (int i = 0; i < 32; i++){
589 1408 : rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
590 1408 : rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
591 1408 : rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
592 1408 : netio_register_recv_buffer(rsocket, rsocket->sub_msg_buffers[i], 0);
593 1408 : netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
594 : }
595 : log_dbg("Posted recv for subscription messages");
596 63 : } else if (lsocket->attr.num_buffers > 0) {
597 63 : log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
598 63 : rsocket->sub_msg_buffers = NULL;
599 63 : rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
600 14406 : for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
601 14343 : log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
602 14343 : rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
603 14343 : rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
604 14343 : netio_register_recv_buffer(rsocket, &rsocket->recv_buffers[i], 0);
605 14343 : netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
606 : }
607 : } else {
608 0 : log_error("Something went wrong. Not allocating any buffers for recv socket.");
609 : }
610 : break;
611 :
612 0 : case FI_CONNECTED:
613 0 : log_fatal("FI_CONNECTED received on listen socket");
614 0 : exit(2);
615 :
616 0 : case FI_SHUTDOWN:
617 0 : log_fatal("FI_SHUTDOWN received on listen socket");
618 0 : exit(2);
619 :
620 0 : case -FI_EAGAIN:
621 0 : struct fid* fp = &lsocket->eq->fid;
622 0 : fi_trywait(lsocket->fabric, &fp, 1);
623 0 : break;
624 :
625 0 : case -FI_EAVAIL:
626 0 : log_error("Unhandled error in listen socket EQ code: %d, provider specific code: %d",
627 : err_entry.err, err_entry.prov_errno);
628 0 : break;
629 : }
630 107 : fi_freeinfo(info);
631 : }
632 :
633 :
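// Same as on_listen_socket_libfabric_cm_event(), but for a buffered listen socket:
// on FI_CONNREQ the new buffered receive socket is accepted and its pre-allocated
// pages are registered and posted.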
634 : void
635 113 : on_buffered_listen_socket_libfabric_cm_event(int fd, void* ptr)
636 : {
637 113 : struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
638 113 : log_dbg("buffered listen socket: connection event");
639 113 : if(lsocket->listen_socket.eqfd != fd){
640 0 : log_dbg("Buffered listen socket CM event: inconsistent fd. Ignoring event.");
641 0 : return;
642 : }
643 :
644 113 : struct fi_info *info = NULL;
645 113 : struct fi_eq_err_entry err_entry;
646 113 : int event = read_cm_event(lsocket->listen_socket.eq, &info, &err_entry);
647 :
648 113 : switch (event)
649 : {
650 113 : case FI_CONNREQ:
651 113 : log_dbg("FI_CONNREQ");
652 113 : struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
653 113 : struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
654 113 : netio_buffered_recv_socket_init(rsocket, lsocket);
655 113 : handle_connreq(&rsocket->recv_socket, &lsocket->listen_socket, info, on_buffered_recv_socket_libfabric_cm_event, rsocket);
656 113 : log_info("fi_verbs_process_buffered_listen_socket_cm_event: FI_CONNREQ, posting %u buffers", rsocket->num_pages);
657 24649 : for(unsigned int i=0; i<rsocket->num_pages; i++) {
658 24536 : netio_register_recv_buffer(&rsocket->recv_socket, &rsocket->pages[i], 0);
659 24536 : netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
660 : }
661 : break;
662 :
663 0 : case FI_CONNECTED:
664 0 : log_fatal("FI_CONNECTED received on buffered listen socket");
665 0 : exit(2);
666 :
667 0 : case FI_SHUTDOWN:
668 0 : log_fatal("FI_SHUTDOWN received on buffered listen socket");
669 0 : exit(2);
670 :
671 0 : case -FI_EAGAIN:
672 0 : struct fid* fp = &lsocket->listen_socket.eq->fid;
673 0 : fi_trywait(lsocket->listen_socket.fabric, &fp, 1);
674 0 : break;
675 :
676 0 : case -FI_EAVAIL:
677 0 : log_error("Unhandled error in buffer listen socket EQ code: %d, provider specific code: %d",
678 : err_entry.err, err_entry.prov_errno);
679 0 : break;
680 : }
681 113 : fi_freeinfo(info);
682 : }
683 :
684 :
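// Event-loop callback for CM events on a receive socket EQ: on FI_CONNECTED the CQ
// wait fd is registered and the listen socket's connection-established callback is
// invoked; on FI_SHUTDOWN the socket is torn down, its buffers are freed and it is
// removed from the listen socket's list.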
685 : void
686 189 : on_recv_socket_libfabric_cm_event(int fd, void* ptr)
687 : {
688 189 : struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
689 189 : log_info("recv socket %d: connection event", socket->eqfd);
690 189 : if(socket->eqfd != fd){
691 : log_dbg("Recv socket CM event: inconsistent fd. Ignoring event.");
692 0 : return;
693 : }
694 189 : int ret;
695 189 : struct fi_eq_err_entry err_entry;
696 189 : uint32_t event = read_cm_event(socket->eq, NULL, &err_entry);
697 :
698 189 : switch (event)
699 : {
700 107 : case FI_CONNECTED:
701 107 : log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED");
702 107 : socket->cqfd = -1;
703 107 : if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
704 : {
705 0 : FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
706 : }
707 :
708 107 : socket->cq_ev_ctx.fd = socket->cqfd;
709 107 : socket->cq_ev_ctx.data = socket;
710 107 : socket->cq_ev_ctx.cb = on_recv_socket_cq_event;
711 :
712 107 : log_dbg("Adding recv CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
713 107 : add_polled_fid(&socket->ctx->evloop.pfids, socket->lsocket->fabric, &socket->cq->fid, socket->cqfd, socket, on_recv_socket_cq_event);
714 107 : add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, URECV, socket);
715 107 : netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
716 :
717 107 : log_dbg("recv_socket: EQ fd %d CQ fd %d connected", socket->eqfd, socket->cqfd);
718 107 : if(socket->lsocket->cb_connection_established) {
719 107 : socket->lsocket->cb_connection_established(socket);
720 : }
721 :
722 189 : break;
723 :
724 82 : case FI_SHUTDOWN:
725 82 : if (socket->eqfd < 0 ){
726 0 : log_info("Ignoring FI_SHUTDOWN on recv_socket, invalid eqfd (socket already closed)");
727 0 : return;
728 : }
729 82 : log_info("fi_verbs_process_recv_socket_cm_event: FI_SHUTDOWN");
730 82 : handle_recv_socket_shutdown(socket);
731 82 : if(socket->lsocket->cb_connection_closed) {
732 47 : socket->lsocket->cb_connection_closed(socket);
733 : }
734 :
735 82 : if (socket->lsocket->attr.num_buffers > 0){
736 11569 : for(int i = 0; i < socket->lsocket->attr.num_buffers; i++){
737 11522 : free(socket->recv_buffers[i].data);
738 : }
739 47 : free(socket->recv_buffers);
740 : }
741 :
742 82 : int return_value = remove_socket(&socket->lsocket->recv_sockets, socket);
743 82 : log_info("Recv socket removed, result: %d", return_value);
744 82 : break;
745 :
746 : case FI_MR_COMPLETE:
747 : case FI_AV_COMPLETE:
748 : case FI_JOIN_COMPLETE:
749 : // Not implemented
750 : break;
751 :
752 0 : case -FI_EAGAIN:
753 0 : struct fid* fp = &socket->eq->fid;
754 0 : fi_trywait(socket->lsocket->fabric, &fp, 1);
755 0 : break;
756 :
757 0 : case -FI_EAVAIL:
758 0 : log_error("Unhandled error in recv socket EQ code: %d, provider specific code: %d",
759 : err_entry.err, err_entry.prov_errno);
760 0 : break;
761 :
762 0 : default:
763 0 : log_error("Unexpected event %d in recv socket Event Queue", event);
764 0 : exit(2);
765 189 : break;
766 : }
767 : }
768 :
769 :
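// Buffered counterpart of on_recv_socket_libfabric_cm_event(): on FI_CONNECTED the
// CQ wait fd is registered; on FI_SHUTDOWN the socket is torn down, its pages are
// freed and it is removed from the listen socket's list.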
770 : void
771 203 : on_buffered_recv_socket_libfabric_cm_event(int fd, void* ptr)
772 : {
773 203 : struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)ptr;
774 203 : log_dbg("buffered recv socket %d: connection event (FD = %d)", socket->recv_socket.eqfd, fd);
775 203 : if(socket->recv_socket.eqfd != fd){
776 : log_dbg("Buffered recv socket CM event: inconsistent fd. Ignoring event.");
777 0 : return;
778 : }
779 203 : int ret;
780 203 : struct fi_eq_err_entry err_entry;
781 203 : uint32_t event = read_cm_event(socket->recv_socket.eq, NULL, &err_entry);
782 :
783 203 : switch (event)
784 : {
785 113 : case FI_CONNECTED:
786 113 : socket->recv_socket.cqfd = -1;
787 113 : if((ret = fi_control(&socket->recv_socket.cq->fid, FI_GETWAIT, &socket->recv_socket.cqfd)))
788 : {
789 0 : FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
790 : }
791 :
792 113 : socket->recv_socket.cq_ev_ctx.fd = socket->recv_socket.cqfd;
793 113 : socket->recv_socket.cq_ev_ctx.data = &socket->recv_socket;
794 113 : socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_cq_event;
795 :
796 113 : log_dbg("Adding BUFFERED RECV CQ polled fid %d %p", socket->recv_socket.cqfd, &socket->recv_socket.cq->fid);
797 113 : add_open_fd(&socket->recv_socket.ctx->evloop.openfds, socket->recv_socket.cqfd, NETIO_CQ, BRECV, &socket->recv_socket);
798 113 : add_polled_fid(&socket->recv_socket.ctx->evloop.pfids,
799 113 : socket->recv_socket.lsocket->fabric,
800 113 : &socket->recv_socket.cq->fid,
801 : socket->recv_socket.cqfd,
802 : &socket->recv_socket,
803 : on_recv_socket_cq_event);
804 :
805 113 : netio_register_read_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
806 113 : log_dbg("buffered_recv_socket: registering CQ fd %d", socket->recv_socket.cqfd);
807 :
808 113 : if(socket->lsocket->cb_connection_established) {
809 113 : socket->lsocket->cb_connection_established(socket);
810 : }
811 :
812 203 : break;
813 :
814 90 : case FI_SHUTDOWN:
815 90 : log_dbg("recv socket shutdown");
816 90 : if (socket->recv_socket.eqfd < 0 ){
817 0 : log_dbg("Ignoring FI_SHUTDOWN on buffered recv_socket, invalid eqfd (socket already closed)");
818 0 : return;
819 : }
820 90 : handle_recv_socket_shutdown(&socket->recv_socket);
821 16418 : for(unsigned int i=0; i<socket->num_pages; i++) {
822 16328 : free(socket->pages[i].data);
823 : }
824 90 : free(socket->pages);
825 90 : if(socket->lsocket->cb_connection_closed) {
826 90 : socket->lsocket->cb_connection_closed(socket);
827 : }
828 :
829 90 : remove_socket(&socket->lsocket->listen_socket.recv_sockets, socket);
830 90 : break;
831 :
832 : case FI_MR_COMPLETE:
833 : case FI_AV_COMPLETE:
834 : case FI_JOIN_COMPLETE:
835 : // Not implemented
836 : break;
837 :
838 0 : case -FI_EAGAIN:
839 0 : struct fid* fp = &socket->recv_socket.eq->fid;
840 0 : fi_trywait(socket->lsocket->listen_socket.fabric, &fp, 1);
841 0 : break;
842 :
843 0 : case -FI_EAVAIL:
844 : // error was found
845 0 : log_error("Unhandled error in buffered recv socket EQ code: %d, provider specific code: %d",
846 : err_entry.err, err_entry.prov_errno);
847 0 : break;
848 :
849 0 : default:
850 0 : log_error("Unexpected event %d in buffered recv socket Event Queue", event);
851 0 : exit(2);
852 203 : break;
853 : }
854 : }
855 :
856 :
857 : // CALLBACKS FOR GARBAGE COLLECTION //////////////////////////////////////////////////////////
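// Signal-driven cleanup of a send socket: in TCP mode the fd is shut down, closed
// and removed from the event loop; in libfabric mode teardown is left to the
// FI_SHUTDOWN exchange. The signal used to trigger the cleanup is released at the end.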
858 : void
859 0 : close_send_socket(void* ptr)
860 : {
861 0 : log_info("Close_send_socket. Not supported anymore");
862 0 : struct signal_data* sd = (struct signal_data*)ptr;
863 :
864 0 : struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
865 0 : if(socket->tcp_fi_mode == NETIO_MODE_TCP){
866 0 : shutdown(socket->cq_ev_ctx.fd, SHUT_WR);
867 0 : netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal);
868 0 : epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
869 0 : int rc = close(socket->cq_ev_ctx.fd);
870 0 : if ( rc ){ log_warn("Cannot close TCP send socket fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
871 0 : if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
872 0 : socket->state=DISCONNECTED;
873 0 : if(socket->cb_connection_closed) {
874 0 : socket->cb_connection_closed(socket);
875 : }
876 0 : } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
877 : //commented out as closing libfabric sockets relies on exchanges of fi_shutdown
878 : //handle_send_socket_shutdown(socket);
879 : } else {
880 0 : log_error("Closing send socket with unknown netio mode %d", socket->tcp_fi_mode);
881 : }
882 :
883 : //delete the used signal
884 0 : netio_signal_close(sd->evloop, sd->signal);
885 0 : free(sd->signal);
886 0 : free(sd);
887 0 : }
888 :
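// Signal-driven cleanup of a buffered send socket: in TCP mode the flush timer,
// buffer-available signal and socket fd are released; in libfabric mode teardown is
// left to the FI_SHUTDOWN exchange. The send buffers and the triggering signal are freed.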
889 : void
890 0 : close_buffered_send_socket(void *ptr)
891 : {
892 0 : log_dbg("Closing buffered_send_socket %p.", ptr);
893 0 : struct signal_data* sd = (struct signal_data*)ptr;
894 :
895 0 : struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)sd->ptr;
896 0 : struct netio_send_socket* ssocket = &socket->send_socket;
897 0 : if(ssocket->tcp_fi_mode == NETIO_MODE_TCP){
898 0 : netio_timer_close(&(ssocket->ctx->evloop), &socket->flush_timer);
899 0 : netio_signal_close(&(ssocket->ctx->evloop), &socket->signal_buffer_available);
900 0 : shutdown(ssocket->eq_ev_ctx.fd, SHUT_WR);
901 0 : netio_signal_close(&ssocket->ctx->evloop, &ssocket->tcp_signal);
902 0 : epoll_ctl(ssocket->ctx->evloop.epollfd, EPOLL_CTL_DEL, ssocket->eq_ev_ctx.fd, NULL);
903 0 : int rc = close(ssocket->eq_ev_ctx.fd);
904 0 : if ( !rc ){ remove_open_fd(&ssocket->ctx->evloop, ssocket->eq_ev_ctx.fd); }
905 0 : if(ssocket->cb_connection_closed) {
906 0 : ssocket->cb_connection_closed(ssocket);
907 : }
908 0 : } else if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
909 : //commented out as closing libfabric sockets relies on exchanges of fi_shutdown
910 : //handle_send_socket_shutdown(ssocket);
911 : } else {
912 0 : log_error("Closing send socket with unknown netio mode %d", ssocket->tcp_fi_mode);
913 : }
914 0 : for(size_t i=0; i < socket->buffers.num_buffers; ++i ){
915 0 : free(socket->buffers.buffers[i]);
916 : }
917 :
918 : //delete the used signal
919 0 : netio_signal_close(sd->evloop, sd->signal);
920 0 : free(sd->signal);
921 0 : free(sd);
922 0 : }
923 :
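// Signal-driven cleanup of a receive socket: in TCP mode the fd is shut down, closed
// and removed from the event loop; in libfabric mode teardown is left to the
// FI_SHUTDOWN exchange. The triggering signal is released at the end.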
924 : void
925 2 : close_recv_socket(void* ptr)
926 : {
927 2 : log_dbg("Closing recv_socket %p.", ptr);
928 2 : struct signal_data* sd = (struct signal_data*)ptr;
929 2 : struct netio_recv_socket* socket = (struct netio_recv_socket*)sd->ptr;
930 2 : if(socket->tcp_fi_mode == NETIO_MODE_TCP){
931 2 : shutdown(socket->eq_ev_ctx.fd, SHUT_RD);
932 : // netio_signal_close(&socket->ctx->evloop, &socket->tcp_signal); // causes bad file descriptor
933 2 : epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, NULL);
934 2 : int rc = close(socket->eq_ev_ctx.fd);
935 2 : if ( !rc ){ remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd); }
936 2 : if(socket->lsocket->cb_connection_closed) {
937 0 : socket->lsocket->cb_connection_closed(socket);
938 : }
939 0 : } else if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
940 : //commented out as closing libfabric sockets relies on exchanges of fi_shutdown
941 : //handle_recv_socket_shutdown(socket);
942 : } else {
943 0 : log_error("Closing recv socket with unknown netio mode %d", socket->tcp_fi_mode);
944 : }
945 :
946 : //clean up signal
947 2 : netio_signal_close(sd->evloop, sd->signal);
948 2 : free(sd->signal);
949 2 : free(sd);
950 2 : }
951 :
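// Signal-driven cleanup of a buffered receive socket: the actual teardown is kept
// commented out below, so only the triggering signal is released.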
952 : void
953 0 : close_buffered_recv_socket(void* ptr)
954 : {
955 0 : log_info("Closing buffered_recv_socket %p. Not supported anymore", ptr);
956 0 : struct signal_data* sd = (struct signal_data*)ptr;
957 : /*
958 : struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)sd->ptr;
959 : struct netio_recv_socket* rsocket = &socket->recv_socket;
960 : if(rsocket->tcp_fi_mode == NETIO_MODE_TCP){
961 : shutdown(rsocket->eq_ev_ctx.fd, SHUT_RD);
962 : netio_signal_close(&rsocket->ctx->evloop, &rsocket->tcp_signal);
963 : epoll_ctl(rsocket->ctx->evloop.epollfd, EPOLL_CTL_DEL, rsocket->eq_ev_ctx.fd, NULL);
964 : int rc = close(rsocket->eq_ev_ctx.fd);
965 : if ( !rc ){ remove_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eq_ev_ctx.fd); }
966 : } else if (rsocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
967 : handle_recv_socket_shutdown(rsocket);
968 : } else {
969 : log_error("Closing buffered recv socket with unkown netio mode %d", rsocket->tcp_fi_mode);
970 : }
971 : free(socket->pages);
972 : */
973 : //clean up signal
974 0 : netio_signal_close(sd->evloop, sd->signal);
975 0 : free(sd->signal);
976 0 : free(sd);
977 0 : }
978 :
979 :
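// Signal-driven cleanup of a listen socket: in libfabric mode the signal is re-fired
// until all child receive sockets have closed their EQ fds, then the socket list is
// freed and the listen socket is shut down; in TCP mode the child fds are closed
// directly. The triggering signal is released at the end.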
980 : void
981 41 : close_listen_socket(void* ptr)
982 : {
983 41 : log_dbg("close_listen_socket");
984 41 : struct signal_data* sd = (struct signal_data*)ptr;
985 41 : struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
986 :
987 41 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
988 41 : if(socket->recv_sockets != NULL){
989 : struct netio_socket_list* entry = socket->recv_sockets;
990 : int still_open = 0;
991 0 : while(entry != NULL){
992 0 : struct netio_recv_socket* recv_socket = entry->socket;
993 0 : still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->eqfd);
994 0 : entry = entry->next;
995 : }
996 0 : if(still_open){
997 0 : netio_signal_fire(sd->signal);
998 0 : return;
999 : }
1000 0 : free_socket_list(&socket->recv_sockets);
1001 : }
1002 41 : handle_listen_socket_shutdown(socket);
1003 : }
1004 0 : else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
1005 0 : if(socket->recv_sockets != NULL){
1006 : struct netio_socket_list* entry = socket->recv_sockets;
1007 0 : while(entry != NULL){
1008 0 : struct netio_recv_socket* recv_socket = entry->socket;
1009 0 : shutdown(recv_socket->eq_ev_ctx.fd, SHUT_RDWR);
1010 0 : close(recv_socket->eq_ev_ctx.fd);
1011 0 : remove_open_fd(&socket->ctx->evloop, recv_socket->eq_ev_ctx.fd);
1012 0 : entry = entry->next;
1013 : }
1014 0 : free_socket_list(&socket->recv_sockets);
1015 : }
1016 : }
1017 : else {
1018 0 : log_error("Connection mode unsupported");
1019 : }
1020 :
1021 : //clean up signal
1022 41 : netio_signal_close(sd->evloop, sd->signal);
1023 41 : free(sd->signal);
1024 41 : free(sd);
1025 : }
1026 :
1027 :
1028 :
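// Buffered counterpart of close_listen_socket(): waits for the child buffered receive
// sockets to close, then frees the socket list and shuts the listen socket down
// (libfabric mode), or closes the child fds directly (TCP mode).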
1029 : void
1030 61 : close_buffered_listen_socket(void* ptr)
1031 : {
1032 61 : log_dbg("close_buffered_listen_socket");
1033 61 : struct signal_data* sd = (struct signal_data*)ptr;
1034 61 : struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
1035 :
1036 61 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
1037 61 : if(socket->recv_sockets != NULL){
1038 : struct netio_socket_list* entry = socket->recv_sockets;
1039 : int still_open = 0;
1040 0 : while(entry != NULL){
1041 0 : struct netio_buffered_recv_socket* recv_socket = entry->socket;
1042 0 : still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->recv_socket.eqfd);
1043 0 : entry = entry->next;
1044 : }
1045 0 : if(still_open){
1046 0 : netio_signal_fire(sd->signal);
1047 0 : return;
1048 : }
1049 0 : free_socket_list(&socket->recv_sockets);
1050 : }
1051 61 : handle_listen_socket_shutdown(socket);
1052 : }
1053 0 : else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
1054 0 : if(socket->recv_sockets != NULL){
1055 : struct netio_socket_list* entry = socket->recv_sockets;
1056 0 : while(entry != NULL){
1057 0 : struct netio_buffered_recv_socket* recv_socket = entry->socket;
1058 0 : shutdown(recv_socket->recv_socket.eqfd, SHUT_RDWR);
1059 0 : close(recv_socket->recv_socket.eqfd);
1060 0 : remove_open_fd(&socket->ctx->evloop, recv_socket->recv_socket.eqfd);
1061 0 : entry = entry->next;
1062 : }
1063 0 : free_socket_list(&socket->recv_sockets);
1064 : }
1065 : }
1066 : else {
1067 0 : log_error("Connection mode unsupported");
1068 : }
1069 :
1070 : //clean up signal
1071 61 : netio_signal_close(sd->evloop, sd->signal);
1072 61 : free(sd->signal);
1073 61 : free(sd);
1074 : }
1075 :
1076 :
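// Shut down a libfabric listen socket: close the passive endpoint, remove the EQ fd
// from the event loop, close the EQ and free the cached fi_info.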
1077 : void
1078 153 : handle_listen_socket_shutdown(struct netio_listen_socket* socket)
1079 : {
1080 153 : if(socket->eqfd < 0){return;}//nothing to do
1081 153 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
1082 153 : log_dbg("Handle_listen_socket_shutdown. Lsocket EQ: %d with evloop %d socket %p", socket->eqfd, socket->ctx->evloop.epollfd, socket);
1083 153 : int ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
1084 153 : if(ret){ log_warn("Cannot deregister listen socket EQ %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd); }
1085 :
1086 153 : if(socket->pep != NULL){
1087 153 : fi_close(&socket->pep->fid);
1088 153 : socket->pep = NULL;
1089 : }
1090 :
1091 153 : log_dbg("netio_listen_socket: removing EQ fd %d from evloop %d, ret %d", socket->eqfd, socket->ctx->evloop.epollfd, ret);
1092 153 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
1093 : {
1094 0 : log_error("Failed to close listen socket Event Queue %d: %s", ret, fi_strerror(-ret));
1095 0 : ret = close(socket->eq_ev_ctx.fd);
1096 0 : if ( ret ) {log_warn("Cannot close listen socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
1097 : }
1098 153 : remove_open_fd(&socket->ctx->evloop, socket->eqfd);
1099 153 : socket->eqfd = -1;
1100 :
1101 153 : if(socket->fi != NULL){
1102 153 : fi_freeinfo(socket->fi);
1103 153 : socket->fi = NULL;
1104 : }
1105 : //fi_close(&socket->fabric->fid);
1106 : }
1107 :
1108 :
1109 : //This function is the TCP equivalent of on_send_socket_cm_event()
1110 : void
1111 117 : on_send_socket_tcp_cm_event(int fd, void* ptr)
1112 : {
1113 117 : int ret, retVal;
1114 117 : socklen_t in_len;
1115 117 : struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
1116 :
1117 117 : log_dbg("connection event called with fd = %d", fd);
1118 :
1119 117 : in_len = sizeof(retVal);
1120 117 : ret = getsockopt(fd, SOL_SOCKET, SO_ERROR, &retVal, &in_len);
1121 117 : if (ret < 0)
1122 : {
1123 0 : log_error("on_send_socket_tcp_cm_event: getsockopt failed, ret = %d", ret);
1124 : }
1125 :
1126 117 : if (retVal != 0)
1127 : {
1128 0 : log_error("on_send_socket_tcp_cm_event: connect did not work. retVal = %d, ret = %d, errno = %d", retVal, ret, errno);
1129 : }
1130 :
1131 : //MJ: The log_error above sometimes reports "retVal = 111, ret = 0, errno = 115". This error indicates that the receiving side temporarily
1132 : //MJ: cannot acknowledge the connection request.
1133 : //MJ: Should we somehow retry the connection request in netio, or leave it to the user to redo it? In the latter case we have to give a clear error back to the user.
1134 : //MJ: Experience shows that just waiting a few seconds and then retrying actually helps.
1135 :
1136 117 : log_dbg("Register the callback for on_send_completed. Using TCP socket %d", fd);
1137 117 : socket->cq_ev_ctx.fd = fd;
1138 117 : socket->cq_ev_ctx.data = socket;
1139 117 : socket->cq_ev_ctx.cb = on_send_socket_tcp_cq_event;
1140 117 : netio_register_write_tcp_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
1141 :
1142 : //MJ: In case of the TCP instruction chain "socket()->connect()->write()/recv()" one is always using the same FD,
1143 : //MJ: so we don't know whether a connect() or a write()/recv() has completed when we receive a trigger from this FD.
1144 : //MJ: Therefore the code below may not work, because cb_connection_established() will also get called on write()/recv().
1145 117 : socket->state = CONNECTED;
1146 :
1147 117 : if(socket->cb_connection_established)
1148 : {
1149 117 : log_dbg("Calling the callback that is registered for cb_connection_established");
1150 117 : socket->cb_connection_established(socket);
1151 : }
1152 117 : }
1153 :
1154 :
1155 : //This function is the TCP equivalent of on_listen_socket_cm_event()
1156 : void
1157 42 : on_listen_socket_tcp_cm_event(int fd, void* ptr)
1158 : {
1159 42 : int infd;
1160 42 : socklen_t in_len;
1161 42 : struct sockaddr in_addr;
1162 42 : log_info("on_listen_socket_tcp_cm_event: called with fd = %d", fd);
1163 :
1164 42 : struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
1165 42 : in_len = sizeof in_addr;
1166 42 : infd = accept(fd, &in_addr, &in_len);
1167 :
1168 42 : log_dbg("on_listen_socket_tcp_cm_event: listen socket: infd = %d", infd);
1169 42 : if (infd == -1) {
1170 0 : if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) // We have processed all incoming connections.
1171 : { log_dbg("on_listen_socket_tcp_cm_event: nothing to do?"); return; }
1172 : else
1173 : {
1174 0 : log_error("on_listen_socket_tcp_cm_event: ERROR. errno = %d", errno);
1175 0 : exit(-1);
1176 : }
1177 : }
1178 :
1179 : #if defined DEBUG || defined DEBUG_CM
1180 : char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
1181 : int ret = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, 0);
1182 : if (ret == 0) {
1183 : log_dbg("on_listen_socket_tcp_cm_event: Accepted connection on descriptor %d (host=%s, service=%s)", infd, hbuf, sbuf);
1184 : }
1185 : #endif
1186 :
1187 42 : make_socket_non_blocking(infd); // Make the incoming socket non-blocking
1188 42 : log_dbg("Adding TCP/IP recv socket to listen socket");
1189 42 : struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
1190 42 : struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
1191 42 : netio_init_recv_tcp_socket(rsocket, lsocket);
1192 :
1193 42 : log_dbg("Adding receive socket %d to epoll", infd);
1194 42 : rsocket->eq_ev_ctx.fd = infd;
1195 42 : rsocket->eq_ev_ctx.data = rsocket;
1196 42 : rsocket->eq_ev_ctx.cb = on_recv_socket_tcp_cm_event;
1197 42 : netio_register_read_tcp_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
1198 :
1199 42 : if(lsocket->recv_sub_msg == 1){
1200 2 : rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
1201 66 : for (int i = 0; i < 32; i++){
1202 64 : rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
1203 64 : rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
1204 64 : rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
1205 64 : netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
1206 : }
1207 : log_dbg("Posted recv for subscription messages");
1208 40 : } else if (lsocket->attr.num_buffers > 0) {
1209 40 : log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
1210 40 : rsocket->sub_msg_buffers = NULL;
1211 40 : rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
1212 10280 : for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
1213 10240 : log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
1214 10240 : rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
1215 10240 : rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
1216 10240 : netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
1217 : }
1218 : } else {
1219 0 : log_error("Something went wrong. Not allocating any buffers for recv socket.");
1220 : }
1221 :
1222 42 : on_recv_socket_tcp_cm_event(infd, (void*)rsocket);
1223 42 : log_dbg("Done. connection accepted");
1224 42 : }
1225 :
1226 :
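//This function is the TCP equivalent of on_buffered_recv_socket_cm_event(): it registers
//the fd for receive completions, posts the page buffers and invokes the user's
//connection-established callback.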
1227 : void
1228 58 : on_buffered_recv_socket_tcp_cm_event(int fd, void* ptr)
1229 : {
1230 58 : log_info("Connection event on buffered TCP/IP recv (FD = %d, ptr = %p)", fd, ptr);
1231 58 : struct netio_buffered_recv_socket *socket = (struct netio_buffered_recv_socket*)ptr;
1232 :
1233 58 : socket->recv_socket.cq_ev_ctx.fd = fd;
1234 58 : socket->recv_socket.cq_ev_ctx.data = socket;
1235 58 : socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_tcp_cq_event;
1236 58 : netio_register_read_tcp_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
1237 :
1238 58 : log_dbg("posting %d buffers", socket->num_pages);
1239 7958 : for(unsigned int i=0; i<socket->num_pages; i++) {
1240 7900 : netio_post_recv(&socket->recv_socket, &socket->pages[i]);
1241 : }
1242 :
1243 58 : if(socket->lsocket->cb_connection_established) {
1244 58 : log_dbg("Calling user callback");
1245 58 : socket->lsocket->cb_connection_established(socket);
1246 : } else{
1247 0 : log_warn("no callback available");
1248 : }
1249 58 : }
1250 :
1251 :
1252 : //This function is the TCP equivalent of on_buffered_listen_socket_cm_event()
1253 : void
1254 58 : on_buffered_listen_socket_tcp_cm_event(int fd, void* ptr)
1255 : {
1256 58 : int infd;
1257 58 : socklen_t in_len;
1258 58 : struct sockaddr in_addr;
1259 58 : log_info("on_buffered_listen_socket_tcp_cm_event: called with fd = %d", fd);
1260 :
1261 58 : struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
1262 58 : in_len = sizeof in_addr;
1263 58 : infd = accept(fd, &in_addr, &in_len);
1264 :
1265 58 : log_info("on_buffered_listen_socket_tcp_cm_event: listen socket: infd = %d", infd);
1266 58 : if (infd == -1)
1267 : {
1268 0 : if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) // We have processed all incoming connections.
1269 : { log_dbg("on_buffered_listen_socket_tcp_cm_event: nothing to do?"); return; }
1270 : else
1271 : {
1272 0 : log_error("on_buffered_listen_socket_tcp_cm_event: ERROR. errno = %d", errno);
1273 0 : exit(-1);
1274 : }
1275 : }
1276 :
1277 : #if defined DEBUG || defined DEBUG_CM
1278 : char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];
1279 : int ret = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, 0);
1280 : if (ret == 0) {
1281 : log_dbg("on_buffered_listen_socket_tcp_cm_event: Accepted connection on descriptor %d (host=%s, service=%s)", infd, hbuf, sbuf);
1282 : }
1283 : #endif
1284 :
1285 58 : make_socket_non_blocking(infd); // Make the incoming socket non-blocking
1286 58 : log_dbg("Adding TCP/IP recv socket to listen socket");
1287 58 : struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
1288 58 : struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
1289 58 : netio_buffered_recv_tcp_socket_init(rsocket, lsocket);
1290 :
1291 58 : log_dbg("Adding receive socket %d to epoll", infd);
1292 58 : rsocket->recv_socket.eq_ev_ctx.fd = infd;
1293 58 : rsocket->recv_socket.eq_ev_ctx.data = rsocket;
1294 58 : rsocket->recv_socket.eq_ev_ctx.cb = on_buffered_recv_socket_tcp_cm_event;
1295 58 : netio_register_read_tcp_fd(&rsocket->recv_socket.ctx->evloop, &rsocket->recv_socket.eq_ev_ctx);
1296 :
1297 7958 : for(unsigned int i=0; i<rsocket->num_pages; i++) {
1298 7900 : netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
1299 : }
1300 :
1301 58 : on_buffered_recv_socket_tcp_cm_event(infd, (void*)rsocket);
1302 58 : log_dbg("Connection accepted");
1303 58 : }
1304 :
1305 :
1306 : //This is the equivalent of on_recv_socket_cm_event
1307 : void
1308 42 : on_recv_socket_tcp_cm_event(int fd, void *ptr)
1309 : {
1310 42 : log_dbg("on_recv_socket_tcp_cm_event: called with fd = %d and ptr = %p", fd, ptr);
1311 42 : struct netio_recv_socket *socket = (struct netio_recv_socket*)ptr;
1312 42 : socket->cq_ev_ctx.fd = fd;
1313 42 : socket->cq_ev_ctx.data = socket;
1314 42 : socket->cq_ev_ctx.cb = on_recv_socket_tcp_cq_event;
1315 42 : netio_register_read_tcp_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
1316 42 : if(socket->lsocket->cb_connection_established)
1317 : {
1318 42 : log_dbg("Calling user callback");
1319 42 : socket->lsocket->cb_connection_established(socket);
1320 : } else {
1321 0 : log_warn("no callback available");
1322 : }
1323 42 : }
|