Line data Source code
1 : #include <unistd.h>
2 : #include <stdio.h>
3 : #include <sys/uio.h>
4 : #include <string.h>
5 : #include "netio/netio.h"
6 : #include "connection_event.h"
7 : #include "completion_event.h"
8 : #include <sys/socket.h>
9 : #include <netinet/in.h>
10 : #include <arpa/inet.h>
11 : #include "log.h"
12 :
/*
 * Debug and trace logging helpers.
 *
 * When DEBUG or DEBUG_IO is defined, log_dbg() and log_trc() forward to
 * log_log() with the basename of the current source file and the line number.
 * Otherwise both macros expand to nothing, so their arguments are not
 * evaluated.
 */
#if defined(DEBUG) || defined(DEBUG_IO)
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif
21 :
/*
 * Record a connection failure on send socket `s` and notify its owner.
 *
 * Stores the negated code `c` in s->fi_errno and a strdup() copy of `msg` in
 * s->fi_message, then calls netio_error_connection_refused_fire(s).
 *
 * `s` is evaluated more than once, so pass an expression without side effects.
 * The expansion has no trailing semicolon: write `MACRO(...);` at the call
 * site, which keeps the macro usable as the body of an if/else.
 * NOTE(review): who frees fi_message is not visible from this file - confirm.
 */
#define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \
do { \
  (s)->fi_errno = -(c); \
  (s)->fi_message = strdup(msg); \
  netio_error_connection_refused_fire(s); \
} while(0)
28 :
/*
 * Record a bind/listen failure on listen socket `s` and notify its owner.
 *
 * Stores the negated code `c` in s->fi_errno and a strdup() copy of `msg` in
 * s->fi_message, then calls netio_error_bind_refused_fire(s).
 *
 * `s` is evaluated more than once, so pass an expression without side effects.
 * The expansion has no trailing semicolon: write `MACRO(...);` at the call
 * site, which keeps the macro usable as the body of an if/else.
 * NOTE(review): who frees fi_message is not visible from this file - confirm.
 */
