Line data Source code
1 : #include <stdio.h>
2 : #include <string.h>
3 : #include <assert.h>
4 : #include <netdb.h>
5 : #include "log.h"
6 : #include "netio/netio.h"
7 : #include "netio/netio_tcp.h"
8 :
9 : #if defined DEBUG || defined DEBUG_PUB
10 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
11 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
12 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
13 : #else
14 : #define log_dbg(...)
15 : #define log_trc(...)
16 : #endif
17 :
18 :
/**
 * Copies a NUL-terminated string into freshly malloc'ed storage.
 *
 * @param text: the string to copy, must not be NULL
 * @return a heap copy owned by the caller, or NULL if the allocation failed
 */
static char*
netio_copy_string(const char* text)
{
  size_t bytes = strlen(text) + 1;
  char* copy = malloc(bytes);
  if(copy) {
    memcpy(copy, text, bytes);
  }
  return copy;
}


/**
 * Translates a host name into a dotted-quad IPv4 string.
 *
 * If domain_name already parses as an IPv4 address it is returned unchanged.
 * Otherwise it is resolved with gethostbyname and the first IPv4 address is
 * returned. If the name cannot be resolved an error is logged and a copy of
 * domain_name itself is returned.
 *
 * @param domain_name: host name or IPv4 address, may be NULL
 * @return a malloc'ed string that the caller must free, or NULL if
 *         domain_name is NULL or memory could not be allocated
 */
