Line data Source code
1 : #include <unistd.h>
2 : #include <stdio.h>
3 : #include <sys/uio.h>
4 : #include <string.h>
5 : #include "netio/netio.h"
6 : #include "netio/netio_tcp.h"
7 : #include "connection_event.h"
8 : #include "completion_event.h"
9 : #include <stdlib.h>
10 : #include <sys/types.h>
11 : #include <errno.h>
12 : #include <netdb.h>
13 : #include <fcntl.h>
14 : #include <sys/socket.h>
15 : #include <netinet/in.h>
16 : #include <arpa/inet.h>
17 : #include "log.h"
18 :
/*
 * Debug/trace logging helpers. They forward to log_log() only when the file is
 * built with DEBUG or DEBUG_IO; otherwise they expand to nothing.
 */
#if defined(DEBUG) || defined(DEBUG_IO)
/* Base name of the current source file (text after the last '/'). */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif

/* Log a fatal message, flush stdout and terminate the process with exit code -2. */
#define ERROR_LOG(...) \
do { \
    log_fatal(__VA_ARGS__); \
    fflush(stdout); \
    exit(-2); \
} while(0)
29 :
/*
 * Record an error on socket `s` and report it as a refused connection.
 *
 * Stores the sign-inverted code `c` in s->fi_errno, a strdup() copy of `msg`
 * in s->fi_message (any previous fi_message pointer is overwritten, not freed
 * here) and then calls netio_error_connection_refused_fire(s).
 *
 * Arguments are parenthesized so that expressions such as `&sock` or `a - b`
 * expand correctly. `s` is evaluated more than once: pass a side-effect-free
 * expression.
 *
 * NOTE(review): the ';' after while(0) is kept on purpose because call sites
 * outside this part of the file may rely on it; it makes the macro unsafe in
 * an unbraced if/else.
 */
#define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \
do { \
    (s)->fi_errno = -(c); \
    (s)->fi_message = strdup(msg); \
    netio_error_connection_refused_fire(s); \
} while(0);
36 :
/*
 * Record an error on socket `s` and report it as a refused bind.
 *
 * Stores the sign-inverted code `c` in s->fi_errno, a strdup() copy of `msg`
 * in s->fi_message (any previous fi_message pointer is overwritten, not freed
 * here) and then calls netio_error_bind_refused_fire(s).
 *
 * Arguments are parenthesized so that expressions such as `&sock` or `a - b`
 * expand correctly. `s` is evaluated more than once: pass a side-effect-free
 * expression.
 *
 * NOTE(review): the ';' after while(0) is kept on purpose because call sites
 * outside this part of the file may rely on it; it makes the macro unsafe in
 * an unbraced if/else.
 */