#define ON_ERROR_BIND_REFUSED(s, msg, c) \
do { \
  (s)->fi_errno = -(c); \
  (s)->fi_message = strdup(msg); \
  netio_error_bind_refused_fire(s); \
} while(0)
35 :
36 :
37 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
38 :
39 : static int
40 1180 : _socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
41 : {
42 1180 : log_dbg("Going trough _socket_init_info");
43 1180 : int ret=0;
44 1180 : struct fi_info* hints;
45 :
46 1180 : hints = fi_allocinfo();
47 1180 : hints->addr_format = FI_FORMAT_UNSPEC;
48 1180 : hints->ep_attr->type = FI_EP_MSG;
49 1180 : hints->caps = FI_MSG;
50 1180 : hints->mode = FI_LOCAL_MR;
51 : // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL
52 : // So the following will not allow the tcp provider to be used
53 1180 : hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
54 1180 : hints->domain_attr->resource_mgmt = FI_RM_ENABLED;
55 :
56 1180 : char port_addr[32];
57 1180 : snprintf(port_addr, 32, "%u", port);
58 1180 : log_dbg("connecting to endpoint %s:%u", hostname, port);
59 :
60 1180 : uint64_t flags = 0;
61 1180 : if(hostname == NULL) {
62 75 : hostname = "127.0.0.1";
63 75 : flags = FI_SOURCE;
64 : }
65 :
66 1180 : if(addr) {
67 : // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr?
68 75 : struct sockaddr_in* sockaddr = (struct sockaddr_in*)addr;
69 75 : char* str_addr = inet_ntoa(sockaddr->sin_addr);
70 75 : log_dbg("sockaddr: %s:%d", str_addr, ntohs(sockaddr->sin_port));
71 :
72 75 : hostname = str_addr;
73 75 : snprintf(port_addr, 32, "%d", ntohs(sockaddr->sin_port));
74 75 : flags = 0;
75 : }
76 :
77 1180 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi)))
78 : {
79 8 : fi_freeinfo(hints);
80 8 : log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret));
81 8 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret);
82 8 : return -1;
83 : }
84 :
85 1172 : log_trc("addr format: %x", socket->fi->addr_format);
86 1172 : log_dbg("fi_freeinfo");
87 1172 : fi_freeinfo(hints);
88 1172 : return 0;
89 : }
90 :
91 :
92 : static int
93 1167 : _socket_init_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
94 : {
95 1167 : int ret=0;
96 1167 : struct netio_domain *domain = malloc(sizeof(struct netio_domain));
97 1167 : domain->reg_mr = 0;
98 1167 : domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
99 1167 : domain->nb_sockets = 1;
100 1167 : socket->domain = domain;
101 :
102 1167 : if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL)))
103 : {
104 0 : log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret));
105 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret);
106 0 : return -1;
107 : }
108 :
109 1167 : if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL)))
110 : {
111 0 : log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret));
112 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret);
113 0 : return -1;
114 : }
115 : return 0;
116 : }
117 :
118 : static int
119 1172 : _socket_connect(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
120 : {
121 1172 : int ret=0;
122 1172 : struct fi_eq_attr eq_attr;
123 1172 : eq_attr.wait_obj = FI_WAIT_FD;
124 :
125 : //Resources initialisation
126 1172 : socket->eqfd = -1;
127 1172 : socket->cqfd = -1;
128 1172 : socket->ep = NULL;
129 1172 : socket->eq = NULL;
130 1172 : socket->cq = NULL;
131 :
132 1172 : if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL)))
133 : {
134 0 : log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
135 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret);
136 0 : return -1;
137 : }
138 :
139 1172 : if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL)))
140 : {
141 0 : log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
142 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret);
143 0 : return -1;
144 : }
145 :
146 1172 : if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)))
147 : {
148 0 : log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret));
149 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
150 0 : return -1;
151 : }
152 :
153 1172 : struct fi_cq_attr cq_attr;
154 1172 : cq_attr.size = NETIO_MAX_CQ_ENTRIES; /* # entries for CQ */
155 1172 : cq_attr.flags = 0; /* operation flags */
156 1172 : cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT; /* completion format */
157 1172 : cq_attr.wait_obj= FI_WAIT_FD; /* requested wait object */
158 1172 : cq_attr.signaling_vector = 0; /* interrupt affinity */
159 1172 : cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied.
160 1172 : cq_attr.wait_set = NULL; /* optional wait set */
161 :
162 : //FI_TRANSMIT CQ
163 1172 : if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0)
164 : {
165 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
166 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
167 0 : return -1;
168 : }
169 :
170 1172 : if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0)
171 : {
172 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
173 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
174 0 : return -1;
175 : }
176 :
177 : //FI_RECV CQ - also necessary
178 1172 : cq_attr.format = FI_CQ_FORMAT_UNSPEC;
179 1172 : cq_attr.wait_obj= FI_WAIT_NONE;
180 1172 : if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0)
181 : {
182 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
183 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
184 0 : return -1;
185 : }
186 :
187 1172 : if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0)
188 : {
189 0 : log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
190 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
191 0 : return -1;
192 : }
193 :
194 :
195 1172 : if((ret = fi_enable(socket->ep)) != 0)
196 : {
197 0 : log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
198 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret);
199 0 : return -1;
200 : }
201 :
202 : /* Connect to server */
203 1172 : if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0)
204 : {
205 8 : log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret));
206 8 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret);
207 8 : return -1;
208 : }
209 :
210 1164 : if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0)
211 : {
212 0 : log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret));
213 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret);
214 0 : return -1;
215 : }
216 :
217 1164 : socket->eq_ev_ctx.fd = socket->eqfd;
218 1164 : socket->eq_ev_ctx.data = socket;
219 1164 : socket->eq_ev_ctx.cb = on_send_socket_cm_event;
220 :
221 1164 : log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
222 1164 : add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_cm_event);
223 1164 : add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket);
224 1164 : netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
225 1164 : log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd);
226 :
227 1164 : return 0;
228 : }
229 :
230 :
231 : // API FUNCTIONS ///////////////////////////////////////////////////////////////
232 :
/**
 * Sets the verbosity of the logger by forwarding `level` to log_set_level().
 *
 * @param level: The debug level, an integer ranging from 0 (TRACE) to 5 (FATAL)
 */
