Line data Source code
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/epoll.h>

#include "log.h"
#include "netio/netio.h"
#include "connection_event.h"
#include "completion_event.h"
7 :
8 : #if defined DEBUG || defined DEBUG_CM
9 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
10 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
11 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
12 : #else
13 : #define log_dbg(...)
14 : #define log_trc(...)
15 : #endif
16 :
17 : #define FATAL(msg, c) \
18 : do { \
19 : log_fatal("%s %d: %s", msg, c, fi_strerror(-c)); \
20 : exit(2); \
21 : } while(0);
22 :
23 :
24 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
25 :
26 : static int
27 4346 : read_cm_event(struct fid_eq* eq, struct fi_info** info, struct fi_eq_err_entry* err_entry)
28 : {
29 4346 : uint32_t event;
30 4346 : struct fi_eq_cm_entry entry;
31 :
32 4346 : ssize_t rd = fi_eq_sread(eq, &event, &entry, sizeof entry, 0, 0);
33 4346 : if(rd < 0)
34 : {
35 1333 : if(rd == -FI_EAGAIN)
36 : {
37 : return rd;
38 : }
39 215 : if(rd == -FI_EAVAIL)
40 : {
41 215 : int r;
42 215 : if((r = fi_eq_readerr(eq, err_entry, 0)) < 0)
43 : {
44 0 : FATAL("Failed to retrieve details on Event Queue error", r);
45 : }
46 215 : log_error("Event Queue error: %s (code: %d), provider specific: %s (code: %d)",
47 : fi_strerror(err_entry->err), err_entry->err,
48 : fi_eq_strerror(eq, err_entry->prov_errno, err_entry->err_data, NULL, 0),
49 : err_entry->prov_errno);
50 215 : return rd;
51 : }
52 : }
53 3013 : if (rd != sizeof entry)
54 : {
55 0 : FATAL("Failed to read from Event Queue: %d", (int)rd);
56 : }
57 :
58 3013 : if(info != NULL)
59 463 : *info = entry.info;
60 :
61 3013 : return event;
62 : }
63 :
64 :
65 : static void
66 463 : handle_connreq(struct netio_recv_socket* rsocket, struct netio_listen_socket* lsocket, struct fi_info *info, void (*cb)(int,void*), void* cbdata)
67 : {
68 463 : int ret;
69 463 : struct fi_eq_attr eq_attr;
70 463 : eq_attr.wait_obj = FI_WAIT_FD;
71 :
72 463 : if((ret = fi_domain(lsocket->fabric, info, &rsocket->domain, NULL)))
73 : {
74 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
75 0 : FATAL("Listen socket, cannot open fabric, error ", ret);
76 : }
77 :
78 463 : if((ret = fi_endpoint(rsocket->domain, info, &rsocket->ep, NULL)))
79 : {
80 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
81 0 : FATAL("Listen socket, cannot open endpoint, error ", ret);
82 : }
83 :
84 : /* Create a new event queue for the new active socket */
85 463 : if((ret = fi_eq_open(lsocket->fabric, &eq_attr, &rsocket->eq, NULL)))
86 : {
87 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
88 0 : FATAL("Listen socket, cannot open Event Queue, error ", ret);
89 : }
90 :
91 463 : if((ret = fi_ep_bind((rsocket->ep), &rsocket->eq->fid, 0)))
92 : {
93 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
94 0 : FATAL("Listen socket, cannot bind Event Queue to endpoint, error ", ret);
95 : }
96 :
97 463 : if((ret = fi_control(&rsocket->eq->fid, FI_GETWAIT, &rsocket->eqfd)))
98 : {
99 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
100 0 : FATAL("Listen socket failed to obtain Event Queue wait object", ret);
101 : }
102 463 : rsocket->eq_ev_ctx.fd = rsocket->eqfd;
103 463 : rsocket->eq_ev_ctx.data = cbdata;
104 463 : rsocket->eq_ev_ctx.cb = cb;
105 :
106 463 : log_dbg("Adding RECV EQ polled fid %d %p", rsocket->eqfd, &rsocket->eq->fid);
107 463 : add_polled_fid(&rsocket->ctx->evloop.pfids, lsocket->fabric, &rsocket->eq->fid, rsocket->eqfd, &rsocket, cb);
108 463 : add_open_fd(&rsocket->ctx->evloop.openfds, rsocket->eqfd, NETIO_EQ, URECV, &rsocket);
109 463 : netio_register_read_fd(&rsocket->ctx->evloop, &rsocket->eq_ev_ctx);
110 463 : log_dbg("recv_socket: EQ fd %d waiting for connection", rsocket->eqfd);
111 :
112 463 : struct fi_cq_attr cq_attr;
113 463 : cq_attr.size = NETIO_MAX_CQ_ENTRIES; /* # entries for CQ */
114 463 : cq_attr.flags = 0; /* operation flags */
115 463 : cq_attr.format = FI_CQ_FORMAT_DATA; /* completion format */
116 463 : cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
117 463 : cq_attr.signaling_vector = 0; /* interrupt affinity */
118 463 : cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */
119 463 : cq_attr.wait_set = NULL; /* optional wait set */
120 :
121 : //FI_RECV CQ
122 463 : if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->cq, NULL)))
123 : {
124 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
125 0 : FATAL("Listen socket, cannot open Completion Queue, error ", ret);
126 : }
127 :
128 463 : if((ret = fi_ep_bind((rsocket->ep), &rsocket->cq->fid, FI_RECV)))
129 : {
130 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
131 0 : FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
132 : }
133 :
134 : //FI_TRANSMIT CQ - also necessary
135 463 : cq_attr.format = FI_CQ_FORMAT_UNSPEC;
136 463 : cq_attr.wait_obj= FI_WAIT_NONE;
137 463 : if((ret = fi_cq_open(rsocket->domain, &cq_attr, &rsocket->tcq, NULL)))
138 : {
139 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
140 0 : FATAL("Listen socket, cannot open Completion Queue, error ", ret);
141 : }
142 :
143 463 : if((ret = fi_ep_bind((rsocket->ep), &rsocket->tcq->fid, FI_TRANSMIT)))
144 : {
145 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
146 0 : FATAL("Listen socket, cannot bind Completion Queue to endpoint, error ", ret);;
147 : }
148 :
149 463 : if((ret = fi_enable(rsocket->ep)))
150 : {
151 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
152 0 : FATAL("Listen socket, cannot enable recv socket endpoint, error ", ret);
153 : }
154 :
155 463 : if((ret = fi_accept(rsocket->ep, NULL, 0)))
156 : {
157 0 : fi_reject(lsocket->pep, info->handle, NULL, 0);
158 0 : FATAL("Listen socket, connection rejected, error ", ret);;
159 : }
160 463 : log_dbg("connection accepted. Lsocket EQ: %d with evloop %d, rsocket EQ %d with evloop %d", lsocket->eqfd, lsocket->ctx->evloop.epollfd, rsocket->eqfd, rsocket->ctx->evloop.epollfd);
161 463 : }
162 :
163 :
164 : void
165 350 : handle_recv_socket_shutdown(struct netio_recv_socket* socket)
166 : {
167 350 : if(socket->eqfd < 0){
168 0 : log_info("handle_recv_socket_shutdown on closed socket (eqfd %d)", socket->eqfd);
169 0 : return;
170 : }
171 350 : int ret = 0;
172 350 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
173 350 : log_dbg("Handle_recv_socket_shutdown for socket %p, evloop: %d", socket, socket->ctx->evloop.epollfd);
174 :
175 350 : if(socket->ep && (ret = fi_close(&socket->ep->fid)))
176 : {
177 0 : log_error("Failed to close recv socket Endpoint %d: %s", ret, fi_strerror(-ret));
178 : }
179 350 : log_dbg("fi_close done for endpoint.");
180 :
181 : //EQ
182 350 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->eq_ev_ctx.fd);
183 350 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eq_ev_ctx.fd, &ep_event);
184 350 : if ( ret ){ log_warn("netio_recv_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
185 350 : log_dbg("netio_recv_socket: removing EQ fd %d from evloop %d", socket->eq_ev_ctx.fd, socket->ctx->evloop.epollfd);
186 350 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
187 : {
188 0 : log_error("Failed to fi_close recv socket Event Queue %d: %s", ret, fi_strerror(-ret));
189 0 : ret = close(socket->eq_ev_ctx.fd);
190 0 : if ( ret ) {log_warn("Cannot close recv socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
191 : }
192 350 : remove_open_fd(&socket->ctx->evloop, socket->eq_ev_ctx.fd);
193 350 : socket->eqfd = -1;
194 :
195 : //FI_RECV CQ
196 350 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->cq_ev_ctx.fd);
197 350 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cq_ev_ctx.fd, &ep_event);
198 350 : if ( ret ){ log_warn("netio_recv_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd, strerror(errno));}
199 350 : log_dbg("netio_recv_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
200 350 : if(socket->cq && (ret = fi_close(&socket->cq->fid)))
201 : {
202 0 : log_error("Failed to close recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
203 0 : ret = close(socket->cq_ev_ctx.fd);
204 0 : if ( ret ) {log_warn("Cannot close recv socket CQ fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
205 : }
206 350 : remove_open_fd(&socket->ctx->evloop, socket->cq_ev_ctx.fd);
207 350 : socket->cqfd = -1;
208 : //FI_TRANSMIT CQ
209 350 : if((ret = fi_close(&socket->tcq->fid)))
210 : {
211 0 : log_error("Failed to close FI_TRANSMIT recv socket Completion Queue %d: %s", ret, fi_strerror(-ret));
212 : }
213 :
214 : //MR
215 350 : uint16_t mem_regions = socket->reg_mr;
216 77842 : for(uint16_t i=0; i<mem_regions; ++i)
217 : {
218 77492 : if ((ret = fi_close(socket->mr[i]))){
219 0 : log_warn("Failed to close recv Memory Region %d, error %d.", i, ret);
220 : }
221 : else{
222 77492 : socket->reg_mr-=1;
223 : }
224 : }
225 350 : if(socket->reg_mr==0){free(socket->mr);socket->mr=NULL;}
226 350 : if((ret = fi_close(&socket->domain->fid)))
227 : {
228 0 : log_error("Failed to close recv socket Domain %d: %s", ret, fi_strerror(-ret));
229 : }
230 350 : if (socket->sub_msg_buffers != NULL){
231 1881 : for(int i = 0; i < 32; i++){
232 1824 : free(socket->sub_msg_buffers[i]->data);
233 1824 : free(socket->sub_msg_buffers[i]);
234 : }
235 57 : free(socket->sub_msg_buffers);
236 57 : socket->sub_msg_buffers = NULL;
237 : }
238 : }
239 :
240 : void
241 786 : handle_send_socket_shutdown(struct netio_send_socket* socket)
242 : {
243 : // if(socket->state != CONNECTED){
244 : // log_dbg("Nothing to do, send socket has already been freed....");
245 : // return;
246 : // }
247 786 : int ret = 0;
248 786 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
249 786 : log_dbg("Handle_send_socket_shutdown. Socket with EQ: %d", socket->eqfd);
250 :
251 786 : if(socket->ep && (ret = fi_close(&socket->ep->fid)))
252 : {
253 0 : log_error("Failed to close send socket Endpoint %d: %s", ret, fi_strerror(-ret));
254 : }
255 786 : socket->ep = NULL;
256 :
257 : //EQ
258 786 : log_dbg("netio_send_socket: removing EQ fd %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd);
259 786 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
260 786 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
261 786 : if ( ret ){ log_warn("netio_send_socket: cannot remove EQ fd %d from evloop %d: %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
262 786 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
263 : {
264 0 : log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
265 0 : ret = close(socket->eq_ev_ctx.fd);
266 0 : if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
267 :
268 : }
269 786 : remove_open_fd(&socket->ctx->evloop, socket->eqfd);
270 786 : socket->eqfd = -1;
271 :
272 : //FI_TRANSMIT CQ
273 786 : log_dbg("netio_send_socket: removing CQ fd %d from evloop %d", socket->cq_ev_ctx.fd, socket->ctx->evloop.epollfd);
274 786 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->cqfd);
275 786 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->cqfd, &ep_event);
276 786 : if ( ret ){ log_warn("netio_send_socket: cannot remove CQ fd %d from evloop %d: %s", socket->cqfd, socket->ctx->evloop.epollfd, strerror(errno));}
277 786 : if(socket->cq && (ret = fi_close(&socket->cq->fid)))
278 : {
279 0 : log_error("Failed to close send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
280 0 : ret = close(socket->cq_ev_ctx.fd);
281 0 : if ( ret ){ log_warn("Could not close send socket cq fd %d: %s", socket->cq_ev_ctx.fd, strerror(errno));}
282 : }
283 786 : remove_open_fd(&socket->ctx->evloop, socket->cqfd);
284 786 : socket->cqfd = -1;
285 : //FI_RECV CQ
286 786 : if((ret = fi_close(&socket->rcq->fid)))
287 : {
288 0 : log_error("Failed to close FI_RECV senf socket Completion Queue %d: %s", ret, fi_strerror(-ret));
289 : }
290 :
291 786 : socket->domain->nb_sockets -= 1;
292 786 : if (socket->domain->nb_sockets == 0) {
293 79377 : for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
294 78592 : if ((ret = fi_close(socket->domain->mr[i]))) {
295 78592 : log_warn("Failed to close send Memory Region %d, error %d.", i, ret);;
296 : }
297 : }
298 785 : if ((ret = fi_close(&socket->domain->domain->fid))) {
299 0 : log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
300 : }
301 :
302 785 : if(socket->ctx->evloop.pfids.count == 0){
303 641 : if ((ret = fi_close(&socket->domain->fabric->fid))) {
304 0 : log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
305 : }
306 : }
307 785 : free(socket->domain->mr);
308 785 : socket->domain->mr = NULL;
309 785 : free(socket->domain);
310 785 : socket->domain = NULL;
311 : }
312 786 : if(socket->fi != NULL){
313 786 : fi_freeinfo(socket->fi);
314 786 : socket->fi = NULL;
315 : }
316 786 : socket->state = DISCONNECTED;
317 786 : }
318 :
319 :
320 : static void
321 215 : handle_send_socket_shutdown_on_connetion_refused(struct netio_send_socket* socket)
322 : {
323 215 : int ret = 0;
324 : //struct fi_eq_err_entry err_entry;
325 215 : struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
326 215 : if(socket->ep && (ret = fi_close(&socket->ep->fid)))
327 : {
328 0 : log_error("Failed to close send socket endpoint %d: %s", ret, fi_strerror(-ret));
329 : }
330 :
331 : //EQ
332 215 : remove_polled_fid(&socket->ctx->evloop.pfids, socket->eqfd);
333 215 : ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
334 215 : if (ret) {log_warn("Conn refused: cannot deregister send_socket EQ fd %d from evloop %d, %s", socket->eqfd, socket->ctx->evloop.epollfd, strerror(errno));}
335 215 : if(socket->eq && (ret = fi_close(&socket->eq->fid)))
336 : {
337 0 : log_error("Failed to close send socket Event Queue %d: %s", ret, fi_strerror(-ret));
338 0 : ret = close(socket->eq_ev_ctx.fd);
339 0 : if ( ret ) { log_warn("Cannot close send socket eq fd %d, %s", socket->eq_ev_ctx.fd, strerror(errno));}
340 : }
341 215 : remove_open_fd(&socket->ctx->evloop, socket->eqfd);
342 215 : socket->eqfd = -1;
343 :
344 : //FI_TRANSMIT CQ - wait object not retrieved yet
345 215 : if(socket->cq && (ret = fi_close(&socket->cq->fid)))
346 : {
347 0 : log_error("Failed to close FI_TRANSMIT send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
348 : }
349 : //FI_RECV CQ
350 215 : if(fi_close(&socket->rcq->fid))
351 : {
352 0 : log_error("Failed to close FI_RECV send socket Completion Queue %d: %s", ret, fi_strerror(-ret));
353 : }
354 :
355 215 : socket->domain->nb_sockets -= 1;
356 215 : if (socket->domain->nb_sockets == 0) {
357 261 : for(unsigned int i=0; i < socket->domain->reg_mr; ++i) {
358 46 : if ((ret = fi_close(socket->domain->mr[i]))) {
359 0 : log_warn("Failed to close send Memory Region %d, error %d.", i, ret);
360 : }
361 : }
362 215 : if ((ret = fi_close(&socket->domain->domain->fid))) {
363 0 : log_error("Failed to close send socket Domain %d: %s", ret, fi_strerror(-ret));
364 : }
365 215 : if(socket->ctx->evloop.pfids.count == 0){
366 157 : if ((ret = fi_close(&socket->domain->fabric->fid))) {
367 0 : log_error("Failed to close send socket Fabric %d: %s", ret, fi_strerror(-ret));
368 : }
369 : }
370 215 : free(socket->domain->mr);
371 215 : socket->domain->mr = NULL;
372 215 : free(socket->domain);
373 215 : socket->domain = NULL;
374 :
375 : }
376 215 : socket->state = UNCONNECTED;
377 215 : }
378 :
379 : // WAIT OBJECT CALLBACKS ///////////////////////////////////////////////////////////
380 :
381 : void
382 3070 : on_send_socket_cm_event(int fd, void* ptr)
383 : {
384 3070 : struct netio_send_socket* socket = (struct netio_send_socket*)ptr;
385 3070 : struct fi_eq_err_entry err_entry;
386 3070 : log_dbg("send socket %p fd %d: connection event, evloop: %d", socket, fd, socket->ctx->evloop.epollfd);
387 :
388 5022 : if(socket->state == DISCONNECTED){return;} //Check if event queue was already closed. TODO: Need to fix receiving messages after closing
389 3070 : int event = read_cm_event(socket->eq, NULL, &err_entry);
390 3070 : int ret = 0;
391 :
392 3070 : log_dbg("event %d", event);
393 :
394 3070 : switch(event)
395 : {
396 788 : case FI_SHUTDOWN:
397 788 : if (socket->eqfd < 0 ){
398 0 : log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
399 1952 : return;
400 : }
401 788 : log_info("fi_verbs_process_send_socket_cm_event: FI_SHUTDOWN");
402 788 : if(socket->cb_internal_connection_closed){
403 318 : socket->cb_internal_connection_closed(socket);
404 : }
405 788 : if(socket->cb_connection_closed) {
406 788 : socket->cb_connection_closed(socket);
407 : }
408 : return;
409 :
410 949 : case FI_CONNECTED:
411 949 : socket->cqfd = -1;
412 949 : if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
413 : {
414 0 : FATAL("Failed to retrieve wait object for send socket Completion Queue", ret);
415 : }
416 :
417 949 : socket->cq_ev_ctx.fd = socket->cqfd;
418 949 : socket->cq_ev_ctx.data = socket;
419 949 : socket->cq_ev_ctx.cb = on_send_socket_cq_event;
420 :
421 949 : log_dbg("send_socket: EQ fd %d connected, CQ fd %d", socket->eqfd, socket->cqfd);
422 949 : log_dbg("Adding SEND CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
423 949 : add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->cq->fid, socket->cqfd, socket, on_send_socket_cq_event);
424 949 : add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, USEND, socket);
425 949 : netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
426 :
427 949 : socket->state = CONNECTED;
428 :
429 949 : if(socket->cb_connection_established) {
430 949 : socket->cb_connection_established(socket);
431 : }
432 :
433 : return;
434 :
435 : case FI_MR_COMPLETE:
436 : case FI_AV_COMPLETE:
437 : case FI_JOIN_COMPLETE:
438 : // Not implemented
439 1118 : break;
440 :
441 :
442 215 : case -FI_EAVAIL:
443 :
444 215 : switch(err_entry.err) {
445 :
446 215 : case FI_ECONNREFUSED:
447 :
448 215 : log_dbg("Connection refused (FI_ECONNREFUSED). Deallocating send_socket resources.");
449 :
450 215 : if(socket->eqfd < 0){
451 : log_dbg("FI_ECONNREFUSED on send socket with EQ fd %d. Not clearing it.", socket->eqfd);
452 : return;
453 : }
454 :
455 215 : handle_send_socket_shutdown_on_connetion_refused(socket);
456 :
457 215 : if(socket->cb_error_connection_refused) {
458 215 : socket->cb_error_connection_refused(socket);
459 : }
460 :
461 : case FI_ETIMEDOUT:
462 215 : log_info("fi_verbs_process_send_socket_cm_event: FI_ETIMEDOUT");
463 215 : if (socket->eqfd < 0 ){
464 215 : log_info("Ignoring FI_SHUTDOWN on send_socket, invalid eqfd (socket already closed)");
465 215 : break;
466 : }
467 :
468 : // Need to take care of receive socket as well
469 0 : if (socket->recv_socket != NULL){
470 0 : if (socket->recv_socket->eqfd < 0 ){
471 0 : log_info("Ignoring FI_ETIMEDOUT on recv_socket, invalid eqfd (socket already closed)");
472 0 : return;
473 : }
474 0 : log_info("Shutting down receive socket on FI_ETIMEDOUT");
475 0 : handle_recv_socket_shutdown(socket->recv_socket);
476 0 : if(socket->recv_socket->lsocket->cb_connection_closed) {
477 0 : socket->recv_socket->lsocket->cb_connection_closed(socket->recv_socket);
478 : }
479 : }
480 :
481 0 : if(socket->cqfd < 0){ //cq not initalized yet
482 0 : handle_send_socket_shutdown_on_connetion_refused(socket);
483 : } else {
484 0 : if(socket->cb_internal_connection_closed){
485 0 : socket->cb_internal_connection_closed(socket);
486 : }
487 0 : if(socket->cb_connection_closed) {
488 0 : socket->cb_connection_closed(socket);
489 : }
490 : }
491 : break;
492 :
493 0 : default:
494 :
495 0 : log_error("Unhandled error in the Event Queue: %s (code: %d), provider specific: %s (code: %d)",
496 : fi_strerror(err_entry.err), err_entry.err,
497 : fi_eq_strerror(socket->eq, err_entry.prov_errno, err_entry.err_data, NULL, 0),
498 : err_entry.prov_errno);
499 : }
500 : return;
501 :
502 1118 : case -FI_EAGAIN:
503 1118 : struct fid* fp = &socket->eq->fid;
504 1118 : fi_trywait(socket->domain->fabric, &fp, 1);
505 1118 : break;
506 :
507 0 : default:
508 0 : log_error("Unexpected event %d in send socket Event Queue", event);
509 0 : exit(2);
510 : }
511 : }
512 :
513 :
514 : void
515 211 : on_listen_socket_cm_event(int fd, void* ptr)
516 : {
517 211 : struct netio_listen_socket* lsocket = (struct netio_listen_socket*)ptr;
518 211 : log_dbg("listen socket: connection event");
519 :
520 211 : if(lsocket->eqfd != fd){
521 0 : log_dbg("Listen socket CM event: inconsistend fd. Ignoring event.");
522 0 : return;
523 : }
524 :
525 :
526 211 : struct fi_info *info = NULL;
527 211 : struct fi_eq_err_entry err_entry;
528 211 : int event = read_cm_event(lsocket->eq, &info, &err_entry);
529 :
530 :
531 211 : switch (event)
532 : {
533 211 : case FI_CONNREQ:
534 211 : log_dbg("fi_verbs_process_listen_socket_cm_event: FI_CONNREQ");
535 211 : struct netio_socket_list* new_entry = add_socket(&lsocket->recv_sockets, URECV);
536 211 : struct netio_recv_socket* rsocket = (struct netio_recv_socket*)new_entry->socket;
537 211 : netio_init_recv_socket(rsocket, lsocket);
538 211 : handle_connreq(rsocket, lsocket, info, on_recv_socket_cm_event, rsocket);
539 211 : if(lsocket->recv_sub_msg == 1){
540 77 : rsocket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
541 2541 : for (int i = 0; i < 32; i++){
542 2464 : rsocket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
543 2464 : rsocket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
544 2464 : rsocket->sub_msg_buffers[i]->data = malloc(rsocket->sub_msg_buffers[i]->size);
545 2464 : netio_register_recv_buffer(rsocket, rsocket->sub_msg_buffers[i], 0);
546 2464 : netio_post_recv(rsocket, rsocket->sub_msg_buffers[i]);
547 : }
548 : log_dbg("Posted recv for subscription messages");
549 134 : } else if (lsocket->attr.num_buffers > 0) {
550 134 : log_dbg("Connection established, posting %lu buffers", lsocket->attr.num_buffers);
551 134 : rsocket->sub_msg_buffers = NULL;
552 134 : rsocket->recv_buffers = malloc(lsocket->attr.num_buffers * sizeof(struct netio_buffer));
553 30358 : for(unsigned i=0; i<lsocket->attr.num_buffers; i++) {
554 30224 : log_trc("registering buffer of size %lu", lsocket->attr.buffer_size);
555 30224 : rsocket->recv_buffers[i].size = lsocket->attr.buffer_size;
556 30224 : rsocket->recv_buffers[i].data = malloc(lsocket->attr.buffer_size);
557 30224 : netio_register_recv_buffer(rsocket, &rsocket->recv_buffers[i], 0);
558 30224 : netio_post_recv(rsocket, &rsocket->recv_buffers[i]);
559 : }
560 : } else {
561 0 : log_error("Something went wrong. Not allocating any buffers for recv socket.");
562 : }
563 : break;
564 :
565 0 : case FI_CONNECTED:
566 0 : log_fatal("FI_CONNECTED received on listen socket");
567 0 : exit(2);
568 :
569 0 : case FI_SHUTDOWN:
570 0 : log_fatal("FI_SHUTDOWN received on listen socket");
571 0 : exit(2);
572 :
573 0 : case -FI_EAGAIN:
574 0 : struct fid* fp = &lsocket->eq->fid;
575 0 : fi_trywait(lsocket->fabric, &fp, 1);
576 0 : break;
577 :
578 0 : case -FI_EAVAIL:
579 0 : log_error("Unhandled error in listen socket EQ code: %d, provider specific code: %d",
580 : err_entry.err, err_entry.prov_errno);
581 0 : break;
582 : }
583 211 : fi_freeinfo(info);
584 : }
585 :
586 :
587 : void
588 252 : on_buffered_listen_socket_cm_event(int fd, void* ptr)
589 : {
590 252 : struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)ptr;
591 252 : log_dbg("buffered listen socket: connection event");
592 :
593 252 : if(lsocket->listen_socket.eqfd != fd){
594 0 : log_dbg("Buffered listen socket CM event: inconsistend fd. Ignoring event.");
595 0 : return;
596 : }
597 :
598 :
599 252 : struct fi_info *info = NULL;
600 252 : struct fi_eq_err_entry err_entry;
601 252 : int event = read_cm_event(lsocket->listen_socket.eq, &info, &err_entry);
602 :
603 252 : switch (event)
604 : {
605 252 : case FI_CONNREQ:
606 252 : log_dbg("FI_CONNREQ");
607 :
608 252 : struct netio_socket_list* new_entry = add_socket(&(lsocket->listen_socket.recv_sockets), BRECV);
609 252 : struct netio_buffered_recv_socket* rsocket = (struct netio_buffered_recv_socket*)new_entry->socket;
610 252 : netio_buffered_recv_socket_init(rsocket, lsocket);
611 252 : handle_connreq(&rsocket->recv_socket, &lsocket->listen_socket, info, on_buffered_recv_socket_cm_event, rsocket);
612 252 : log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED, posting %d buffers", rsocket->num_pages);
613 71084 : for(unsigned int i=0; i<rsocket->num_pages; i++) {
614 70832 : netio_register_recv_buffer(&rsocket->recv_socket, &rsocket->pages[i], 0);
615 70832 : netio_post_recv(&rsocket->recv_socket, &rsocket->pages[i]);
616 : }
617 : break;
618 :
619 0 : case FI_CONNECTED:
620 0 : log_fatal("FI_CONNECTED received on buffered listen socket");
621 0 : exit(2);
622 :
623 0 : case FI_SHUTDOWN:
624 0 : log_fatal("FI_SHUTDOWN received on buffered listen socket");
625 0 : exit(2);
626 :
627 0 : case -FI_EAGAIN:
628 0 : struct fid* fp = &lsocket->listen_socket.eq->fid;
629 0 : fi_trywait(lsocket->listen_socket.fabric, &fp, 1);
630 0 : break;
631 :
632 0 : case -FI_EAVAIL:
633 0 : log_error("Unhandled error in buffer listen socket EQ code: %d, provider specific code: %d",
634 : err_entry.err, err_entry.prov_errno);
635 0 : break;
636 : }
637 252 : fi_freeinfo(info);
638 : }
639 :
640 :
641 : void
642 374 : on_recv_socket_cm_event(int fd, void* ptr)
643 : {
644 374 : struct netio_recv_socket* socket = (struct netio_recv_socket*)ptr;
645 374 : log_info("recv socket %d: connection event", socket->eqfd);
646 374 : if(socket->eqfd != fd){
647 : log_dbg("Recv socket CM event: inconsistend fd. Ignoring event.");
648 0 : return;
649 : }
650 374 : int ret;
651 374 : struct fi_eq_err_entry err_entry;
652 374 : uint32_t event = read_cm_event(socket->eq, NULL, &err_entry);
653 :
654 374 : switch (event)
655 : {
656 211 : case FI_CONNECTED:
657 211 : log_info("fi_verbs_process_recv_socket_cm_event: FI_CONNECTED");
658 211 : socket->cqfd = -1;
659 211 : if((ret = fi_control(&socket->cq->fid, FI_GETWAIT, &socket->cqfd)))
660 : {
661 0 : FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
662 : }
663 :
664 211 : socket->cq_ev_ctx.fd = socket->cqfd;
665 211 : socket->cq_ev_ctx.data = socket;
666 211 : socket->cq_ev_ctx.cb = on_recv_socket_cq_event;
667 :
668 211 : log_dbg("Adding recv CQ polled fid %d %p", socket->cqfd, &socket->cq->fid);
669 211 : add_polled_fid(&socket->ctx->evloop.pfids, socket->lsocket->fabric, &socket->cq->fid, socket->cqfd, socket, on_recv_socket_cq_event);
670 211 : add_open_fd(&socket->ctx->evloop.openfds, socket->cqfd, NETIO_CQ, URECV, socket);
671 211 : netio_register_read_fd(&socket->ctx->evloop, &socket->cq_ev_ctx);
672 :
673 211 : log_dbg("recv_socket: EQ fd %d CQ fd %d connected", socket->eqfd, socket->cqfd);
674 211 : if(socket->lsocket->cb_connection_established) {
675 211 : socket->lsocket->cb_connection_established(socket);
676 : }
677 :
678 374 : break;
679 :
680 163 : case FI_SHUTDOWN:
681 163 : if (socket->eqfd < 0 ){
682 0 : log_info("Ignoring FI_SHUTDOWN on recv_socket, invalid eqfd (socket already closed)");
683 0 : return;
684 : }
685 163 : log_info("fi_verbs_process_recv_socket_cm_event: FI_SHUTDOWN");
686 163 : handle_recv_socket_shutdown(socket);
687 163 : if(socket->lsocket->cb_connection_closed) {
688 106 : socket->lsocket->cb_connection_closed(socket);
689 : }
690 :
691 163 : if (socket->lsocket->attr.num_buffers > 0){
692 26222 : for(int i = 0; i < socket->lsocket->attr.num_buffers; i++){
693 26116 : free(socket->recv_buffers[i].data);
694 : }
695 106 : free(socket->recv_buffers);
696 : }
697 :
698 163 : int return_value = remove_socket(&socket->lsocket->recv_sockets, socket);
699 163 : log_info("Recv socket removed, result: %d", return_value);
700 163 : break;
701 :
702 : case FI_MR_COMPLETE:
703 : case FI_AV_COMPLETE:
704 : case FI_JOIN_COMPLETE:
705 : // Not implemented
706 : break;
707 :
708 0 : case -FI_EAGAIN:
709 0 : struct fid* fp = &socket->eq->fid;
710 0 : fi_trywait(socket->lsocket->fabric, &fp, 1);
711 0 : break;
712 :
713 0 : case -FI_EAVAIL:
714 0 : log_error("Unhandled error in recv socket EQ code: %d, provider specific code: %d",
715 : err_entry.err, err_entry.prov_errno);
716 0 : break;
717 :
718 0 : default:
719 0 : log_error("Unexpected event %d in recv socket Event Queue", event);
720 0 : exit(2);
721 374 : break;
722 : }
723 : }
724 :
725 :
726 : void
727 439 : on_buffered_recv_socket_cm_event(int fd, void* ptr)
728 : {
729 439 : struct netio_buffered_recv_socket* socket = (struct netio_buffered_recv_socket*)ptr;
730 439 : log_dbg("buffered recv socket %d: connection event (FD = %d)", socket->recv_socket.eqfd, fd);
731 439 : if(socket->recv_socket.eqfd != fd){
732 : log_dbg("Buffered recv socket CM event: inconsistend fd. Ignoring event.");
733 0 : return;
734 : }
735 439 : int ret;
736 439 : struct fi_eq_err_entry err_entry;
737 439 : uint32_t event = read_cm_event(socket->recv_socket.eq, NULL, &err_entry);
738 :
739 439 : switch (event)
740 : {
741 252 : case FI_CONNECTED:
742 252 : socket->recv_socket.cqfd = -1;
743 252 : if((ret = fi_control(&socket->recv_socket.cq->fid, FI_GETWAIT, &socket->recv_socket.cqfd)))
744 : {
745 0 : FATAL("Failed to retrieve recv socket Completion Queue wait object", ret);
746 : }
747 :
748 252 : socket->recv_socket.cq_ev_ctx.fd = socket->recv_socket.cqfd;
749 252 : socket->recv_socket.cq_ev_ctx.data = &socket->recv_socket;
750 252 : socket->recv_socket.cq_ev_ctx.cb = on_recv_socket_cq_event;
751 :
752 252 : log_dbg("Adding BUFFERED RECV CQ polled fid %d %p", socket->recv_socket.cqfd, &socket->recv_socket.cq->fid);
753 252 : add_open_fd(&socket->recv_socket.ctx->evloop.openfds, socket->recv_socket.cqfd, NETIO_CQ, BRECV, &socket->recv_socket);
754 252 : add_polled_fid(&socket->recv_socket.ctx->evloop.pfids,
755 252 : socket->recv_socket.lsocket->fabric,
756 252 : &socket->recv_socket.cq->fid,
757 : socket->recv_socket.cqfd,
758 : &socket->recv_socket,
759 : on_recv_socket_cq_event);
760 :
761 252 : netio_register_read_fd(&socket->recv_socket.ctx->evloop, &socket->recv_socket.cq_ev_ctx);
762 252 : log_dbg("buffered_recv_socket: registering CQ fd %d", socket->recv_socket.cqfd);
763 :
764 252 : if(socket->lsocket->cb_connection_established) {
765 252 : socket->lsocket->cb_connection_established(socket);
766 : }
767 :
768 439 : break;
769 :
770 187 : case FI_SHUTDOWN:
771 187 : log_dbg("recv socket shutdown");
772 187 : if (socket->recv_socket.eqfd < 0 ){
773 0 : log_dbg("Ignoring FI_SHUTDOWN on buffered recv_socket, invalid eqfd (socket already closed)");
774 0 : return;
775 : }
776 187 : handle_recv_socket_shutdown(&socket->recv_socket);
777 49739 : for(unsigned int i=0; i<socket->num_pages; i++) {
778 49552 : free(socket->pages[i].data);
779 : }
780 187 : free(socket->pages);
781 187 : if(socket->lsocket->cb_connection_closed) {
782 187 : socket->lsocket->cb_connection_closed(socket);
783 : }
784 :
785 187 : remove_socket(&socket->lsocket->listen_socket.recv_sockets, socket);
786 187 : break;
787 :
788 : case FI_MR_COMPLETE:
789 : case FI_AV_COMPLETE:
790 : case FI_JOIN_COMPLETE:
791 : // Not implemented
792 : break;
793 :
794 0 : case -FI_EAGAIN:
795 0 : struct fid* fp = &socket->recv_socket.eq->fid;
796 0 : fi_trywait(socket->lsocket->listen_socket.fabric, &fp, 1);
797 0 : break;
798 :
799 0 : case -FI_EAVAIL:
800 : // error was found
801 0 : log_error("Unhandled error in buffered recv socket EQ code: %d, provider specific code: %d",
802 : err_entry.err, err_entry.prov_errno);
803 0 : break;
804 :
805 0 : default:
806 0 : log_error("Unexpected event %d in buffered recv socket Event Queue", event);
807 0 : exit(2);
808 439 : break;
809 : }
810 : }
811 :
812 :
813 : // CALLBACKS FOR GARBAGE COLLECTION //////////////////////////////////////////////////////////
/*
 * Garbage-collection callback for unbuffered send sockets.
 * Teardown through this path is no longer supported; the callback is kept
 * only so existing registrations do not end up with a dangling function
 * pointer. It logs the call and does nothing else.
 */
void
close_send_socket(void* ptr)
{
    (void)ptr; /* parameter unused since teardown support was removed */
    log_info("Close_send_socket. Not supported anymore");
}
826 :
/*
 * Garbage-collection callback stub for buffered send sockets.
 * The historical cleanup sequence (timer/signal close, shutdown, buffer
 * frees) was removed; only the call itself is reported.
 */
void
close_buffered_send_socket(void *ptr)
{
    log_info("Closing buffered_send_socket %p. Not supported anymore", ptr);
}
845 :
/*
 * Garbage-collection callback stub for recv sockets.
 * Teardown via this path is no longer supported; the call is only logged.
 */
void
close_recv_socket(void* ptr)
{
    log_info("Closing recv_socket %p. Not supported anymore", ptr);
}
858 :
/*
 * Garbage-collection callback stub for buffered recv sockets.
 * The former cleanup (shutdown, page frees, signal close) was removed;
 * the function now only reports that it was invoked.
 */
void
close_buffered_recv_socket(void* ptr)
{
    log_info("Closing buffered_recv_socket %p. Not supported anymore", ptr);
}
875 :
876 :
877 : void
878 82 : close_listen_socket(void* ptr)
879 : {
880 82 : log_dbg("close_listen_socket");
881 82 : struct signal_data* sd = (struct signal_data*)ptr;
882 82 : struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
883 82 : if(socket->recv_sockets != NULL){
884 : struct netio_socket_list* entry = socket->recv_sockets;
885 : int still_open = 0;
886 0 : while(entry != NULL){
887 0 : struct netio_recv_socket* recv_socket = entry->socket;
888 0 : still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->eqfd);
889 0 : entry = entry->next;
890 : }
891 0 : if(still_open){
892 0 : netio_signal_fire(sd->signal);
893 0 : return;
894 : }
895 0 : free_socket_list(&socket->recv_sockets);
896 : }
897 82 : handle_listen_socket_shutdown(socket);
898 : //clean up signal
899 82 : netio_signal_close(sd->evloop, sd->signal);
900 82 : free(sd->signal);
901 82 : free(sd);
902 : }
903 :
904 :
905 : void
906 136 : close_buffered_listen_socket(void* ptr)
907 : {
908 136 : log_dbg("close_buffered_listen_socket");
909 136 : struct signal_data* sd = (struct signal_data*)ptr;
910 136 : struct netio_listen_socket* socket = (struct netio_listen_socket*)sd->ptr;
911 136 : if(socket->recv_sockets != NULL){
912 : struct netio_socket_list* entry = socket->recv_sockets;
913 : int still_open = 0;
914 0 : while(entry != NULL){
915 0 : struct netio_buffered_recv_socket* recv_socket = entry->socket;
916 0 : still_open += check_open_fd_exists(&socket->ctx->evloop.openfds, recv_socket->recv_socket.eqfd);
917 0 : entry = entry->next;
918 : }
919 0 : if(still_open){
920 0 : netio_signal_fire(sd->signal);
921 0 : return;
922 : }
923 0 : free_socket_list(&socket->recv_sockets);
924 : }
925 136 : handle_listen_socket_shutdown(socket);
926 : //clean up signal
927 136 : netio_signal_close(sd->evloop, sd->signal);
928 136 : free(sd->signal);
929 136 : free(sd);
930 : }
931 :
932 :
/*
 * Tear down the libfabric resources behind a listen socket: deregister its
 * event-queue fd from the epoll loop, close the passive endpoint and the
 * event queue, drop the fd from the open-fd bookkeeping, and free the
 * cached fi_info. Idempotent: eqfd is set to -1 on completion and the
 * function returns immediately if it is already negative.
 */
void
handle_listen_socket_shutdown(struct netio_listen_socket* socket)
{
    if(socket->eqfd < 0){return;}//nothing to do
    struct epoll_event ep_event; /* needed only for kernel <2.6.9 */
    log_dbg("Handle_listen_socket_shutdown. Lsocket EQ: %d with evloop %d socket %p", socket->eqfd, socket->ctx->evloop.epollfd, socket);
    /* Stop polling the EQ fd; failure is only warned about, teardown continues. */
    int ret = epoll_ctl(socket->ctx->evloop.epollfd, EPOLL_CTL_DEL, socket->eqfd, &ep_event);
    if(ret){ log_warn("Cannot deregister listen socket EQ %d from evloop %d", socket->eqfd, socket->ctx->evloop.epollfd); }

    /* Close the passive endpoint before the EQ it is bound to.
       NOTE(review): fi_close return value deliberately ignored here — confirm. */
    if(socket->pep != NULL){
        fi_close(&socket->pep->fid);
        socket->pep = NULL;
    }

    log_dbg("netio_listen_socket: removing EQ fd %d from evloop %d, ret %d", socket->eqfd, socket->ctx->evloop.epollfd, ret);
    if(socket->eq && (ret = fi_close(&socket->eq->fid)))
    {
        /* Fallback: if the provider refuses to close the EQ, at least close
           the underlying wait fd directly so it is not leaked. */
        log_error("Failed to close listen socket %d: %s", ret, fi_strerror(-ret));
        ret = close(socket->eq_ev_ctx.fd);
        if ( ret ) {log_warn("Cannot close listen socket EQ fd %d: %s", socket->eq_ev_ctx.fd, strerror(errno));}
    }
    remove_open_fd(&socket->ctx->evloop, socket->eqfd);
    socket->eqfd = -1; /* marks the socket as shut down for future calls */

    if(socket->fi != NULL){
        fi_freeinfo(socket->fi);
        socket->fi = NULL;
    }
    /* The fabric is intentionally left open — presumably shared with other
       sockets; verify before re-enabling the close below. */
    //fi_close(&socket->fabric->fid);
}
|