#define ON_ERROR_BIND_REFUSED(s, msg, c) \
do { \
    (s)->fi_errno = -(c); \
    (s)->fi_message = strdup(msg); \
    netio_error_bind_refused_fire(s); \
} while(0);
43 :
44 :
45 : //Globals
46 :
47 :
48 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
49 :
50 : static int
51 569 : _socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
52 : {
53 569 : log_dbg("Going trough _socket_init_info");
54 569 : int ret=0;
55 569 : struct fi_info* hints;
56 :
57 569 : hints = fi_allocinfo();
58 569 : hints->addr_format = FI_FORMAT_UNSPEC;
59 569 : hints->ep_attr->type = FI_EP_MSG;
60 569 : hints->caps = FI_MSG;
61 569 : hints->mode = FI_LOCAL_MR;
62 : // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL
63 : // So the following will not allow the tcp provider to be used
64 569 : hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
65 569 : hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
66 :
67 569 : char port_addr[32];
68 569 : snprintf(port_addr, 32, "%u", port);
69 569 : log_dbg("connecting to endpoint %s:%u", hostname, port);
70 :
71 569 : uint64_t flags = 0;
72 569 : if(hostname == NULL) {
73 43 : hostname = "127.0.0.1";
74 43 : flags = FI_SOURCE;
75 : }
76 :
77 569 : if(addr) {
78 : // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr?
79 43 : struct sockaddr_in* sockaddr = (struct sockaddr_in*)addr;
80 43 : char* str_addr = inet_ntoa(sockaddr->sin_addr);
81 43 : log_dbg("sockaddr: %s:%d", str_addr, ntohs(sockaddr->sin_port));
82 :
83 43 : hostname = str_addr;
84 43 : snprintf(port_addr, 32, "%d", ntohs(sockaddr->sin_port));
85 43 : flags = 0;
86 : }
87 :
88 569 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi)))
89 : {
90 4 : fi_freeinfo(hints);
91 4 : log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret));
92 4 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret);
93 4 : return -1;
94 : }
95 :
96 565 : log_dbg("addr format: %x", socket->fi->addr_format);
97 565 : log_dbg("fi_freeinfo");
98 565 : fi_freeinfo(hints);
99 565 : return 0;
100 : }
101 :
102 :
103 : static int
104 561 : _socket_init_domain(struct netio_send_socket* socket)
105 : {
106 561 : int ret=0;
107 561 : struct netio_domain *domain = malloc(sizeof(struct netio_domain));
108 561 : domain->reg_mr = 0;
109 561 : domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
110 561 : domain->nb_sockets = 1;
111 561 : socket->domain = domain;
112 :
113 561 : if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL)))
114 : {
115 0 : log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret));
116 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret);
117 0 : return -1;
118 : }
119 :
120 561 : if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL)))
121 : {
122 0 : log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret));
123 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret);
124 0 : return -1;
125 : }
126 : return 0;
127 : }
128 :
129 : static int
130 565 : _socket_connect(struct netio_send_socket* socket)
131 : {
132 565 : int ret=0;
133 565 : struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
134 :
135 : //Resources initialisation
136 565 : socket->eqfd = -1;
137 565 : socket->cqfd = -1;
138 565 : socket->ep = NULL;
139 565 : socket->eq = NULL;
140 565 : socket->cq = NULL;
141 :
142 565 : if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL)))
143 : {
144 0 : log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
145 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret);
146 0 : return -1;
147 : }
148 :
149 565 : if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL)))
150 : {
151 0 : log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
152 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret);
153 0 : return -1;
154 : }
155 :
156 565 : if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)))
157 : {
158 0 : log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret));
159 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
160 0 : return -1;
161 : }
162 :
163 565 : struct fi_cq_attr cq_attr;
164 565 : cq_attr.size = NETIO_MAX_CQ_ENTRIES; /* # entries for CQ */
165 565 : cq_attr.flags = 0; /* operation flags */
166 565 : cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT; /* completion format */
167 565 : cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
168 565 : cq_attr.signaling_vector = 0; /* interrupt affinity */
169 565 : cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied.
170 565 : cq_attr.wait_set = NULL; /* optional wait set */
171 :
172 : //FI_TRANSMIT CQ
173 565 : if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0)
174 : {
175 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
176 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
177 0 : return -1;
178 : }
179 :
180 565 : if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0)
181 : {
182 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
183 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
184 0 : return -1;
185 : }
186 :
187 : //FI_RECV CQ - also necessary
188 565 : cq_attr.format = FI_CQ_FORMAT_UNSPEC;
189 565 : cq_attr.wait_obj= FI_WAIT_NONE;
190 565 : if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0)
191 : {
192 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
193 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
194 0 : return -1;
195 : }
196 :
197 565 : if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0)
198 : {
199 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
200 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
201 0 : return -1;
202 : }
203 :
204 :
205 565 : if((ret = fi_enable(socket->ep)) != 0)
206 : {
207 0 : log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
208 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret);
209 0 : return -1;
210 : }
211 :
212 : /* Connect to server */
213 565 : if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0)
214 : {
215 4 : log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret));
216 4 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret);
217 4 : return -1;
218 : }
219 :
220 561 : if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0)
221 : {
222 0 : log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret));
223 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret);
224 0 : return -1;
225 : }
226 :
227 561 : socket->eq_ev_ctx.fd = socket->eqfd;
228 561 : socket->eq_ev_ctx.data = socket;
229 561 : socket->eq_ev_ctx.cb = on_send_socket_libfabric_cm_event;
230 :
231 561 : log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
232 561 : add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_libfabric_cm_event);
233 561 : add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket);
234 561 : netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
235 561 : log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd);
236 :
237 561 : return 0;
238 : }
239 :
240 :
241 : // API FUNCTIONS ///////////////////////////////////////////////////////////////
242 :
/**
 * Sets the verbosity of the logger by forwarding `level` to log_set_level().
 *
 * @param level: The debug level, an integer ranging from 0 (TRACE) to 5 (FATAL)
 */
void netio_set_debug_level(int level)
{
  const int requested_level = level;
  log_set_level(requested_level);
}
253 :
/**
 * Return 1 if hostname contains the tcp protocol (prefix tcp:), 0 otherwise.
 * A NULL hostname carries no prefix and therefore yields 0.
 *
 * @param hostname: Hostname or IP address of the remote endpoint, may be NULL
 */
int netio_tcp_mode(const char* hostname) {
  if (hostname == NULL) {
    return 0;
  }
  return (strncmp(hostname, "tcp:", 4) == 0);
}
262 :
/**
 * Name of the protocol selected by the hostname prefix: "tcp" when
 * netio_tcp_mode() reports a tcp: prefix, "libfabric" in every other case.
 * No other implementation than tcp or libfabric at this time.
 *
 * @param hostname: Hostname or IP address of the remote endpoint
 */