void netio_set_debug_level(int level)
{
  log_set_level(level);
}
243 :
244 : /**
245 : * Initializes the eventloop first resetting the context passed as argument.
246 : *
247 : * @param ctx: The netio context
248 : */
249 : void
250 915 : netio_init(struct netio_context* ctx)
251 : {
252 915 : log_set_level(DEFAULT_DEBUG_LEVEL);
253 915 : memset(ctx, 0, sizeof(*ctx));
254 915 : netio_eventloop_init(&ctx->evloop);
255 915 : }
256 :
257 :
258 : /**
259 : * Initializes an unbuffered send socket.
260 : *
261 : * @param socket: The socket to intialize
262 : * @param ctx: The netio context
263 : */
264 : void
265 1180 : netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx)
266 : {
267 1180 : memset(socket, 0, sizeof(*socket));
268 1180 : socket->ctx = ctx;
269 1180 : socket->epollfd = socket->ctx->evloop.epollfd;
270 1180 : socket->state = UNCONNECTED;
271 1180 : socket->cq_size = NETIO_MAX_CQ_EVENTS;
272 1180 : socket->unbuf_pub_socket = NULL;
273 1180 : socket->cb_internal_connection_closed = NULL;
274 1180 : socket->deferred_subs = NULL;
275 1180 : socket->recv_socket = NULL;
276 1180 : }
277 :
278 :
279 : /**
280 : * Initializes an unbuffered listen socket.
281 : *
282 : * @param socket: The socket to intialize
283 : * @param ctx: The netio context
284 : */
285 : void
286 646 : netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
287 : {
288 646 : memset(socket, 0, sizeof(*socket));
289 646 : socket->ctx = ctx;
290 646 : socket->recv_sockets = NULL;
291 646 : if (attr == NULL){
292 : socket->attr.buffer_size = 0;
293 : socket->attr.num_buffers = 0;
294 : } else {
295 184 : if (attr->num_buffers > NETIO_DOMAIN_MAX_MR){
296 0 : log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR);
297 0 : attr->num_buffers = NETIO_DOMAIN_MAX_MR;
298 : }
299 184 : socket->attr = *attr;
300 : }
301 646 : }
302 :
303 :
304 : /**
305 : * Initializes an unbuffered recv socket.
306 : *
307 : * @param socket: The socket to intialize
308 : * @param ctx: The netio context
309 : */
310 : void
311 463 : netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
312 : {
313 463 : memset(socket, 0, sizeof(*socket));
314 463 : socket->ctx = lsocket->ctx;
315 463 : socket->lsocket = lsocket;
316 463 : socket->reg_mr = 0;
317 463 : socket->cq_size = NETIO_MAX_CQ_EVENTS;
318 463 : socket->sub_msg_buffers = NULL;
319 463 : socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
320 463 : }
321 :
322 :
/**
 * Connects an unbuffered send socket to a remote endpoint using a domain
 * created for this socket (calls netio_connect_domain with a NULL domain).
 *
 * @param socket: An unbuffered send socket
 * @param hostname: Hostname or IP address of the remote endpoint
 * @param port: Port of the remote endpoint
 */
void netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port)
{
  struct netio_domain* const no_shared_domain = NULL;
  netio_connect_domain(socket, hostname, port, no_shared_domain);
}
328 :
329 : /**
330 : * Connect an unbuffered send socket to a remote endpoint.
331 : *
332 : * @param socket: An unbuffered send socket
333 : * @param hostname: Hostname or IP address of the remote endpoint
334 : * @param port: Port of the remote endpoint
335 : * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
336 : * Obviously this can only work if the socket's actual domain (physical NIC) would refer to the same as domain's
337 : */
338 : void
339 1105 : netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain)
340 : {
341 1105 : log_dbg("_socket_init_info");
342 1105 : if ((_socket_init_info(socket, hostname, port, NULL, 0))) return;
343 1097 : if (domain == NULL) {
344 1097 : log_dbg("_socket_init_domain in netio_connect_domain");
345 1097 : if ((_socket_init_domain(socket, hostname, port, NULL, 0))) return;
346 : } else {
347 0 : domain->nb_sockets += 1;
348 0 : socket->domain = domain;
349 : }
350 1097 : _socket_connect(socket, hostname, port, NULL, 0);
351 : }
352 :
353 :
/**
 * Connects an unbuffered send socket to a remote endpoint given as a raw
 * address, using a domain created for this socket (calls
 * netio_connect_rawaddr_domain with a NULL domain).
 *
 * @param socket: An unbuffered send socket
 * @param addr: IP and port of the remote endpoint
 * @param addrlen: Length of the address
 */