char*
netio_domain_name_lookup(const char* domain_name)
{
  if(!domain_name) return NULL;

  // Already a literal IPv4 address: nothing to resolve.
  struct in_addr parsed;
  if(inet_pton(AF_INET, domain_name, &parsed) == 1) {
    return netio_copy_string(domain_name);
  }

  struct hostent* host = gethostbyname(domain_name);
  if(host && host->h_addrtype == AF_INET
     && host->h_addr_list && host->h_addr_list[0]) {
    // Copy the raw address out of the resolver-owned buffer before
    // formatting; inet_ntop writes into our own storage (no static buffer).
    struct in_addr resolved;
    char text[INET_ADDRSTRLEN];
    memcpy(&resolved, host->h_addr_list[0], sizeof(resolved));
    if(inet_ntop(AF_INET, &resolved, text, sizeof(text))) {
      return netio_copy_string(text);
    }
  }

  log_error("The host cannot be resolved. Domain name set to %s", domain_name);
  return netio_copy_string(domain_name);
}
49 :
50 : static int
51 10129 : cmp_subscription(const void* a, const void *b)
52 : {
53 10129 : struct netio_subscription* suba = (struct netio_subscription*)a;
54 10129 : struct netio_subscription* subb = (struct netio_subscription*)b;
55 :
56 10129 : if(suba->tag == subb->tag) {
57 : return 0;
58 : }
59 5 : return suba->tag > subb->tag ? 1 : -1;
60 : }
61 :
62 : static int
63 24 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
64 : {
65 24 : if(table->num_subscriptions == table->size) {
66 0 : log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
67 0 : return 1;
68 : }
69 :
70 : // TODO need to keep the list sorted
71 24 : table->subscriptions[table->num_subscriptions].tag = tag;
72 24 : table->subscriptions[table->num_subscriptions].socket = socket;
73 24 : table->subscriptions[table->num_subscriptions].again = 0;
74 :
75 : log_dbg("Adding connection in subscription table, tag=%lu, socket=%p",
76 : table->subscriptions[table->num_subscriptions].tag,
77 : table->subscriptions[table->num_subscriptions].socket
78 24 : );
79 :
80 24 : table->num_subscriptions++;
81 24 : table->ts++;
82 24 : log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
83 24 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
84 24 : return 0;
85 : }
86 :
87 :
88 : /**
89 : * @brief Handle an unsubscription or a client disconnection.
90 : *
91 : * @param netio_subscription_table: the table of active subscriptions
92 : * @param socket: the unbuffered send socket used to send the data to the subscriber
93 : * @param tag: the tag for which an unsubscribe request has been received
94 : * @param closed_connection: a flag to enable the removal of all the subscriptions associated to
95 : * the send socket in response to a closed connection.
96 : */
97 : static void
98 23 : table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
99 : {
100 23 : log_dbg("Total subscriptions: %lu", table->num_subscriptions);
101 23 : unsigned i=0;
102 23 : unsigned remaining_subscriptions_of_socket=0;
103 48 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
104 25 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
105 : }
106 :
107 :
108 48 : while(i<table->num_subscriptions) {
109 25 : if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
110 : log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u",
111 : table->subscriptions[i].tag,
112 : table->subscriptions[i].socket,
113 : table->num_subscriptions-1,
114 23 : i);
115 23 : table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
116 23 : table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
117 23 : table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
118 23 : table->num_subscriptions--;
119 23 : remaining_subscriptions_of_socket--;
120 23 : table->ts++;
121 : }
122 : else{
123 2 : i++;
124 48 : log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
125 : }
126 : }
127 23 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
128 23 : log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
129 23 : log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
130 23 : if(closed_connection==0 && remaining_subscriptions_of_socket==0){
131 23 : log_warn("Disconnecting endpoint with zero subscriptions");
132 23 : if (socket->send_socket.tcp_fi_mode == NETIO_MODE_TCP){
133 : // netio_disconnect(&socket->send_socket); // TODO prevents re-subscription
134 21 : } else if (socket->send_socket.tcp_fi_mode == NETIO_MODE_LIBFABRIC){
135 21 : netio_disconnect(&socket->send_socket);
136 : }
137 : }
138 23 : }
139 :
140 :
141 : static int
142 23 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
143 : {
144 23 : log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
145 23 : unsigned remaining_subscriptions_of_socket=0;
146 23 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
147 0 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
148 : }
149 23 : log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
150 23 : return remaining_subscriptions_of_socket;
151 : }
152 :
153 : static void
154 212184 : on_buffer_available(void* ptr)
155 : {
156 212184 : log_trc("a buffer became available, calling callback");
157 212184 : struct netio_publish_socket* socket = (struct netio_publish_socket*)ptr;
158 212184 : if(socket->cb_buffer_available) {
159 212184 : socket->cb_buffer_available(socket);
160 : }
161 212184 : }
162 :
163 :
164 : static void
165 24 : pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
166 : {
167 24 : log_dbg("publish socket established connection to remote, can publish now");
168 24 : struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
169 : //add deferred subscriptions to the table
170 24 : struct deferred_subscription* sub = socket->send_socket.deferred_subs;
171 48 : while(sub){
172 24 : int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
173 24 : if(pubsocket->cb_subscribe && ret == 0){
174 6 : pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
175 : }
176 24 : pop_subscription(&sub);
177 : }
178 : //user callback
179 24 : if(pubsocket->cb_connection_established) {
180 6 : pubsocket->cb_connection_established(pubsocket);
181 : }
182 24 : }
183 :
184 :
185 : static void
186 23 : pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
187 : {
188 23 : log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
189 23 : struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
190 23 : if(pubsocket->cb_connection_closed) {
191 5 : pubsocket->cb_connection_closed(pubsocket);
192 : }
193 : //Only if the connection was closed without unsubscribing first.
194 23 : if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
195 0 : log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
196 0 : uint8_t connection_closed = 1;
197 0 : table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
198 : }
199 23 : }
200 :
201 :
202 : static struct netio_buffered_send_socket*
203 24 : socket_list_add_or_lookup(struct netio_publish_socket* pubsocket,
204 : struct netio_socket_list** list,
205 : void* addr, size_t addrlen,
206 : int port,
207 : struct netio_context* ctx,
208 : struct netio_buffered_socket_attr* attr)
209 : {
210 24 : if(addrlen == 0) {
211 : return NULL;
212 : }
213 :
214 24 : struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);
215 :
216 24 : if ( entry == NULL ) {
217 24 : entry = add_socket_with_address(list, BSEND, addr, addrlen, port);
218 24 : struct netio_buffered_send_socket* bufsocket = entry->socket;
219 :
220 24 : if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
221 22 : netio_buffered_send_socket_init(bufsocket, ctx, attr);
222 22 : bufsocket->pub_socket = pubsocket;
223 22 : bufsocket->cb_connection_established = pubsocket_on_connection_established;
224 22 : bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
225 22 : netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
226 : }
227 2 : else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
228 2 : netio_buffered_send_tcp_socket_init(bufsocket, ctx, attr);
229 2 : bufsocket->pub_socket = pubsocket;
230 2 : bufsocket->cb_connection_established = pubsocket_on_connection_established;
231 2 : bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
232 2 : log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
233 2 : netio_connect_tcp(&bufsocket->send_socket, entry->addr, port);
234 : }
235 : else {
236 0 : log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
237 0 : remove_socket(list, bufsocket);
238 0 : return NULL;
239 : }
240 :
241 24 : bufsocket->signal_buffer_available.data = pubsocket;
242 24 : bufsocket->signal_buffer_available.cb = on_buffer_available;
243 : }
244 :
245 24 : struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
246 24 : return ss;
247 : }
248 :
249 :
250 : static void
251 24 : subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
252 : {
253 24 : struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx, &pubsocket->attr);
254 :
255 24 : if(socket->send_socket.recv_socket == NULL){
256 24 : socket->send_socket.recv_socket = recv_socket;
257 : }
258 :
259 24 : if (socket->send_socket.state == CONNECTED){
260 0 : table_add_subscription(&pubsocket->subscription_table, tag, socket);
261 0 : if(pubsocket->cb_subscribe) {
262 0 : pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
263 : }
264 : } else {
265 24 : push_back_subscription(&socket->send_socket.deferred_subs, tag);
266 : }
267 24 : }
268 :
269 :
270 : static void
271 23 : unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
272 : {
273 23 : struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
274 23 : if(list == NULL){return;}
275 :
276 23 : struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)list->socket;
277 23 : uint8_t connection_closed = 0;
278 23 : table_remove_subscription(&pubsocket->subscription_table, socket, tag, connection_closed);
279 23 : pubsocket->subscription_table.ts++;
280 :
281 23 : if(pubsocket->cb_unsubscribe) {
282 1 : pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
283 : }
284 :
285 : }
286 :
287 :
288 : static void
289 24 : lsocket_on_connection_established(struct netio_recv_socket* socket)
290 : {
291 24 : log_dbg("Buffered listen socket: on connection established");
292 24 : if(socket->tcp_fi_mode == NETIO_MODE_TCP){
293 : //libfabric buffers posted in on_listen_socket_cm_event
294 2 : socket->sub_msg_buffers = malloc(32*sizeof(struct netio_buffer*));
295 66 : for (int i = 0; i < 32; i++){
296 64 : socket->sub_msg_buffers[i] = malloc(sizeof(struct netio_buffer));
297 64 : socket->sub_msg_buffers[i]->size = sizeof(struct netio_subscription_message);
298 64 : socket->sub_msg_buffers[i]->data = malloc(socket->sub_msg_buffers[i]->size);
299 64 : netio_post_recv(socket, socket->sub_msg_buffers[i]);
300 : }
301 : }
302 24 : }
303 :
304 :
305 : static void
306 47 : parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
307 : {
308 47 : if (msg->action){
309 24 : log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
310 24 : subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
311 : }
312 : else{
313 23 : log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
314 23 : unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
315 : }
316 47 : }
317 :
318 :
319 : static void
320 47 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
321 : {
322 47 : log_dbg("message received by recv socket %p", socket);
323 47 : struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;
324 47 : if(len != sizeof(struct netio_subscription_message)) {
325 0 : log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
326 0 : netio_post_recv(socket, buf);
327 0 : return;
328 : }
329 47 : parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
330 47 : netio_post_recv(socket, buf);
331 : }
332 :
333 :
334 : static int
335 2380 : send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
336 : {
337 2380 : if( action == NETIO_SUBSCRIBE ){
338 1217 : log_info("Sending subscription for tag 0x%lx", tag);
339 1163 : } else if ( action == NETIO_UNSUBSCRIBE ){
340 1163 : log_info("Sending unsubscription for tag 0x%lx", tag);
341 : } else {
342 0 : log_error("Invalid subscription action %d", action);
343 0 : return 0;
344 : }
345 2380 : int ret = 0;
346 2380 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
347 1263 : socket->msg.tag = tag;
348 1263 : socket->msg.action = action;
349 1263 : socket->buf.data = &socket->msg;
350 1263 : socket->buf.size = sizeof(struct netio_subscription_message);
351 1263 : ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
352 1117 : } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
353 : // look for available buf
354 : size_t id = 0;
355 2066 : while (socket->bufs[id].to_send) {
356 949 : id++;
357 : }
358 1117 : log_info("tag id %d", id);
359 1117 : socket->msgs[id] = socket->msgs[0]; // initialize msg to default
360 1117 : socket->msgs[id].tag = tag;
361 1117 : socket->msgs[id].action = action;
362 1117 : socket->bufs[id].data = &socket->msgs[id];
363 1117 : socket->bufs[id].size = sizeof(struct netio_subscription_message);
364 1117 : socket->bufs[id].to_send = 1;
365 1117 : ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
366 : } else {
367 0 : log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
368 0 : ret = 1;
369 : }
370 2380 : log_info("send_subscription_message done");
371 2380 : return ret;
372 : }
373 :
374 :
375 : static void
376 162 : subsocket_on_connection_established(struct netio_send_socket* socket)
377 : {
378 162 : log_dbg("subsocket connection established");
379 162 : int ret;
380 162 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
381 :
382 162 : if(subscribe_socket->total_tags == 0){
383 0 : log_info("Closing send connection again because there is no tag to subscribe to.");
384 0 : netio_disconnect(socket);
385 : }
386 162 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
387 105 : subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
388 105 : if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
389 105 : subscribe_socket->msg.addr,
390 : &subscribe_socket->msg.addrlen)) != 0) {
391 0 : log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
392 0 : exit(1);
393 : }
394 105 : subscribe_socket->buf.data = &subscribe_socket->msg;
395 105 : subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
396 105 : netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
397 : }
398 57 : else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
399 : // initialize msgs[0] with defaults
400 57 : struct sockaddr sock_addr;
401 57 : socklen_t addrlen=sizeof(sock_addr);
402 57 : getsockname(socket->eq_ev_ctx.fd, &sock_addr, &addrlen);
403 57 : getnameinfo(&sock_addr, addrlen,
404 57 : subscribe_socket->msgs[0].addr,NETIO_MAX_ADDRLEN,
405 : NULL, 0, NI_NUMERICHOST);
406 57 : addrlen=strlen(subscribe_socket->msgs[0].addr);
407 57 : subscribe_socket->msgs[0].addr[addrlen] = 0;
408 57 : subscribe_socket->msgs[0].addrlen = addrlen+1;
409 57 : subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
410 : }
411 :
412 : // send tags one by one
413 514 : while(subscribe_socket->total_tags > 0){
414 352 : size_t idx = subscribe_socket->total_tags - 1;
415 352 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
416 352 : log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
417 352 : ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
418 352 : if (ret == NETIO_STATUS_OK){
419 352 : subscribe_socket->total_tags--;
420 : } else {
421 0 : log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
422 0 : break;
423 : }
424 : }
425 162 : }
426 :
427 :
428 : static void
429 61 : subsocket_on_send_connection_closed(struct netio_send_socket* socket)
430 : {
431 61 : log_dbg("subsocket_on_send_connection_closed callback");
432 61 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
433 61 : subscribe_socket->state = NONE;
434 61 : if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
435 61 : handle_send_socket_shutdown(socket);
436 : }
437 0 : else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
438 0 : handle_tcp_send_socket_shutdown(socket);
439 : }
440 61 : }
441 :
442 :
443 : static void
444 25 : subsocket_on_error_connection_refused(struct netio_send_socket* ssocket) {
445 25 : log_dbg("subsocket connection refused");
446 25 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)ssocket->usr;
447 25 : subscribe_socket->state = NONE;
448 :
449 25 : handle_listen_socket_shutdown(&subscribe_socket->recv_socket.listen_socket);
450 25 : if(subscribe_socket->cb_error_connection_refused) {
451 25 : subscribe_socket->cb_error_connection_refused(subscribe_socket);
452 : }
453 25 : }
454 :
455 :
456 : static void
457 2363 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
458 : {
459 2363 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
460 : //check for remaining tags from on_connection_established
461 2363 : while(subscribe_socket->total_tags > 0){
462 0 : size_t idx = subscribe_socket->total_tags - 1;
463 0 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
464 0 : int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
465 0 : if (ret == NETIO_STATUS_OK){
466 0 : subscribe_socket->total_tags--;
467 : } else {
468 0 : log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
469 0 : break;
470 : }
471 : }
472 2363 : }
473 :
474 :
475 : static void
476 163 : subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
477 : {
478 163 : log_dbg("connection to subscribe socket has been established");
479 163 : struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
480 163 : if(socket->cb_connection_established) {
481 161 : socket->cb_connection_established(socket);
482 : }
483 163 : }
484 :
485 :
486 : static void
487 85 : subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
488 : {
489 85 : log_info("connection to subscribe socket has been closed");
490 85 : struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
491 85 : if(socket->cb_connection_closed) {
492 85 : socket->cb_connection_closed(socket);
493 : }
494 85 : }
495 :
496 :
497 : static void
498 1662952403 : subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
499 : {
500 1662952403 : log_trc("buffer received");
501 1662952403 : struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
502 :
503 1662952403 : if(len <= sizeof(netio_tag_t)) {
504 0 : log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
505 0 : return;
506 : }
507 :
508 1662952403 : netio_tag_t tag = *((netio_tag_t*)data);
509 1662952403 : if(socket->cb_msg_received) {
510 1662952403 : socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
511 : }
512 : }
513 :
514 :
515 : void
516 41 : netio_subscription_table_init(struct netio_subscription_table* table)
517 : {
518 41 : table->socket_list = NULL;
519 41 : table->subscriptions = (struct netio_subscription*)malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
520 41 : table->num_subscriptions = 0;
521 41 : table->size = NETIO_INITIAL_SUBSCRIPTIONS;
522 41 : table->ts = 0;
523 41 : }
524 :
525 :
526 : /*
527 : * Initializes a buffered publish socket.
528 : *
529 : * @param socket: The buffered publish socket to initialize
530 : * @param ctx: The NetIO context in which to initialize the socket
531 : * @param hostname: Hostname or IP address to bind to
532 : * @param port: Port to bind to
533 : * @param attr: Buffered connection settings to be used for the underlying connections
534 : *
535 : * @see `netio_buffered_send_socket_init` for a description of the connection parameters
536 : */
537 : void
538 21 : netio_publish_libfabric_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
539 : {
540 21 : memset(socket, 0, sizeof(*socket));
541 21 : socket->ctx = ctx;
542 21 : socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
543 21 : netio_subscription_table_init(&socket->subscription_table);
544 21 : netio_init_listen_socket(&socket->lsocket, ctx, NULL);
545 21 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
546 0 : log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
547 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
548 : }
549 21 : memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
550 21 : socket->lsocket.usr = socket;
551 21 : socket->lsocket.cb_connection_established = lsocket_on_connection_established;
552 21 : socket->lsocket.cb_msg_received = lsocket_on_msg_received;
553 21 : socket->lsocket.recv_sub_msg = 1;
554 21 : char* _hostname = netio_domain_name_lookup(hostname);
555 21 : netio_listen(&socket->lsocket, (const char*)_hostname, port);
556 21 : free(_hostname);
557 21 : }
558 :
559 :
560 : /**
561 : * Initializes a buffered publish socket but with tcp instead if libfabric.
562 : *
563 : * @param socket: The buffered publish socket to initialize
564 : * @param ctx: The NetIO context in which to initialize the socket
565 : * @param hostname: Hostname or IP address to bind to
566 : * @param port: Port to bind to
567 : * @param attr: Buffered connection settings to be used for the underlying connections
568 : *
569 : * @see `netio_buffered_send_socket_init` for a description of the connection parameters
570 : */
571 : void
572 2 : netio_publish_tcp_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
573 : {
574 2 : memset(socket, 0, sizeof(*socket));
575 2 : socket->ctx = ctx;
576 2 : socket->tcp_fi_mode = NETIO_MODE_TCP;
577 2 : netio_subscription_table_init(&socket->subscription_table);
578 2 : netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
579 2 : memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
580 2 : socket->lsocket.usr = socket;
581 2 : socket->lsocket.cb_connection_established = lsocket_on_connection_established;
582 2 : socket->lsocket.cb_msg_received = lsocket_on_msg_received;
583 2 : socket->lsocket.recv_sub_msg = 1;
584 2 : char* _hostname = netio_domain_name_lookup(hostname);
585 2 : netio_listen_tcp(&socket->lsocket, (const char*)_hostname, port);
586 2 : free(_hostname);
587 2 : }
588 :
589 :
/**
 * Initializes a buffered publish socket, selecting the TCP or the libfabric
 * implementation according to netio_tcp_mode(hostname). The name handed to
 * the selected implementation is netio_hostname(hostname).
 *
 * @param socket: The buffered publish socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param hostname: Hostname or IP address to bind to
 * @param port: Port to bind to
 * @param attr: Buffered connection settings to be used for the underlying connections
 *
 * @see `netio_buffered_send_socket_init` for a description of the connection parameters
 */