const char* netio_protocol(const char* hostname) {
  if (netio_tcp_mode(hostname)) {
    return "tcp";
  }
  return "libfabric";
}
272 :
/**
 * Return the hostname without its protocol prefix (tcp:, libfabric:).
 *
 * Only these two known prefixes are removed. Any other text is returned
 * untouched, so IPv6 literals such as "fe80::1" are not truncated at their
 * first ':'. The returned pointer points into the caller's string.
 *
 * @param hostname: Hostname or IP address of the remote endpoint, may be NULL
 * @return pointer to the host part, or NULL if hostname is NULL
 */
const char* netio_hostname(const char* hostname) {
  static const char* const prefixes[] = { "tcp:", "libfabric:" };

  if (hostname == NULL) {
    return NULL;
  }
  for (size_t i = 0; i < sizeof prefixes / sizeof prefixes[0]; i++) {
    const size_t len = strlen(prefixes[i]);
    if (strncmp(hostname, prefixes[i], len) == 0) {
      return hostname + len;
    }
  }
  return hostname;
}
282 :
283 : /**
284 : * Initializes the eventloop first resetting the context passed as argument.
285 : *
286 : * @param ctx: The netio context
287 : */
288 : void
289 552 : netio_init(struct netio_context* ctx)
290 : {
291 552 : log_set_level(DEFAULT_DEBUG_LEVEL);
292 552 : memset(ctx, 0, sizeof(*ctx));
293 552 : netio_eventloop_init(&ctx->evloop);
294 552 : }
295 :
296 :
297 : /**
298 : * Initializes an unbuffered send socket.
299 : *
300 : * @param socket: The socket to intialize
301 : * @param ctx: The netio context
302 : */
303 : void
304 569 : netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx)
305 : {
306 569 : memset(socket, 0, sizeof(*socket));
307 569 : socket->ctx = ctx;
308 569 : socket->epollfd = socket->ctx->evloop.epollfd;
309 569 : socket->state = UNCONNECTED;
310 569 : socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
311 569 : socket->cq_size = NETIO_MAX_CQ_EVENTS;
312 569 : socket->unbuf_pub_socket = NULL;
313 569 : socket->cb_internal_connection_closed = NULL;
314 569 : socket->deferred_subs = NULL;
315 569 : socket->recv_socket = NULL;
316 569 : }
317 :
318 :
319 : /**
320 : * Initializes an unbuffered listen socket.
321 : *
322 : * @param socket: The socket to intialize
323 : * @param ctx: The netio context
324 : */
325 : void
326 265 : netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
327 : {
328 265 : memset(socket, 0, sizeof(*socket));
329 265 : socket->ctx = ctx;
330 265 : socket->recv_sockets = NULL;
331 265 : socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
332 265 : if (attr == NULL){
333 : socket->attr.buffer_size = 0;
334 : socket->attr.num_buffers = 0;
335 : } else {
336 88 : if (attr->num_buffers > NETIO_DOMAIN_MAX_MR){
337 0 : log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR);
338 0 : attr->num_buffers = NETIO_DOMAIN_MAX_MR;
339 : }
340 88 : socket->attr = *attr;
341 : }
342 265 : }
343 :
344 :
345 : /**
346 : * Initializes an unbuffered recv socket.
347 : *
348 : * @param socket: The socket to intialize
349 : * @param ctx: The netio context
350 : */
351 : void
352 220 : netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
353 : {
354 220 : memset(socket, 0, sizeof(*socket));
355 220 : socket->ctx = lsocket->ctx;
356 220 : socket->lsocket = lsocket;
357 220 : socket->reg_mr = 0;
358 220 : socket->cq_size = NETIO_MAX_CQ_EVENTS;
359 220 : socket->sub_msg_buffers = NULL;
360 220 : socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
361 220 : socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
362 220 : }
363 :
/**
 * Initializes a send socket and connects it, choosing the TCP or the
 * libfabric implementation from the prefix of `hostname`.
 *
 * @param socket: The send socket to initialize and connect
 * @param ctx: The netio context
 * @param hostname: Hostname or IP address, optionally prefixed (tcp:, libfabric:)
 * @param port: Port of the remote endpoint
 */