void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen)
{
  struct netio_domain* const no_shared_domain = NULL;
  netio_connect_rawaddr_domain(socket, addr, addrlen, no_shared_domain);
}
358 :
359 :
360 : /**
361 : * Same as netio_connect_domain but using the ip address and port
362 : *
363 : * @param socket: An unbuffered send socket
364 : * @param addr: IP and port of the remote endpoint
365 : * @param addrlen: lenght of the address
366 : * @param domain: an already existing and initialized netio_domain that this connect will use instead of creating its own
367 : * Obviously this can only work if the socket's actual domain (physical NIC) would refer to the same as domain's
368 : */
369 75 : void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain)
370 : {
371 75 : log_dbg("_socket_init_info");
372 75 : if ((_socket_init_info(socket, NULL, 0, addr, addrlen))) return;
373 75 : if (domain == NULL) {
374 70 : log_dbg("_socket_init_domain in netio_connect_rawaddr_domain");
375 70 : if ((_socket_init_domain(socket, NULL, 0, addr, addrlen))) return;
376 : } else {
377 5 : log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets);
378 5 : domain->nb_sockets += 1;
379 5 : socket->domain = domain;
380 : }
381 75 : _socket_connect(socket, NULL, 0, addr, addrlen);
382 : }
383 :
384 :
385 : /**
386 : * Disconnect a connected unbuffered send socket.
387 : *
388 : * @param socket: A connected unbuffered send socket
389 : */
390 : void
391 767 : netio_disconnect(struct netio_send_socket* socket)
392 : {
393 767 : int ret=0;
394 767 : if(!socket->ep){
395 : log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
396 : return;
397 : }
398 767 : if((ret = fi_shutdown(socket->ep, 0)))
399 : {
400 0 : log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret));
401 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
402 0 : return;
403 : }
404 : }
405 :
406 :
407 :
408 : void
409 0 : netio_connection_shutdown(void* ptr)
410 : {
411 0 : log_dbg("Handle_connection_shutdown.");
412 0 : struct signal_data* sd = (struct signal_data*)ptr;
413 0 : struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
414 0 : int ret=0;
415 0 : if(!socket->ep){
416 : log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
417 : return;
418 : }
419 0 : if((ret = fi_shutdown(socket->ep, 0)))
420 : {
421 0 : ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
422 0 : return;
423 : }
424 :
425 : //clean up signal
426 0 : netio_signal_close(sd->evloop, sd->signal);
427 0 : free(sd->signal);
428 0 : free(sd);
429 : }
430 :
431 :
432 : /**
433 : * Bind an unbuffered listen socket to an endpoint and listen for incoming connections.
434 : *
435 : * @param socket: An unbuffered listen socket
436 : * @param hostname: Hostname or IP address of an endpoint
437 : * @param port: A port number to listen on
438 : */
439 : void
440 335 : netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port)
441 : {
442 335 : int ret=0;
443 335 : struct fi_info* hints;
444 335 : struct fi_eq_attr eq_attr;
445 335 : eq_attr.wait_obj = FI_WAIT_FD;
446 :
447 335 : hints = fi_allocinfo();
448 335 : hints->addr_format = FI_FORMAT_UNSPEC;
449 335 : hints->ep_attr->type = FI_EP_MSG;
450 335 : hints->caps = FI_MSG;
451 335 : hints->mode = FI_LOCAL_MR;
452 335 : char port_addr[32];
453 335 : snprintf(port_addr, 32, "%u", port);
454 :
455 : //Resource initialisation
456 335 : socket->eqfd = -1;
457 335 : socket->pep = NULL;
458 335 : socket->eq = NULL;
459 335 : socket->fi = NULL;
460 :
461 335 : if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
462 : &socket->fi)))
463 : {
464 4 : log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret));
465 4 : ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret);
466 23 : return;
467 : }
468 331 : log_dbg("addr format: %x", socket->fi->addr_format);
469 :
470 331 : if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)))
471 : {
472 0 : log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret));
473 0 : ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret);
474 0 : return;
475 : }
476 331 : if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)))
477 : {
478 0 : log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret));
479 0 : ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret);
480 0 : return;
481 : }
482 :
483 331 : if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)))
484 : {
485 0 : log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
486 0 : ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret);
487 0 : return;
488 : }
489 :
490 331 : if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)))
491 : {
492 0 : log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
493 0 : ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret);
494 0 : return;
495 : }
496 :
497 331 : if((ret = fi_listen(socket->pep)))
498 : {
499 15 : log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret));
500 15 : ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret);
501 15 : return;
502 : }
503 :
504 316 : if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)))
505 : {
506 0 : log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret));
507 0 : ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret);
508 0 : return;
509 : }
510 :
511 316 : socket->eq_ev_ctx.fd = socket->eqfd;
512 316 : socket->eq_ev_ctx.data = socket;
513 316 : socket->eq_ev_ctx.cb = on_listen_socket_cm_event;
514 :
515 : //TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT.
516 : //printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
517 : //add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event);
518 316 : add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket);
519 :
520 316 : netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
521 316 : log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd);
522 316 : fi_freeinfo(hints);
523 : }
524 :
525 :
526 : /**
527 : * Retrieve the local endpoint address and store it in the sockaddr_storage sa.
528 : * The function returns the lenght of the address.
529 : *
530 : * @param socket: An unbuffered listen socket
531 : * @param sa: structure storing the local endpoint address
532 : */
533 20 : size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa)
534 : {
535 20 : size_t addrlen;
536 : //memset(sa, 0, sizeof(*sa));
537 20 : addrlen = sizeof(struct sockaddr_storage);
538 :
539 20 : int ret=0;
540 20 : if((ret = fi_getname(&socket->pep->fid, sa, &addrlen)))
541 : {
542 0 : log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret));
543 0 : ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret);
544 0 : return 0;
545 : }
546 :
547 20 : return addrlen;
548 : }
549 :
550 :
551 : /**
552 : * Post a receive buffer to an unbuffered receive socket.
553 : *
554 : * Receive buffers must be registered using `netio_register_recv_buffer`.
555 : *
556 : * @param socket: An unbuffered receive socket
557 : * @param buf: A registered receive buffer.
558 : */
559 : void
560 66055566 : netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf)
561 : {
562 66055566 : int ret=0;
563 66055566 : struct iovec iov;
564 66055566 : void* desc;
565 66055566 : uint64_t flags;
566 :
567 66055566 : iov.iov_base = buf->data;
568 66055566 : iov.iov_len = buf->size;
569 66055566 : desc = fi_mr_desc(buf->mr);
570 :
571 66055566 : struct fi_msg msg;
572 66055566 : msg.msg_iov = &iov; /* scatter-gather array */
573 66055566 : msg.desc = &desc;
574 66055566 : msg.iov_count = 1;
575 66055566 : msg.addr = 0;
576 66055566 : msg.context = buf;
577 66055566 : msg.data = 0;
578 :
579 66055566 : flags = FI_REMOTE_CQ_DATA;//FI_MULTI_RECV;
580 :
581 66055566 : if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0)
582 : {
583 0 : log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret));
584 : }
585 66055600 : }
586 :
587 : /**
588 : * Removes recv socket from recv socket list of a listen socket.
589 : *
590 : * @param socket: An unbuffered receive socket.
591 : */
592 : void
593 0 : netio_remove_recv_socket(struct netio_recv_socket* socket){
594 0 : struct netio_listen_socket* lsocket = socket->lsocket;
595 0 : int ret = remove_socket(&lsocket->recv_sockets, (void*)socket);
596 0 : if (ret == 0){
597 0 : log_info("Unbuffered connection closed, recv socket deleted.");
598 : } else {
599 0 : log_warn("Unbuffered connection closed, could not delete recv socket.");
600 : }
601 0 : }
602 :
603 : /**
604 : * Removes buffered recv socket from recv socket list of a buffered listen socket.
605 : *
606 : * @param socket: An buffered receive socket.
607 : */
608 : void
609 0 : netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){
610 0 : struct netio_buffered_listen_socket* lsocket = socket->lsocket;
611 0 : int ret = remove_socket(&(lsocket->listen_socket.recv_sockets), (void*)socket);
612 0 : if (ret == 0){
613 0 : log_info("Buffered connection closed, buffered recv socket deleted.");
614 : } else {
615 0 : log_warn("Buffered connection closed, could not delete recv socket.");
616 : }
617 0 : }
618 :
619 : static int
620 37922337 : generic_sendmsg(struct netio_send_socket* socket,
621 : struct iovec* iov,
622 : void** desc,
623 : size_t count,
624 : uint64_t key,
625 : uint64_t add_flags,
626 : uint64_t imm)
627 : {
628 37922337 : int ret=0;
629 37922337 : uint64_t flags;
630 :
631 37922337 : struct fi_msg msg;
632 37922337 : msg.msg_iov = iov; /* scatter-gather array */
633 37922337 : msg.desc = desc;
634 37922337 : msg.iov_count = count;
635 37922337 : msg.addr = 0;
636 37922337 : msg.context = (void*)key;
637 37922337 : msg.data = imm;
638 :
639 37922337 : log_trc("sending iov message with immediate value 0x%lx", msg.data);
640 :
641 37922337 : flags = FI_INJECT_COMPLETE | add_flags;
642 :
643 37922337 : if(!socket->ep || !socket->ep->msg){
644 0 : log_error("Failed sending message because of null message or null endpoint.");
645 0 : return NETIO_STATUS_ERROR;
646 : }
647 :
648 37922337 : if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0)
649 : {
650 0 : if(ret == -FI_EAGAIN) {
651 : return NETIO_STATUS_AGAIN;
652 : }
653 0 : log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret));
654 0 : return NETIO_STATUS_ERROR;
655 : }
656 : return NETIO_STATUS_OK;
657 : }
658 :
659 :
660 : /**
661 : * Sends a full buffer over a connected unbuffered send socket.
662 : *
663 : * @param socket: A connected, unbuffered send socket
664 : * @param buf: A registered send buffer
665 : */
666 : int
667 1087090 : netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
668 : {
669 1087090 : return netio_send(socket, buf, buf->data, buf->size, (uint64_t)buf);
670 : }
671 :
672 :
673 : /**
674 : * Sends a buffer over a connected unbuffered send socket as inline data.
675 : *
676 : * @param socket: A connected, unbuffered send socket
677 : * @param buf: A buffer whose size does not exceed FI_VERBS_INLINE_SIZE (libfabric default 64 bytes)
678 : * The buffer is reusable as soon as the function returns as it is copied by the provider.
679 : * The buffer is not required to be a registered memory region as no RDMA transfer occurs.
680 : * FI_VERBS_INLINE_SIZE corresponds to IBV_SEND_INLINE
681 : */
682 : int
683 4777 : netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
684 : {
685 4777 : struct iovec iov;
686 4777 : void* desc;
687 :
688 4777 : iov.iov_base = buf->data;
689 4777 : iov.iov_len = buf->size;
690 4777 : desc = fi_mr_desc(buf->mr);
691 4777 : uint64_t key = (uint64_t)buf;
692 4777 : uint64_t flags = FI_INJECT;
693 :
694 4777 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
695 : &iov, /* struct iovec* iov */
696 : &desc, /* void** desc */
697 : 1, /* size_t count */
698 : key, /* uint64_t key */
699 : flags, /* uint64_t add_flags */
700 : 0 /* uint64_t imm */
701 : );
702 : }
703 :
704 :
705 : /**
706 : * Sends a partial buffer over a connected unbuffered send socket.
707 : *
708 : * @param socket: A connected, unbuffered send socket
709 : * @param buffer: A registered send buffer
710 : * @param addr: Pointer to message within the buffer
711 : * @param size: Size of the message
712 : * @param key: Message key used to track the message progress.
713 : */
714 : int
715 1087536 : netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key)
716 : {
717 1087536 : struct iovec iov;
718 1087536 : void* desc;
719 :
720 1087536 : iov.iov_base = addr;
721 1087536 : iov.iov_len = size;
722 1087536 : desc = fi_mr_desc(buf->mr);
723 :
724 1087536 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
725 : &iov, /* struct iovec* iov */
726 : &desc, /* void** desc */
727 : 1, /* size_t count */
728 : key, /* uint64_t key */
729 : 0, /* uint64_t add_flags */
730 : 0 /* uint64_t imm */
731 : );
732 : }
733 :
734 :
735 : /**
736 : * Sends a partial buffer with immediate data over a connected unbuffered send socket.
737 : *
738 : * @param socket: A connected, unbuffered send socket
739 : * @param buffer: A registered send buffer
740 : * @param addr: Pointer to message within the buffer
741 : * @param size: Size of the message
742 : * @param key: Message key used to track the message progress.
743 : * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
744 : */
745 : int
746 20000 : netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm)
747 : {
748 20000 : struct iovec iov;
749 20000 : void* desc;
750 :
751 20000 : iov.iov_base = addr;
752 20000 : iov.iov_len = size;
753 20000 : desc = fi_mr_desc(buf->mr);
754 :
755 20000 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
756 : &iov, /* struct iovec* iov */
757 : &desc, /* void** desc */
758 : 1, /* size_t count */
759 : key, /* uint64_t key */
760 : FI_REMOTE_CQ_DATA, /* uint64_t add_flags */
761 : imm /* uint64_t imm */
762 : );
763 : }
764 :
765 :
766 : /**
767 : * Sends a partial buffer data over a connected unbuffered send socket.
768 : *
769 : * @param socket: A connected, unbuffered send socket
770 : * @param buffer: A vector of registered send buffers
771 : * @param iov: Scatter/gather buffer describing message within the buffers
772 : * @param count: Size of the scatter/gather vector
773 : * @param key: Message key used to track the message progress.
774 : */
775 : int
776 0 : netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key)
777 : {
778 0 : void* descarray[NETIO_MAX_IOV_LEN];
779 :
780 0 : for(unsigned i=0; i<count; i++) {
781 0 : descarray[i] = fi_mr_desc(buf[i]->mr);
782 : }
783 :
784 0 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
785 : iov, /* struct iovec* iov */
786 : descarray, /* void** desc */
787 : count, /* size_t count */
788 : key, /* uint64_t key */
789 : 0, /* uint64_t add_flags */
790 : 0 /* uint64_t imm */
791 : );
792 : }
793 :
794 :
795 : /**
796 : * Sends a partial buffer with immediate data over a connected unbuffered send socket.
797 : *
798 : * @param socket: A connected, unbuffered send socket
799 : * @param buffer: A vector of registered send buffers
800 : * @param iov: Scatter/gather buffer describing message within the buffers
801 : * @param count: Size of the scatter/gather vector
802 : * @param key: Message key used to track the message progress.
803 : * @param imm: Immediate data, up to 8 byte (size is implementation-dependent)
804 : */
805 : int
806 36810024 : netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm)
807 : {
808 36810024 : void* descarray[NETIO_MAX_IOV_LEN];
809 :
810 232161014 : for(unsigned i=0; i<count; i++) {
811 195350990 : descarray[i] = fi_mr_desc(buf[i]->mr);
812 : }
813 :
814 36810024 : return generic_sendmsg(socket, /* struct netio_send_socket* socket */
815 : iov, /* struct iovec* iov */
816 : descarray, /* void** desc */
817 : count, /* size_t count */
818 : key, /* uint64_t key */
819 : FI_REMOTE_CQ_DATA, /* uint64_t add_flags */
820 : imm /* uint64_t imm */
821 : );
822 : }
|