void
netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
{
  if (!netio_tcp_mode(hostname)) {
    netio_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
    return;
  }
  netio_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
}
610 :
611 : /**
612 : * Initializes a buffered subscribe socket.
613 : *
614 : * @param socket: The buffered subscribe socket to initialize
615 : * @param ctx: The NetIO context in which to initialize the socket
616 : * @param attr: Buffered connection settings to be used for the underlying connections
617 : * @param hostname: Hostname or IP address of the local interface to bind to
618 : * @param remote_host: Hostname or IP of the remote publish socket
619 : * @param remote_port: Port of the remote publish socket
620 : *
621 : * @see `netio_buffered_send_socket_init` for a description of the connection parameters
622 : */
623 : void
624 130 : netio_subscribe_libfabric_socket_init(struct netio_subscribe_socket* socket,
625 : struct netio_context* ctx,
626 : struct netio_buffered_socket_attr* attr,
627 : const char* hostname,
628 : const char* remote_host,
629 : unsigned remote_port)
630 : {
631 130 : memset(socket, 0, sizeof(*socket));
632 130 : socket->ctx = ctx;
633 130 : socket->state = NONE;
634 130 : socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
635 130 : memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
636 130 : netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
637 130 : socket->recv_socket.listen_socket.usr = socket;
638 130 : socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
639 130 : socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
640 130 : socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
641 : //set cb_buf_received meant only for pileup measurement to NULL
642 130 : socket->cb_buf_received = NULL;
643 130 : char* lookedup_hostname = netio_domain_name_lookup(hostname);
644 130 : char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
645 130 : netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);
646 :
647 130 : socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
648 130 : socket->remote_port = remote_port;
649 130 : socket->total_tags = 0;
650 130 : free(lookedup_remote_hostname);
651 130 : free(lookedup_hostname);
652 130 : }
653 :
654 :
655 : /* Same as above except tcp instead of libfabric */
656 : void
657 57 : netio_subscribe_tcp_socket_init(struct netio_subscribe_socket* socket,
658 : struct netio_context* ctx,
659 : struct netio_buffered_socket_attr* attr,
660 : const char* hostname,
661 : const char* remote_host,
662 : unsigned remote_port)
663 : {
664 57 : log_info("subscribe_tcp from <%s> to <%s>",hostname,remote_host);
665 57 : memset(socket, 0, sizeof(*socket));
666 57 : socket->ctx = ctx;
667 57 : socket->state = NONE;
668 57 : socket->tcp_fi_mode=NETIO_MODE_TCP;
669 57 : memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
670 57 : log_dbg("Prepare recv socket (buffered_listen_tcp_socket)");
671 57 : netio_buffered_listen_tcp_socket_init(&socket->recv_socket, ctx, &socket->attr);
672 :
673 57 : socket->recv_socket.listen_socket.usr = socket;
674 57 : socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
675 57 : socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
676 57 : socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
677 : //set cb_buf_received meant only for pileup measurement to NULL
678 57 : socket->cb_buf_received = NULL;
679 57 : char* lookedup_hostname = netio_domain_name_lookup(hostname);
680 57 : char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
681 57 : netio_buffered_listen_tcp(&socket->recv_socket, (const char*)lookedup_hostname, 0);
682 :
683 57 : socket->msg.port=socket->recv_socket.listen_socket.port;
684 :
685 57 : socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
686 57 : socket->remote_port = remote_port;
687 57 : socket->total_tags = 0;
688 57 : free(lookedup_remote_hostname);
689 57 : free(lookedup_hostname);
690 57 : }
691 :
692 :
/**
 * Initializes a buffered subscribe socket, selecting the TCP or the libfabric
 * implementation according to netio_tcp_mode(hostname). Both host names are
 * passed through netio_hostname() before being handed to the selected
 * implementation.
 *
 * @param socket: The buffered subscribe socket to initialize
 * @param ctx: The NetIO context in which to initialize the socket
 * @param attr: Buffered connection settings to be used for the underlying connections
 * @param hostname: Hostname or IP address of the local interface to bind to
 * @param remote_host: Hostname or IP of the remote publish socket
 * @param remote_port: Port of the remote publish socket
 *
 * @see `netio_buffered_send_socket_init` for a description of the connection parameters
 */