void
netio_send_socket_init_and_connect(struct netio_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* bare_host = netio_hostname(hostname);

  if (!use_tcp) {
    netio_init_send_socket(socket, ctx);
    netio_connect_domain(socket, bare_host, port, NULL);
    return;
  }
  netio_init_send_tcp_socket(socket, ctx);
  netio_connect_tcp(socket, bare_host, port);
}
376 :
/**
 * Initializes a listen socket and starts listening, choosing the TCP or the
 * libfabric implementation from the prefix of `hostname`.
 *
 * @param socket: The listen socket to initialize
 * @param ctx: The netio context
 * @param hostname: Hostname or IP address, optionally prefixed (tcp:, libfabric:)
 * @param port: Port to listen on
 * @param attr: Buffer attributes forwarded to the init function
 */
void
netio_listen_socket_init_and_listen(struct netio_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_unbuffered_socket_attr* attr) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* bare_host = netio_hostname(hostname);

  if (!use_tcp) {
    netio_init_listen_socket(socket, ctx, attr);
    netio_listen(socket, bare_host, port);
    return;
  }
  netio_init_listen_tcp_socket(socket, ctx, attr);
  netio_listen_tcp(socket, bare_host, port);
}
389 :
390 : void
391 541 : netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port)
392 : {
393 541 : if (socket->tcp_fi_mode == NETIO_MODE_TCP){
394 17 : netio_connect_tcp(socket, netio_hostname(hostname), port);
395 : }
396 : else{
397 524 : netio_connect_domain(socket, netio_hostname(hostname), port, NULL);
398 : }
399 541 : }
400 :
401 : /**
402 : * Connect an unbuffered send socket to a remote endpoint.
403 : *
404 : * @param socket: An unbuffered send socket
405 : * @param hostname: Hostname or IP address of the remote endpoint
406 : * @param port: Port of the remote endpoint
407 : * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
408 : * Obviously this can only work if the socket's actual domain (physical NIC) would refer to the same as domain's
409 : */
410 : void
411 526 : netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain)
412 : {
413 526 : log_dbg("_socket_init_info");
414 526 : if ((_socket_init_info(socket, hostname, port, NULL, 0))) return;
415 522 : if (domain == NULL) {
416 522 : log_dbg("_socket_init_domain in netio_connect_domain");
417 522 : if ( _socket_init_domain(socket) ) return;
418 : } else {
419 0 : domain->nb_sockets += 1;
420 0 : socket->domain = domain;
421 : }
422 522 : _socket_connect(socket);
423 : }
424 :
425 :
/**
 * Connect a send socket using a raw address, letting the socket create its
 * own domain (forwards to netio_connect_rawaddr_domain with a NULL domain).
 *
 * @param socket: An unbuffered send socket
 * @param addr: IP and port of the remote endpoint
 * @param addrlen: length of the address
 */