void
netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
                            struct netio_context* ctx,
                            struct netio_buffered_socket_attr* attr,
                            const char* hostname,
                            const char* remote_host,
                            unsigned remote_port)
{
  if (!netio_tcp_mode(hostname)) {
    netio_subscribe_libfabric_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
    return;
  }
  netio_subscribe_tcp_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
}
719 :
720 : /**
721 : * Initialize a netio_subscription_cache object.
722 : *
723 : * @param cache: The cache to be initialized
724 : */
725 : void
726 25165824 : netio_subscription_cache_init(struct netio_subscription_cache* cache)
727 : {
728 25165824 : cache->ts = 0;
729 25165824 : cache->count = 0;
730 25165824 : cache->idx_start = 0;
731 25165824 : }
732 :
733 :
734 : static unsigned
735 10144 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
736 : {
737 10144 : struct netio_subscription key;
738 10144 : key.tag = tag;
739 20288 : struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
740 10144 : table->subscriptions,
741 : table->num_subscriptions,
742 : sizeof(struct netio_subscription),
743 : cmp_subscription);
744 10144 : if(ptr == NULL) {
745 : return 0;
746 : }
747 10123 : unsigned start_idx = ptr - table->subscriptions;
748 10124 : while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
749 : start_idx--;
750 : }
751 : unsigned count = 0;
752 20247 : for(unsigned int i=start_idx; i<table->num_subscriptions; i++) {
753 10125 : if(table->subscriptions[i].tag == tag) {
754 10124 : count++;
755 : } else {
756 : break;
757 : }
758 : }
759 10123 : *start = start_idx;
760 10123 : return count;
761 : }
762 :
763 : static unsigned
764 606002965 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
765 : {
766 606002965 : if(cache->ts != table->ts) {
767 42 : cache->count = lookup_tag(table, tag, &cache->idx_start);
768 42 : cache->ts = table->ts;
769 : }
770 606002965 : *start = cache->idx_start;
771 606002965 : return cache->count;
772 : }
773 :
774 : /**
775 : * Publishes a message under a given tag
776 : *
777 : * @param socket: The socket to publish on
778 : * @param tag: The tag under which to publish
779 : * @param data: Message data
780 : * @param len: Message size
781 : * @param flags: NETIO_REENTRY publishing of this message was attempted before and
782 : * resulted in NETIO_STATUS_AGAIN. Calling publish with
783 : * this flag will only send on connections where the
784 : * message was previously unpublished.
785 : * @param cache: Optional user-supplied cache for the subscription table lookup.
786 : */
787 : int
788 606013410 : netio_buffered_publishi(struct netio_publish_socket* socket, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
789 : {
790 606013410 : int ret = NETIO_STATUS_OK;
791 606013410 : netio_tag_t tag = *(netio_tag_t*)iov[0].iov_base;
792 606013410 : unsigned start_idx;
793 606013410 : unsigned num_subscriptions;
794 :
795 606013410 : if(cache) {
796 606003308 : num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
797 : } else {
798 10102 : num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
799 : }
800 606013155 : if(num_subscriptions == 0) return NETIO_STATUS_OK_NOSUB;
801 :
802 120494182 : for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
803 62617941 : struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
804 62617941 : if(subscription->tag == tag) {
805 :
806 : // skip connections that were already successful if we are in reentry mode
807 62618081 : if(flags & NETIO_REENTRY) {
808 219219 : if(subscription->again == 0) {
809 116 : continue;
810 : }
811 : }
812 :
813 62617965 : int result = netio_buffered_sendv(subscription->socket, iov, len);
814 62617185 : log_dbg("Sending iov on subscription->socket, result=%d",result);
815 :
816 62617185 : if(result == NETIO_STATUS_OK) {
817 62398221 : subscription->again = 0;
818 218964 : } else if(result == NETIO_STATUS_AGAIN) {
819 218964 : subscription->again = 1;
820 218964 : ret = NETIO_STATUS_AGAIN;
821 : }
822 0 : else if(result == NETIO_STATUS_TOO_BIG) {
823 0 : subscription->again = 0;
824 0 : ret = NETIO_STATUS_TOO_BIG;
825 : }
826 : else {
827 : return result; // some error occured and we return immediately
828 : }
829 : }
830 : }
831 : return ret;
832 : }
833 :
834 :
835 : /**
836 : * Publishes a message under a given tag
837 : *
838 : * @param socket: The socket to publish on
839 : * @param tag: The tag under which to publish
840 : * @param data: Message data
841 : * @param len: Message size
842 : * @param flags: NETIO_REENTRY publishing of this message was attempted before and
843 : * resulted in NETIO_STATUS_AGAIN. Calling publish with
844 : * this flag will only send on connections where the
845 : * message was previously unpublished.
846 : * @param cache: Optional user-supplied cache for the subscription table lookup.
847 : */
848 : int
849 10102 : netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
850 : {
851 10102 : log_trc("netio_buffered_publish (size=%lu)", len);
852 :
853 10102 : struct iovec iov[2];
854 10102 : iov[0].iov_base = &tag;
855 10102 : iov[0].iov_len = sizeof(netio_tag_t);
856 10102 : iov[1].iov_base = data;
857 10102 : iov[1].iov_len = len;
858 :
859 10102 : return netio_buffered_publishi(socket, iov, 2, flags, cache);
860 : }
861 :
862 :
863 : int
864 605990356 : netio_buffered_publishv(struct netio_publish_socket* socket, netio_tag_t tag, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
865 : {
866 605990356 : struct iovec iovv[NETIO_MAX_IOV_LEN + 1];
867 : //NETIO_MAX_IOV_LEN is not a limititation, as all entries will be copied in one in netio_buffered_sendv
868 605990356 : iovv[0].iov_base = &tag;
869 605990356 : iovv[0].iov_len = sizeof(netio_tag_t);
870 605990356 : size_t size = (len > NETIO_MAX_IOV_LEN) ? NETIO_MAX_IOV_LEN : len;
871 605990356 : memcpy(iovv + 1, iov, size * sizeof(struct iovec));
872 605990356 : return netio_buffered_publishi(socket, iovv, (size + 1), flags, cache);
873 : }
874 :
875 :
876 : /**
877 : * Flushes buffers on all connections of a given publish socket for a certain tag.
878 : *
879 : * @param socket The buffered publish socket
880 : * @param tag The message tag
881 : * @param cache An optional subscription cache object
882 : */
883 : void
884 14160353 : netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
885 : {
886 15211103 : for(unsigned i=0; i<socket->subscription_table.num_subscriptions; i++) {
887 1050750 : struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
888 1050750 : netio_buffered_flush(subscription->socket);
889 : }
890 14160353 : }
891 :
892 :
893 : /**
894 : * Subscribe to a given message tag.
895 : *
896 : * For a given subscribe socket, `netio_subscribe` can be called multiple times.
897 : *
898 : * @param socket: The buffered subscribe socket.
899 : * @param tag: The subscription tag.
900 : */
901 : int
902 1242 : netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
903 : {
904 1242 : if(socket->state == NONE) {
905 : //A new socket is created and the on subsocket_on_connection_established
906 : //will trigger the actual subscriptions.
907 187 : log_dbg("Creating and connecting a new send_socket");
908 187 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
909 130 : netio_init_send_socket(&socket->socket, socket->ctx);
910 130 : socket->socket.usr = socket;
911 130 : socket->socket.cb_connection_established = subsocket_on_connection_established;
912 130 : socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
913 130 : socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
914 130 : socket->socket.cb_send_completed = subsocket_on_send_completed;
915 130 : netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
916 57 : } else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
917 57 : netio_init_send_tcp_socket(&socket->socket, socket->ctx);
918 57 : socket->socket.usr = socket;
919 57 : socket->socket.cb_connection_established = subsocket_on_connection_established;
920 57 : socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
921 57 : socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
922 57 : socket->socket.cb_send_completed = subsocket_on_send_completed;
923 57 : netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
924 : }
925 187 : socket->state = INITIALIZED;
926 : }
927 :
928 4267 : for(unsigned i=0; i<socket->total_tags; i++) {
929 3025 : if(socket->tags_to_subscribe[i] == tag) {
930 : return 0;
931 : }
932 : }
933 :
934 : //if send socket connected send message
935 : //otherwise on_connection_established will do it
936 1242 : if (socket->socket.state){
937 865 : log_info("Sending subscription message for tag 0x%lx", tag);
938 865 : int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
939 865 : return ret;
940 : } else {
941 377 : log_info("Queing subscription message for txg 0x%lx", tag);
942 377 : socket->tags_to_subscribe[socket->total_tags] = tag;
943 377 : socket->total_tags++;
944 377 : return 0;
945 : }
946 : }
947 :
948 :
949 : static int
950 0 : remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
951 : {
952 0 : int found = 0;
953 0 : for(unsigned int i=0; i<socket->total_tags; ++i){
954 0 : if(socket->tags_to_subscribe[i] == tag){
955 0 : log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
956 0 : for(unsigned int j = i; j < socket->total_tags-1; ++j){
957 0 : socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
958 : }
959 0 : found = 1;
960 0 : socket->total_tags--;
961 0 : break;
962 : }
963 : }
964 0 : if(found == 0){
965 0 : log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
966 : }
967 0 : return NETIO_STATUS_OK;
968 : }
969 :
970 :
971 : /**
972 : * Unsubscribe from a given message tag.
973 : *
974 : * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
975 : *
976 : * @param socket: The subscribe socket.
977 : * @param tag: The tag to unsubscribe from.
978 : */
979 : int
980 1163 : netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
981 : {
982 1163 : int ret = NETIO_STATUS_OK;
983 1163 : if(socket->state == INITIALIZED) {
984 1163 : log_dbg("Subscribe socket initialised, can proceed with usubscription");
985 1163 : if (socket->socket.state) {
986 1163 : ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
987 1163 : log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
988 : } else {
989 0 : ret = remove_tag_to_subscribe(socket, tag);
990 : }
991 : } else {
992 1163 : log_dbg("The connection has been already closed.");
993 : }
994 1163 : return ret;
995 : }
996 :
997 : /**
998 : * Returns the minimum number of netio pages across buffered send sockets
999 : * associated to a publish socket
1000 : *
1001 : * @param socket: The publish socket.
1002 : */
1003 : unsigned
1004 83 : netio_pubsocket_get_minimum_pages(struct netio_publish_socket* socket)
1005 : {
1006 83 : if (!socket) {
1007 : return 0;
1008 : }
1009 :
1010 83 : size_t pages = socket->attr.num_pages;
1011 83 : struct netio_socket_list* itr = socket->subscription_table.socket_list;
1012 :
1013 122 : while(itr != NULL){
1014 39 : struct netio_buffered_send_socket* buf_send_socket = (struct netio_buffered_send_socket*)itr->socket;
1015 39 : uint64_t socket_pages = buf_send_socket->buffers.available_buffers;
1016 39 : if(socket_pages < pages){
1017 : pages = socket_pages;
1018 : }
1019 39 : itr = itr->next;
1020 : }
1021 83 : return pages;
1022 : }
1023 :
|