void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen)
{
  struct netio_domain* const no_shared_domain = NULL;
  netio_connect_rawaddr_domain(socket, addr, addrlen, no_shared_domain);
}
430 :
431 :
432 : /**
433 : * Same as netio_connect_domain but using the ip address and port
434 : *
435 : * @param socket: An unbuffered send socket
436 : * @param addr: IP and port of the remote endpoint
437 : * @param addrlen: lenght of the address
438 : * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
439 : * Obviously this can only work if the socket's actual domain (physical NIC) would refer to the same as domain's
440 : */
441 43 : void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain)
442 : {
443 43 : log_dbg("_socket_init_info");
444 43 : if ((_socket_init_info(socket, NULL, 0, addr, addrlen))) return;
445 43 : if (domain == NULL) {
446 39 : log_dbg("_socket_init_domain in netio_connect_rawaddr_domain");
447 39 : if ( _socket_init_domain(socket) ) return;
448 : } else {
449 4 : log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets);
450 4 : domain->nb_sockets += 1;
451 4 : socket->domain = domain;
452 : }
453 43 : _socket_connect(socket);
454 : }
455 :
456 :
457 : /**
458 : * Disconnect a connected unbuffered send socket.
459 : *
460 : * @param socket: A connected unbuffered send socket
461 : */
462 : void
463 404 : netio_disconnect(struct netio_send_socket* socket)
464 : {
465 404 : if(socket->tcp_fi_mode == NETIO_MODE_TCP){
466 15 : shutdown(socket->cq_ev_ctx.fd, SHUT_RDWR);
467 : } else {
468 389 : if(!socket->ep) {
469 : log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
470 : return;
471 : }
472 389 : int ret=0;
473 389 : if((ret = fi_shutdown(socket->ep, 0))){
474 0 : log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret));
475 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
476 0 : return;
477 : }
478 : }
479 : }
480 :
481 :
482 :
483 : void
484 0 : netio_connection_shutdown(void* ptr)
485 : {
486 0 : log_dbg("Handle_connection_shutdown.");
487 0 : struct signal_data* sd = (struct signal_data*)ptr;
488 0 : struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
489 0 : int ret=0;
490 0 : if(!socket->ep){
491 : log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
492 : return;
493 : }
494 0 : if((ret = fi_shutdown(socket->ep, 0)))
495 : {
496 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
497 0 : return;
498 : }
499 : //clean up signal
500 0 : netio_signal_close(sd->evloop, sd->signal);
501 0 : free(sd->signal);
502 0 : free(sd);
503 : }
504 :
505 :
506 : /**
507 : * Bind an unbuffered listen socket to an endpoint and listen for incoming connections.
508 : *
509 : * @param socket: An unbuffered listen socket
510 : * @param hostname: Hostname or IP address of an endpoint
511 : * @param port: A port number to listen on
512 : */
513 : void
514 128 : netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port)
515 : {
516 128 : int ret=0;
517 128 : struct fi_info* hints;
518 128 : struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};
519 :
520 128 : hints = fi_allocinfo();
521 128 : hints->addr_format = FI_FORMAT_UNSPEC;
522 128 : hints->ep_attr->type = FI_EP_MSG;
523 128 : hints->caps = FI_MSG;
524 128 : hints->mode = FI_LOCAL_MR;
525 128 : char port_addr[32];
526 128 : snprintf(port_addr, 32, "%u", port);
527 :
528 : //Resource initialisation
529 128 : socket->eqfd = -1;
530 128 : socket->pep = NULL;
531 128 : socket->eq = NULL;
532 128 : socket->fi = NULL;
533 :
534 128 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
535 : &socket->fi)))
536 : {
537 2 : log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret));
538 2 : ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret);
539 8 : return;
540 : }
541 126 : log_dbg("addr format: %x", socket->fi->addr_format);
542 :
543 126 : if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)))
544 : {
545 0 : log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret));
546 0 : ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret);
547 0 : return;
548 : }
549 126 : if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)))
550 : {
551 0 : log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret));
552 0 : ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret);
553 0 : return;
554 : }
555 :
556 126 : if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)))
557 : {
558 0 : log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
559 0 : ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret);
560 0 : return;
561 : }
562 :
563 126 : if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)))
564 : {
565 0 : log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
566 0 : ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret);
567 0 : return;
568 : }
569 :
570 126 : if((ret = fi_listen(socket->pep)))
571 : {
572 4 : log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret));
573 4 : ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret);
574 4 : return;
575 : }
576 :
577 122 : if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)))
578 : {
579 0 : log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret));
580 0 : ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret);
581 0 : return;
582 : }
583 :
584 122 : socket->eq_ev_ctx.fd = socket->eqfd;
585 122 : socket->eq_ev_ctx.data = socket;
586 122 : socket->eq_ev_ctx.cb = on_listen_socket_libfabric_cm_event;
587 :
588 : //TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT.
589 : //printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
590 : //add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event);
591 122 : add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket);
592 :
593 122 : netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
594 122 : log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd);
595 122 : fi_freeinfo(hints);
596 : }
597 :
598 :
599 : /**
600 : * Retrieve the local endpoint address and store it in the sockaddr_storage sa.
601 : * The function returns the lenght of the address.
602 : *
603 : * @param socket: An unbuffered listen socket
604 : * @param sa: structure storing the local endpoint address
605 : */
606 9 : size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa)
607 : {
608 9 : size_t addrlen;
609 : //memset(sa, 0, sizeof(*sa));
610 9 : addrlen = sizeof(struct sockaddr_storage);
611 :
612 9 : int ret=0;
613 9 : if((ret = fi_getname(&socket->pep->fid, sa, &addrlen)))
614 : {
615 0 : log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret));
616 0 : ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret);
617 0 : return 0;
618 : }
619 :
620 9 : return addrlen;
621 : }
622 :
623 :
624 : /**
625 : * Post a receive buffer to an unbuffered receive socket.
626 : *
627 : * Receive buffers must be registered using `netio_register_recv_buffer`.
628 : *
629 : * @param socket: An unbuffered receive socket
630 : * @param buf: A registered receive buffer.
631 : */
632 : void
633 36966138 : netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf)
634 : {
635 36966138 : struct iovec iov;
636 36966138 : void* desc;
637 :
638 36966138 : struct netio_tcp_recv_item *mrdn;
639 :
640 36966138 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
641 : {
642 :
643 21642596 : iov.iov_base = buf->data;
644 21642596 : iov.iov_len = buf->size;
645 21642596 : desc = fi_mr_desc(buf->mr);
646 :
647 21642596 : struct fi_msg msg;
648 21642596 : msg.msg_iov = &iov; /* scatter-gather array */
649 21642596 : msg.desc = &desc;
650 21642596 : msg.iov_count = 1;
651 21642596 : msg.addr = 0;
652 21642596 : msg.context = buf;
653 21642596 : msg.data = 0;
654 :
655 21642596 : uint64_t flags;
656 21642596 : flags = FI_REMOTE_CQ_DATA;//FI_MULTI_RECV;
657 :
658 21642596 : int ret=0;
659 21642596 : if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0)
660 : {
661 0 : log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret));
662 : }
663 : }
664 : else
665 : {
666 : //Allocate memory for a message request descriptor
667 15323542 : struct netio_tcp_recv_item *mrd;
668 15323542 : mrd = (struct netio_tcp_recv_item *) malloc(sizeof(struct netio_tcp_recv_item));
669 15323542 : if(mrd == NULL)
670 : {
671 0 : ERROR_LOG("cannot allocate memory for descriptor");
672 : }
673 : /* log_debug("mrd is at %p", (void *)mrd); */
674 :
675 15323542 : mrd->element_active = 1; //MJ do we need this variable?
676 15323542 : mrd->socket = socket; //this is a netio_recv_socket
677 15323542 : mrd->buffer = buf;
678 15323542 : mrd->next_element = NULL;
679 15323542 : mrd->bytes_received = 0;
680 15323542 : mrd->message_size = 0;
681 :
682 : /* log_debug("receive descriptor allocated and initialized"); */
683 :
684 : //Append the descriptor to the list
685 15323542 : if(socket->message_request_header == NULL)
686 : {
687 100 : socket->message_request_header = (void *)mrd;
688 : /* log_debug("descriptor linked to head of queue"); */
689 : }
690 : else
691 : {
692 : int free_item = 1;
693 : struct netio_tcp_recv_item *mrdq;
694 : mrdq = (struct netio_tcp_recv_item *)socket->message_request_header;
695 : /* log_debug("Head of list points at = %p", mrdq); */
696 : int mrd_linked = 0;
697 :
698 4508172256 : do
699 : {
700 4508172256 : if (mrdq->next_element == NULL)
701 : {
702 15323446 : mrdq->next_element = (void *)mrd;
703 15323446 : mrd_linked = 1;
704 : // log_error("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq);
705 : }
706 : else
707 : {
708 : mrdn = (struct netio_tcp_recv_item *)mrdq->next_element;
709 : // log_error("item = %d. %p points at %p", free_item, mrdq, mrdn);
710 : free_item++;
711 4508172256 : mrdq = mrdn;
712 : }
713 : }
714 4508172256 : while(!mrd_linked);
715 : }
716 : /*
717 : log_debug("Calling netio_signal_fire");
718 : log_debug("&socket->tcp_signal = %p", &socket->tcp_signal);
719 : netio_signal_fire(&socket->tcp_signal);
720 : */
721 15323542 : return;
722 : }
723 : }
724 :
725 : /**
726 : * Removes recv socket from recv socket list of a listen socket.
727 : *
728 : * @param socket: An unbuffered receive socket.
729 : */
730 : void
731 0 : netio_remove_recv_socket(struct netio_recv_socket* socket){
732 0 : struct netio_listen_socket* lsocket = socket->lsocket;
733 0 : int ret = remove_socket(&lsocket->recv_sockets, (void*)socket);
734 0 : if (ret == 0){
735 0 : log_info("Unbuffered connection closed, recv socket deleted.");
736 : } else {
737 0 : log_warn("Unbuffered connection closed, could not delete recv socket.");
738 : }
739 0 : }
740 :
741 : /**
742 : * Removes buffered recv socket from recv socket list of a buffered listen socket.
743 : *
744 : * @param socket: An buffered receive socket.
745 : */
746 : void
747 0 : netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){
748 0 : struct netio_buffered_listen_socket* lsocket = socket->lsocket;
749 0 : int ret = remove_socket(&(lsocket->listen_socket.recv_sockets), (void*)socket);
750 0 : if (ret == 0){
751 0 : log_info("Buffered connection closed, buffered recv socket deleted.");
752 : } else {
753 0 : log_warn("Buffered connection closed, could not delete recv socket.");
754 : }
755 0 : }
756 :
757 : static int
758 12424354 : generic_sendmsg(struct netio_send_socket* socket,
759 : struct iovec* iov,
760 : void** desc,
761 : size_t count,
762 : uint64_t key,
763 : uint64_t add_flags,
764 : uint64_t imm)
765 : {
766 12424354 : int ret=0;
767 12424354 : uint64_t flags;
768 :
769 12424354 : struct fi_msg msg;
770 12424354 : msg.msg_iov = iov; /* scatter-gather array */
771 12424354 : msg.desc = desc;
772 12424354 : msg.iov_count = count;
773 12424354 : msg.addr = 0;
774 12424354 : msg.context = (void*)key;
775 12424354 : msg.data = imm;
776 :
777 12424354 : log_trc("sending iov message with immediate value 0x%lx", msg.data);
778 :
779 12424354 : flags = FI_INJECT_COMPLETE | add_flags;
780 :
781 12424354 : if(!socket->ep || !socket->ep->msg){
782 0 : log_error("Failed sending message because of null message or null endpoint.");
783 0 : return NETIO_STATUS_ERROR;
784 : }
785 :
786 12424354 : if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0)
787 : {
788 0 : if(ret == -FI_EAGAIN) {
789 : return NETIO_STATUS_AGAIN;
790 : }
791 0 : log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret));
792 0 : return NETIO_STATUS_ERROR;
793 : }
794 : return NETIO_STATUS_OK;
795 : }
796 :
797 :
798 : /**
799 : * Sends a full buffer over a connected unbuffered send socket.
800 : *
801 : * @param socket: A connected, unbuffered send socket
802 : * @param buf: A registered send buffer
803 : */
804 : int
805 1121463 : netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
806 : {
807 1121463 : return netio_send(socket, buf, buf->data, buf->size, (uint64_t)buf);
808 : }
809 :
810 :
811 : /**
812 : * Sends a buffer over a connected unbuffered send socket as inline data.
813 : *
814 : * @param socket: A connected, unbuffered send socket
815 : * @param buf: A buffer whose size does not exceed FI_VERBS_INLINE_SIZE (libfabric default 64 bytes)
816 : * The buffer is reusable as soon as the function returns as it is copied by the provider.
817 : * The buffer is not required to be a registered memory region as no RDMA transfer occurs.
818 : * FI_VERBS_INLINE_SIZE corresponds to IBV_SEND_INLINE
819 : */
820 : int
821 2433 : netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
822 : {
823 2433 : struct iovec iov;
824 2433 : void* desc;
825 :
826 2433 : iov.iov_base = buf->data;
827 2433 : iov.iov_len = buf->size;
828 2433 : desc = fi_mr_desc(buf->mr);
829 2433 : uint64_t key = (uint64_t)buf;
830 2433 : uint64_t flags = FI_INJECT;
831 :
832 2433 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
833 : &iov, /* struct iovec* iov */
834 : &desc, /* void** desc */
835 : 1, /* size_t count */
836 : key, /* uint64_t key */
837 : flags, /* uint64_t add_flags */
838 : 0 /* uint64_t imm */
839 : );
840 : }
841 :
842 :
843 : /**
844 : * Sends a partial buffer over a connected unbuffered send socket.
845 : *
846 : * @param socket: A connected, unbuffered send socket
847 : * @param buffer: A registered send buffer
848 : * @param addr: Pointer to message within the buffer
849 : * @param size: Size of the message
850 : * @param key: Message key used to track the message progress.
851 : */
852 : int
853 1121615 : netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key)
854 : {
855 1121615 : struct iovec iov;
856 1121615 : void* desc;
857 1121615 : struct netio_tcp_send_item *mrdn;
858 :
859 1121615 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
860 : {
861 1118126 : iov.iov_base = addr;
862 1118126 : iov.iov_len = size;
863 1118126 : desc = fi_mr_desc(buf->mr);
864 :
865 1118126 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
866 : &iov, /* struct iovec* iov */
867 : &desc, /* void** desc */
868 : 1, /* size_t count */
869 : key, /* uint64_t key */
870 : 0, /* uint64_t add_flags */
871 : 0 /* uint64_t imm */
872 : );
873 : }
874 : else
875 : {
876 3489 : if (socket->state!=CONNECTED)
877 : {
878 0 : log_warn("socket not connected (state=%d)",socket->state);
879 : }
880 : //Allocate memory for a message request descriptor
881 3489 : struct netio_tcp_send_item *mrd;
882 3489 : mrd = (struct netio_tcp_send_item *) malloc(sizeof(struct netio_tcp_send_item));
883 3489 : if(mrd == NULL)
884 : {
885 0 : ERROR_LOG("cannot allocate memory for descriptor");
886 : }
887 3489 : mrd->element_active = NETIO_TCP_NEW;
888 3489 : mrd->socket = socket;
889 3489 : mrd->buffer = buf;
890 3489 : log_dbg("netio-tcp: setting buffer size to msg size and buffer data to addr");
891 3489 : mrd->buffer->size = size;
892 3489 : mrd->buffer->data = addr;
893 3489 : mrd->total_bytes = size;
894 3489 : mrd->bytes_left = sizeof(int);
895 3489 : mrd->next_element = NULL;
896 3489 : mrd->key = (uint64_t) buf;
897 : // log_debug("send descriptor allocated and initialized");
898 :
899 : //Append the descriptor to the list
900 3489 : if(socket->message_request_header == NULL)
901 : {
902 2047 : socket->message_request_header = (void *)mrd;
903 : // log_debug("List was empty. Descriptor linked to head of list");
904 : }
905 : else
906 : {
907 : int free_item = 1;
908 : struct netio_tcp_send_item *mrdq;
909 : mrdq = (struct netio_tcp_send_item *)socket->message_request_header;
910 : // log_debug("Head of list points at = %p", mrdq);
911 : int mrd_linked = 0;
912 :
913 290370 : do
914 : {
915 290370 : if (mrdq->next_element == NULL)
916 : {
917 1442 : mrdq->next_element = (void *)mrd;
918 1442 : mrd_linked = 1;
919 : // log_debug("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq);
920 : }
921 : else
922 : {
923 : mrdn = (struct netio_tcp_send_item *)mrdq->next_element;
924 : // log_debug("Item = %d. %p points at %p", free_item, mrdq, mrdn);
925 : free_item++;
926 290370 : mrdq = mrdn;
927 : }
928 : }
929 290370 : while(!mrd_linked);
930 : }
931 :
932 : // log_debug("Calling netio_signal_fire for signal at %p", &socket->tcp_signal);
933 3489 : netio_signal_fire(&socket->tcp_signal);
934 :
935 : // log_info("done");
936 3489 : return(NETIO_STATUS_OK);
937 : }
938 :
939 : }
940 :
941 :
942 : /**
943 : * Sends a partial buffer with immediate data over a connected unbuffered send socket.
944 : *
945 : * @param socket: A connected, unbuffered send socket
946 : * @param buffer: A registered send buffer
947 : * @param addr: Pointer to message within the buffer
948 : * @param size: Size of the message
949 : * @param key: Message key used to track the message progress.
950 : * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
951 : */
952 : int
953 0 : netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm)
954 : {
955 0 : struct iovec iov;
956 0 : void* desc;
957 :
958 0 : iov.iov_base = addr;
959 0 : iov.iov_len = size;
960 0 : desc = fi_mr_desc(buf->mr);
961 :
962 0 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
963 : &iov, /* struct iovec* iov */
964 : &desc, /* void** desc */
965 : 1, /* size_t count */
966 : key, /* uint64_t key */
967 : FI_REMOTE_CQ_DATA, /* uint64_t add_flags */
968 : imm /* uint64_t imm */
969 : );
970 : }
971 :
972 :
973 : /**
974 : * Sends a partial buffer data over a connected unbuffered send socket.
975 : *
976 : * @param socket: A connected, unbuffered send socket
977 : * @param buffer: A vector of registered send buffers
978 : * @param iov: Scatter/gather buffer describing message within the buffers
979 : * @param count: Size of the scatter/gather vector
980 : * @param key: Message key used to track the message progress.
981 : */
982 : int
983 0 : netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key)
984 : {
985 0 : void* descarray[NETIO_MAX_IOV_LEN];
986 :
987 0 : for(unsigned i=0; i<count; i++) {
988 0 : descarray[i] = fi_mr_desc(buf[i]->mr);
989 : }
990 :
991 0 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
992 : iov, /* struct iovec* iov */
993 : descarray, /* void** desc */
994 : count, /* size_t count */
995 : key, /* uint64_t key */
996 : 0, /* uint64_t add_flags */
997 : 0 /* uint64_t imm */
998 : );
999 : }
1000 :
1001 :
1002 : /**
1003 : * Sends a partial buffer with immediate data over a connected unbuffered send socket.
1004 : *
1005 : * @param socket: A connected, unbuffered send socket
1006 : * @param buffer: A vector of registered send buffers
1007 : * @param iov: Scatter/gather buffer describing message within the buffers
1008 : * @param count: Size of the scatter/gather vector
1009 : * @param key: Message key used to track the message progress.
1010 : * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
1011 : */
1012 : int
1013 11306030 : netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm)
1014 : {
1015 11306030 : void* descarray[NETIO_MAX_IOV_LEN];
1016 :
1017 66118099 : for(unsigned i=0; i<count; i++) {
1018 54812069 : descarray[i] = fi_mr_desc(buf[i]->mr);
1019 : }
1020 :
1021 11306030 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
1022 : iov, /* struct iovec* iov */
1023 : descarray, /* void** desc */
1024 : count, /* size_t count */
1025 : key, /* uint64_t key */
1026 : FI_REMOTE_CQ_DATA, /* uint64_t add_flags */
1027 : imm /* uint64_t imm */
1028 : );
1029 : }
|