Line data Source code
1 : #include <stdio.h>
2 : #include <string.h>
3 : #include <assert.h>
4 : #include "log.h"
5 : #include "netio/netio.h"
6 : #include "netio/netio_tcp.h"
7 :
8 : #include <sys/socket.h>
9 : #include <netdb.h>
10 :
11 :
12 : #define PUBLISH_SOCKET_MAX_COMPLETIONS (512)
13 :
14 : #if defined DEBUG || defined DEBUG_UPUB
15 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
16 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
17 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
18 : #else
19 : #define log_dbg(...)
20 : #define log_trc(...)
21 : #endif
22 :
23 : static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start);
24 :
25 : static int
26 769996 : cmp_subscription(const void* a, const void *b)
27 : {
28 769996 : struct netio_subscription* suba = (struct netio_subscription*)a;
29 769996 : struct netio_subscription* subb = (struct netio_subscription*)b;
30 :
31 769996 : if(suba->tag == subb->tag) {
32 : return 0;
33 : }
34 11 : return suba->tag > subb->tag ? 1 : -1;
35 : }
36 :
37 :
38 : static int
39 22 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_send_socket* socket)
40 : {
41 22 : if(table->num_subscriptions == table->size) {
42 0 : log_error("Maximum number of subscriptions. New subscription for 0x%lx dropped.", tag);
43 0 : return 1;
44 : }
45 :
46 : //Check if the subscription is already in the list
47 22 : unsigned start;
48 22 : unsigned count = lookup_tag(table, tag, &start);
49 25 : for(unsigned i=0; i<count; i++) {
50 3 : if(table->subscriptions[start+i].socket == socket) {
51 : return 0;
52 : }
53 : }
54 :
55 22 : table->subscriptions[table->num_subscriptions].tag = tag;
56 22 : table->subscriptions[table->num_subscriptions].socket = socket;
57 22 : table->subscriptions[table->num_subscriptions].again = 0;
58 22 : table->num_subscriptions++;
59 22 : table->ts++;
60 22 : log_info("New entry in subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
61 22 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
62 22 : return 0;
63 : }
64 :
65 :
66 : /**
67 : * @brief Handle an unsubscription or a client disconnection.
68 : *
69 : * @param pubsocket: netio_unbuffered_publish_socket to which all send sockets are associated.
70 : * @param socket: the netio_send_socket not needed anymore
71 : * @param tag: the tag for which an unsubscribe request has been received
72 : * @param closed_connection: a flag to enable the removal of all the subscriptions associated to
73 : * the send socket in response to a closed connection.
74 : */
75 : static void
76 16 : table_remove_subscription(struct netio_unbuffered_publish_socket* pubsocket, struct netio_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
77 : {
78 16 : struct netio_subscription_table* table = &pubsocket->subscription_table;
79 16 : log_dbg("Total subscriptions: %lu", table->num_subscriptions);
80 :
81 16 : unsigned i=0;
82 16 : unsigned remaining_subscriptions_of_socket=0;
83 34 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
84 18 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
85 : }
86 :
87 34 : while(i<table->num_subscriptions) {
88 18 : log_dbg("Removing subscription tag %lx table socket %p socket %p", table->subscriptions[i].tag, table->subscriptions[i].socket, socket);
89 18 : if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
90 : //increment the completion semaphore counter as if messages were sent to the disconnected client
91 : struct netio_completion_stack* cs = &pubsocket->completion_stack;
92 8208 : for(size_t j=0; j < cs->num_objects; ++j) {
93 8192 : if(cs->objects[j].header.tag == table->subscriptions[i].tag && cs->objects[j].key != 0xFFFFFFFFFFFFFFFF){
94 2022 : netio_semaphore_increment(&cs->objects[j].sem, 1);
95 : }
96 : }
97 :
98 16 : log_dbg("Available completion objects %lu / %lu", cs->available_objects, cs->num_objects);
99 : log_dbg("Removing connection in subscription table, tag=%lu, socket=%p index %lu becomes %u",
100 : table->subscriptions[i].tag,
101 : table->subscriptions[i].socket,
102 : table->num_subscriptions-1,
103 16 : i);
104 16 : table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
105 16 : table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
106 16 : table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
107 16 : table->num_subscriptions--;
108 16 : remaining_subscriptions_of_socket--;
109 16 : table->ts++;
110 : }
111 : else{
112 2 : i++;
113 34 : log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
114 : }
115 : }
116 :
117 16 : log_dbg("At the end of table_remove_subscription available completion objects %lu / %lu", pubsocket->completion_stack.available_objects, pubsocket->completion_stack.num_objects);
118 16 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
119 16 : log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
120 :
121 16 : log_info("Removing entry from unbuffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
122 :
123 16 : if(remaining_subscriptions_of_socket==0){
124 16 : log_info("No subscriptions remaining");
125 16 : if (socket->tcp_fi_mode == NETIO_MODE_TCP){
126 : // netio_close_socket(&socket->ctx->evloop,socket,USEND); // TODO prevents re-subscription
127 16 : } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC && closed_connection==0){
128 16 : netio_disconnect(socket);
129 : }
130 : }
131 16 : }
132 :
133 : static int
134 16 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_send_socket* socket)
135 : {
136 16 : log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
137 16 : unsigned remaining_subscriptions_of_socket=0;
138 16 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
139 0 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
140 : }
141 16 : log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
142 16 : return remaining_subscriptions_of_socket;
143 : }
144 :
145 : static void
146 10805972 : release_completion_object(struct netio_completion_object* completion)
147 : {
148 10805972 : log_trc("releasing completion object");
149 10805972 : if(netio_completion_stack_push(&completion->socket->completion_stack, completion)) {
150 0 : log_error("Could not push completion object since stack was full. This should not happen.");
151 : }
152 10805972 : }
153 :
154 : static void
155 10805972 : on_completion_trigger(void* c)
156 : {
157 10805972 : struct netio_completion_object* completion = (struct netio_completion_object*)c;
158 10805972 : log_trc("calling cb_msg_published");
159 10805972 : if(completion->socket->cb_msg_published) {
160 10805971 : completion->socket->cb_msg_published(completion->socket, completion->key);
161 : }
162 10805972 : release_completion_object(completion);
163 10805972 : }
164 :
165 0 : void print_completion_objects(struct netio_unbuffered_publish_socket* socket){
166 0 : printf("Number of available completion objects: %zu \n", socket->completion_stack.available_objects);
167 0 : printf("===============================================\n");
168 0 : printf("CO# \t KEY \t \t TAG \n");
169 0 : printf("-----------------------------------------------\n");
170 0 : for(unsigned int i=0; i < socket->completion_stack.num_objects; ++i){
171 0 : uint32_t tag = (socket->completion_stack.key_array[i] >> 32);
172 0 : printf("%u \t 0x%lx \t %x \n", i, socket->completion_stack.key_array[i], tag);
173 : }
174 0 : printf("===============================================\n");
175 0 : printf("Subscription table: there are %zu elinks subscribed\n", socket->subscription_table.num_subscriptions);
176 0 : printf("FID \t \t \t SOCKET\n");
177 0 : printf("-----------------------------------------------\n");
178 0 : for (unsigned int i=0; i < socket->subscription_table.num_subscriptions; ++i){
179 0 : printf("0x%lx \t %p \n", socket->subscription_table.subscriptions[i].tag, socket->subscription_table.subscriptions[i].socket);
180 : }
181 0 : socket->completion_stack.printed = 1;
182 0 : }
183 :
184 :
185 :
186 : static struct netio_completion_object*
187 18137380 : fetch_completion_object(struct netio_unbuffered_publish_socket* socket)
188 : {
189 18137380 : struct netio_completion_object* completion;
190 18137380 : if(netio_completion_stack_pop(&socket->completion_stack, &completion)) {
191 : #if defined DEBUG || defined DEBUG_UPUB
192 : if (socket->completion_stack.printed == 0){
193 : print_completion_objects(socket);
194 : }
195 : #endif
196 : return NULL;
197 : }
198 :
199 10805972 : netio_semaphore_init(&completion->sem, 0);
200 10805972 : completion->sem.data = completion;
201 10805972 : completion->sem.cb = on_completion_trigger;
202 10805972 : completion->socket = socket;
203 10805972 : return completion;
204 : }
205 :
206 : static void
207 11304316 : increment_completion_object(struct netio_unbuffered_publish_socket* pubsocket, uint64_t key)
208 : {
209 11304316 : log_trc("incrementing completion object");
210 11304316 : struct netio_completion_object *completion = (struct netio_completion_object*)key;
211 11304316 : netio_semaphore_increment(&completion->sem, 1);
212 11304316 : log_trc("current: %d expected: %d", completion->sem.current, completion->sem.threshold);
213 11304316 : }
214 :
215 : static void
216 21 : pubsocket_on_connection_established(struct netio_send_socket* socket)
217 : {
218 21 : log_dbg("publish socket established connection to remote, can publish now");
219 21 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
220 : //add deferred subscriptions to the table
221 21 : struct deferred_subscription* sub = socket->deferred_subs;
222 43 : while(sub){
223 22 : int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
224 22 : if(pubsocket->cb_subscribe && ret == 0){
225 8 : pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
226 : }
227 22 : pop_subscription(&sub);
228 : }
229 : //user callback
230 21 : if(pubsocket->cb_connection_established) {
231 7 : pubsocket->cb_connection_established(pubsocket);
232 : }
233 21 : }
234 :
235 :
236 : static void
237 16 : pubsocket_on_connection_closed(struct netio_send_socket* socket)
238 : {
239 16 : log_info("published socket: connection to remote was closed");
240 16 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
241 16 : if(pubsocket->cb_connection_closed) {
242 2 : pubsocket->cb_connection_closed(pubsocket);
243 : }
244 16 : if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
245 0 : uint8_t closed_connection = 1;
246 0 : table_remove_subscription(pubsocket, socket, 0, closed_connection);
247 : }
248 16 : handle_send_socket_shutdown(socket);
249 16 : remove_socket(&(pubsocket->subscription_table.socket_list), socket);
250 16 : }
251 :
252 :
253 : static void
254 155 : on_unbuffered_send_connection_closed(struct netio_send_socket* socket)
255 : {
256 155 : log_info("Send socket: connection to remote was closed");
257 155 : if(socket->unbuf_pub_socket != NULL) {
258 0 : pubsocket_on_connection_closed(socket);
259 : } else{
260 155 : handle_send_socket_shutdown(socket);
261 : }
262 155 : }
263 :
264 : static void
265 11304316 : pubsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
266 : {
267 11304316 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
268 11304316 : increment_completion_object(pubsocket, key);
269 11304316 : log_trc("completion on pubsocket connection, key=%lu", key);
270 11304316 : }
271 :
272 :
273 : static struct netio_send_socket*
274 22 : socket_list_add_or_lookup(struct netio_unbuffered_publish_socket* pubsocket,
275 : struct netio_socket_list** list,
276 : void* addr, size_t addrlen,
277 : int port,
278 : struct netio_context* ctx)
279 : {
280 22 : if(addrlen == 0) {
281 0 : log_error("Invalid zero-byte address");
282 0 : return NULL;
283 : }
284 :
285 22 : struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);
286 :
287 22 : if ( entry == NULL ) {
288 21 : entry = add_socket_with_address(list, USEND, addr, addrlen, port);
289 21 : struct netio_send_socket* socket = (struct netio_send_socket*)entry->socket;
290 :
291 21 : if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
292 21 : struct netio_domain* domain = NULL; //domain is shared among sockets,
293 21 : if (entry->next != NULL && entry->next->socket != NULL) {
294 4 : domain = ((struct netio_send_socket*)(entry->next->socket))->domain;
295 : }
296 21 : netio_init_send_socket(socket, ctx); //here we memset to zero and we loose domain
297 21 : socket->unbuf_pub_socket = pubsocket;
298 21 : socket->cb_connection_established = pubsocket_on_connection_established;
299 21 : socket->cb_connection_closed = pubsocket_on_connection_closed;
300 21 : socket->cb_send_completed = pubsocket_on_send_completed;
301 21 : netio_connect_rawaddr_domain(socket, entry->addr, entry->addrlen, domain);
302 21 : if ( domain == NULL ){ //check on local domain variable
303 17 : netio_register_send_buffer(socket, &pubsocket->buf, 0);
304 17 : netio_completion_stack_register_send_socket(&pubsocket->completion_stack, socket);
305 : }
306 : }
307 0 : else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
308 0 : netio_init_send_tcp_socket(socket, ctx);
309 0 : socket->unbuf_pub_socket = pubsocket;
310 0 : socket->cb_connection_established = pubsocket_on_connection_established;
311 0 : socket->cb_connection_closed = pubsocket_on_connection_closed;
312 0 : socket->cb_send_completed = pubsocket_on_send_completed;
313 0 : log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
314 0 : netio_connect_tcp(socket, entry->addr, port);
315 : }
316 : else {
317 0 : log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
318 0 : remove_socket(list, socket);
319 0 : return NULL;
320 : }
321 : }
322 :
323 22 : struct netio_send_socket* ss = (struct netio_send_socket*)entry->socket;
324 22 : return ss;
325 : }
326 :
327 :
328 : static void
329 22 : subscribe(struct netio_unbuffered_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
330 : {
331 22 : struct netio_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx);
332 22 : if(socket->recv_socket == NULL){
333 21 : socket->recv_socket =recv_socket;
334 : }
335 :
336 22 : if (socket->state == CONNECTED){
337 0 : table_add_subscription(&pubsocket->subscription_table, tag, socket);
338 0 : if(pubsocket->cb_subscribe) {
339 0 : pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
340 : }
341 : } else {
342 22 : push_back_subscription(&socket->deferred_subs, tag);
343 : }
344 22 : }
345 :
346 : static void
347 16 : unsubscribe(struct netio_unbuffered_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
348 : {
349 16 : struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
350 16 : if(list == NULL){return;}
351 :
352 16 : struct netio_send_socket* socket = (struct netio_send_socket*)list->socket;
353 16 : uint8_t connection_closed = 0;
354 16 : table_remove_subscription(pubsocket, socket, tag, connection_closed);
355 16 : pubsocket->subscription_table.ts++;
356 :
357 16 : if(pubsocket->cb_unsubscribe) {
358 1 : pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
359 : }
360 : }
361 :
362 : static void
363 21 : lsocket_on_connection_established(struct netio_recv_socket* socket)
364 : {
365 21 : log_dbg("Buffered listen socket: on connection established");
366 21 : if(socket->tcp_fi_mode == NETIO_MODE_TCP){
367 0 : struct netio_buffer* buf = malloc(sizeof(struct netio_buffer));
368 0 : buf->size = sizeof(struct netio_subscription_message);
369 0 : buf->data = malloc(buf->size);
370 0 : netio_post_recv(socket, buf);
371 0 : socket->usr = buf;
372 : }
373 21 : }
374 :
375 :
376 : static void
377 38 : parse_subscription_message(struct netio_unbuffered_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
378 : {
379 38 : if (msg->action){
380 22 : log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
381 22 : subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
382 : }
383 : else{
384 16 : log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
385 16 : unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
386 : }
387 38 : }
388 :
389 :
390 : static void
391 38 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
392 : {
393 38 : log_trc("message received");
394 38 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->lsocket->usr;
395 38 : if(len != sizeof(struct netio_subscription_message)) {
396 0 : log_error("Illegal subscription message size %lu", len);
397 0 : netio_post_recv(socket, buf);
398 0 : return;
399 : }
400 38 : parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
401 38 : netio_post_recv(socket, buf);
402 : }
403 :
404 :
405 : static int
406 2255 : send_subscription_message(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, int action)
407 : {
408 2255 : if( action == NETIO_SUBSCRIBE ){
409 1157 : log_info("Sending subscription for tag 0x%lx", tag);
410 1098 : } else if ( action == NETIO_UNSUBSCRIBE ){
411 1098 : log_info("Sending unsubscription for tag 0x%lx", tag);
412 : } else {
413 0 : log_error("Invalid subscription action %d", action);
414 0 : return 0;
415 : }
416 2255 : int ret = 0;
417 2255 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
418 1170 : socket->msg.tag = tag;
419 1170 : socket->msg.action = action;
420 1170 : socket->buf.data = &socket->msg;
421 1170 : socket->buf.size = sizeof(struct netio_subscription_message);
422 1170 : ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
423 1085 : } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
424 : // look for available buf
425 : size_t id = 0;
426 2122 : while (socket->bufs[id].to_send) {
427 1037 : id++;
428 : }
429 1085 : log_dbg("tag id %d", id);
430 1085 : socket->msgs[id] = socket->msgs[0]; // initialize msg to default
431 1085 : socket->msgs[id].tag = tag;
432 1085 : socket->msgs[id].action = action;
433 1085 : socket->bufs[id].data = &socket->msgs[id];
434 1085 : socket->bufs[id].size = sizeof(struct netio_subscription_message);
435 1085 : socket->bufs[id].to_send = 1;
436 1085 : ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
437 : } else {
438 0 : log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
439 0 : ret = 1;
440 : }
441 : return ret;
442 : }
443 :
444 : /**
445 : * Initialize an unbuffered publish socket
446 : *
447 : * @param socket: An unbuffered publish socket
448 : * @param ctx: A netio context
449 : * @param hostname: Local hostname to bind to
450 : * @param port: Local port to bind to
451 : * @param buf: A registered send buffer
452 : */
453 : void
454 18 : netio_unbuffered_publish_libfabric_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
455 : {
456 18 : log_dbg("netio_unbuffered_publish_libfabric_socket_init");
457 18 : socket->ctx = ctx;
458 18 : socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
459 18 : memcpy(&socket->buf, buf, sizeof(struct netio_buffer));
460 18 : netio_subscription_table_init(&socket->subscription_table);
461 18 : netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);
462 18 : netio_init_listen_socket(&socket->lsocket, ctx, NULL);
463 18 : socket->lsocket.usr = socket;
464 18 : socket->lsocket.cb_connection_established = lsocket_on_connection_established;
465 18 : socket->lsocket.cb_msg_received = lsocket_on_msg_received;
466 18 : socket->lsocket.recv_sub_msg = 1;
467 18 : netio_listen(&socket->lsocket, hostname, port);
468 :
469 18 : socket->buf_array[0] = &socket->completion_stack.buf;
470 504 : for(unsigned i=1; i<NETIO_MAX_IOV_LEN; i++) {
471 486 : socket->buf_array[i] = &socket->buf;
472 : }
473 18 : }
474 :
475 : /* tcp versio of above */
476 : void
477 0 : netio_unbuffered_publish_tcp_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
478 : {
479 0 : log_dbg("INIT: netio_unbuffered_publish_tcp_socket_init");
480 0 : socket->ctx = ctx;
481 0 : socket->tcp_fi_mode = NETIO_MODE_TCP;
482 0 : memcpy(&socket->buf, buf, sizeof(struct netio_buffer));
483 0 : netio_subscription_table_init(&socket->subscription_table);
484 0 : netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);
485 0 : netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
486 0 : socket->lsocket.usr = socket;
487 0 : socket->lsocket.cb_connection_established = lsocket_on_connection_established;
488 0 : socket->lsocket.cb_msg_received = lsocket_on_msg_received;
489 0 : socket->lsocket.recv_sub_msg = 1;
490 0 : netio_listen_tcp(&socket->lsocket, hostname, port);
491 :
492 0 : socket->buf_array[0] = &socket->completion_stack.buf;
493 0 : for(unsigned i=1; i<NETIO_MAX_IOV_LEN; i++) {
494 0 : socket->buf_array[i] = &socket->buf;
495 : }
496 0 : }
497 :
/**
 * Initialize an unbuffered publish socket.
 *
 * The hostname selects the transport: if netio_tcp_mode() accepts it, the
 * socket is initialised in TCP mode, otherwise in libfabric mode. In both
 * cases the name returned by netio_hostname() is used to bind.
 *
 * @param socket: An unbuffered publish socket
 * @param ctx: A netio context
 * @param hostname: Local hostname to bind to
 * @param port: Local port to bind to
 * @param buf: A registered send buffer
 */
void
netio_unbuffered_publish_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
{
  if (!netio_tcp_mode(hostname)) {
    netio_unbuffered_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, buf);
    return;
  }
  netio_unbuffered_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, buf);
}
516 :
517 : static unsigned
518 1316889 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
519 : {
520 1316889 : struct netio_subscription key;
521 1316889 : key.tag = tag;
522 2633778 : struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
523 1316889 : table->subscriptions,
524 : table->num_subscriptions,
525 : sizeof(struct netio_subscription),
526 : cmp_subscription);
527 :
528 1316889 : log_trc("found ptr=0x%p", ptr);
529 1316889 : if(ptr == NULL) {
530 : return 0;
531 : }
532 769982 : unsigned start_idx = ptr - table->subscriptions;
533 770499 : while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
534 : start_idx--;
535 : }
536 : log_trc("start_idx=%d", start_idx);
537 : unsigned count = 0;
538 1540481 : for(unsigned i=start_idx; i<table->num_subscriptions; i++) {
539 770503 : if(table->subscriptions[i].tag == tag) {
540 770499 : count++;
541 : } else {
542 : break;
543 : }
544 : }
545 769982 : *start = start_idx;
546 769982 : return count;
547 : }
548 :
549 : static unsigned
550 211120298 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
551 : {
552 211120298 : if(cache->ts != table->ts) {
553 34 : log_trc("subscription updating cache");
554 34 : cache->count = lookup_tag(table, tag, &cache->idx_start);
555 34 : cache->ts = table->ts;
556 : }
557 :
558 : #if defined DEBUG || defined DEBUG_UBUF
559 : printf("Subscription table:");
560 : for(unsigned i=0; i<table->num_subscriptions; i++) {
561 : printf("%u:%lu ", i, table->subscriptions[i].tag);
562 : }
563 : printf("\n");
564 : #endif
565 :
566 211120298 : *start = cache->idx_start;
567 211120298 : log_trc("cache count=%d for tag=%lu", cache->count, tag);
568 211120298 : return cache->count;
569 : }
570 :
571 : /**
572 : * @brief Publishes a message on an unbuffered publish socket.
573 : *
574 : * @see netio_unbuffered_publishv_usr
575 : *
576 : * Same as netio_unbuffered_publishv_usr, but without the user header data field.
577 : */
578 : int
579 1316832 : netio_unbuffered_publishv(struct netio_unbuffered_publish_socket* socket,
580 : netio_tag_t tag,
581 : struct iovec* iov,
582 : size_t count,
583 : uint64_t* key,
584 : int flags,
585 : struct netio_subscription_cache* cache)
586 : {
587 1316832 : return netio_unbuffered_publishv_usr(socket, tag, iov, count, key, flags, cache, 0, 0);
588 : }
589 :
590 : /**
591 : * @brief Publishes a message on an unbuffered publish socket.
592 : *
593 : * The message is given as a scatter/gather buffer (`struct iovec`). The caller
594 : * has to ensure the validity of the buffer until the transfer is complete.
595 : * A transfer is complete when the socket's `msg_published` callback has been
596 : * called. A key can be passed to the call to identify the publication.
597 : * The key will be passed in the `msg_published` callback.
598 : *
599 : * The `msg_published` callback will only be called if the message has been
600 : * sent successfully to all subscribed endpoints.
601 : *
602 : * The call may return `NETIO_STATUS_AGAIN` if one of the sockets connections
603 : * yields `NETIO_STATUS_AGAIN`. In this case it is the user's responsibility
604 : * to call `netio_unbuffered_publishv` again with the `NETIO_REENTRY` flag.
605 : *
606 : * @param socket: The socket to publish on
607 : * @param tag: The tag under which to publish
608 : * @param iov: Message data iov
609 : * @param count: IOV count
610 : * @param key: Key that will be passed to the callback on successful publish of the
611 : * message. This is an input-output parameter. In case the function
612 : * returns NETIO_STATUS_PARTIAL, 'key' is used as storage to track the
613 : * completion data for the given tag. If netio_unbuffered_publishv is
614 : * called again with the NETIO_REENTRY flag, 'key' must remain
615 : * unchanged. In other words, for a given tag, 'key' is only set by the
616 : * user before the initial call to netio_unbuffered_publishv without
617 : * the NETIO_REENTRY flag.
618 : * @param flags: NETIO_REENTRY publishing of this message was attempted before and
619 : * resulted in NETIO_STATUS_AGAIN. Calling publish with
620 : * this flag will only send on connections where the
621 : * message was previously unpublished.
622 : * @param cache: Optional user-supplied cache for the subsctiption table lookup.
623 : * @param usr: Up to 8 byte of data that are transmitted as beginning of the message. This
624 : * allows the user to add a short header to a message without having to allocate
625 : * bufferspace for it.
626 : * @param usr_size: Size of the usr header field. Set to 0 if no header is required. The maximum
627 : * header size is 8.
628 : *
629 : * @returns NETIO_STATUS_OK If the message was published successfully to all subscribed endpoints
630 : * @returns NETIO_STATUS_OK_NOSUB No ongoing subscriptions to publish the given message
631 : * @returns NETIO_STATUS_AGAIN If not enough resources are available to proceed with the operation.
632 : * No data were sent to any endpoint. The user should try again with the exact same parameters.
633 : * @returns NETIO_STATUS_PARTIAL The message was sent to some of the subscribed endpoints, but not all. The user should
634 : * try again, and additionally set the NETIO_REENTRY flag. Users must take care not to
635 : * overwrite the key parameter, which is used by the function call to track the operation
636 : * status.
637 : * @returns NETIO_ERROR_MAX_IOV_EXCEEDED
638 : * Too many iovec entries, try with less.
639 : */
640 : int
641 212437131 : netio_unbuffered_publishv_usr(struct netio_unbuffered_publish_socket* socket,
642 : netio_tag_t tag,
643 : struct iovec* iov,
644 : size_t count,
645 : uint64_t* key,
646 : int flags,
647 : struct netio_subscription_cache* cache,
648 : uint64_t usr,
649 : uint8_t usr_size)
650 : {
651 212437131 : int ret = NETIO_STATUS_OK;
652 212437131 : if(count > NETIO_MAX_IOV_LEN-1) {
653 : return NETIO_ERROR_MAX_IOV_EXCEEDED;
654 : }
655 :
656 212437131 : log_trc("unbuffered publishv, key=0x%p, tag=%lu", key, tag);
657 :
658 212437131 : unsigned start_idx;
659 212437131 : unsigned num_subscriptions;
660 :
661 212437131 : if(cache) {
662 211120298 : num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
663 : } else {
664 1316833 : num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
665 : }
666 :
667 212437131 : log_trc("unbuffered publishv: num_subscriptions for tag %lu: %d, start_idx: %d", tag, num_subscriptions, start_idx);
668 212437131 : if(num_subscriptions == 0) {
669 194299751 : if(socket->cb_msg_published) {
670 194299751 : socket->cb_msg_published(socket, *key);
671 : }
672 194299751 : return NETIO_STATUS_OK_NOSUB;
673 : }
674 :
675 18137380 : int used_completion = 0;
676 18137380 : struct netio_completion_object* completion = NULL;
677 18137380 : if(flags & NETIO_REENTRY) {
678 0 : log_trc("unbuffered publishv: REENTRY - fetching completion from user");
679 0 : completion = (struct netio_completion_object*)(*key);
680 0 : used_completion = 1;
681 : } else {
682 18137380 : log_trc("unbuffered publishv: fetching completion object");
683 18137380 : completion = fetch_completion_object(socket);
684 18137380 : if(completion) {
685 10805972 : netio_semphore_set_threshold(&completion->sem, num_subscriptions);
686 10805972 : completion->key = *key;
687 10805972 : completion->header.tag = tag;
688 10805972 : completion->header.usr = usr;
689 10805972 : completion->usr_size = usr_size;
690 10805972 : socket->completion_stack.key_array[socket->completion_stack.available_objects] = *key;
691 10805972 : *key = (uint64_t)completion;
692 10805972 : used_completion = 0;
693 10805972 : log_trc("fetched completion: %lu for tag 0%lx", completion->key, tag);
694 : } else {
695 : log_trc("unbuffered publishv: no completion available -> AGAIN");
696 : // When no completion is available, we return NETIO_STATUS_AGAIN
697 : // The user is supposed to call the same call again (no need to keep track of completion object)
698 : return NETIO_STATUS_AGAIN;
699 : }
700 : }
701 :
702 :
703 22112002 : for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
704 11306030 : struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
705 11306030 : log_trc("subscription %d has tag %lu", i, subscription->tag);
706 11306030 : if(subscription->tag == tag) {
707 : // skip connections that were already successful if we are in reentry mode
708 11306030 : if(flags & NETIO_REENTRY) {
709 0 : if(subscription->again == 0) {
710 0 : continue;
711 : }
712 : }
713 :
714 11306030 : struct netio_send_socket* ssocket = (struct netio_send_socket*)(subscription->socket);
715 11306030 : log_trc("unbuffered publishv: send message on subscribed connection, iov-count=%lu, iov-len[0]=%lu", count, iov[0].iov_len);
716 11306030 : int result;
717 11306030 : struct iovec hdr_iov[NETIO_MAX_IOV_LEN];
718 11306030 : hdr_iov[0].iov_base = &completion->header;
719 11306030 : hdr_iov[0].iov_len = sizeof(netio_tag_t) + completion->usr_size;
720 11306030 : uint32_t total_size=hdr_iov[0].iov_len;
721 :
722 54812069 : for(unsigned j=0; j<count; j++) {
723 43506039 : hdr_iov[1+j].iov_base = iov[j].iov_base;
724 43506039 : hdr_iov[1+j].iov_len = iov[j].iov_len;
725 43506039 : total_size+=iov[j].iov_len;
726 : }
727 11306030 : log_trc("iov: [0]: %lu - compl: %lu", *((netio_tag_t*)hdr_iov[0].iov_base), completion->key);
728 11306030 : if (socket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
729 11306030 : result = netio_sendv_imm(ssocket, socket->buf_array, hdr_iov, count+1, (uint64_t)completion, 0);
730 : }
731 : else {
732 0 : result = netio_tcp_sendv_imm(ssocket, total_size,
733 : hdr_iov, count+1, (uint64_t)completion, 0);
734 : }
735 11306030 : log_trc("unbuffered publishv: result=%d", result);
736 :
737 11306030 : if(result == NETIO_STATUS_OK) {
738 11306030 : used_completion = 1;
739 11306030 : subscription->again = 0;
740 0 : } else if(result == NETIO_STATUS_AGAIN) {
741 0 : subscription->again = 1;
742 0 : used_completion = 1;
743 0 : ret = NETIO_STATUS_PARTIAL;
744 : } else {
745 0 : return result;
746 : // some error occured and we return immediately
747 : // TODO we should handle the error and unsubscribe the faulty remote
748 : }
749 : }
750 : }
751 :
752 10805972 : if(used_completion == 0) {
753 0 : netio_completion_stack_push(&socket->completion_stack, completion);
754 : }
755 :
756 : return ret;
757 : }
758 :
759 :
760 : static void
761 100 : subscribe_socket_on_connection_established(struct netio_recv_socket* socket)
762 : {
763 100 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
764 100 : if(subscribe_socket->cb_connection_established) {
765 94 : subscribe_socket->cb_connection_established(subscribe_socket);
766 : }
767 100 : }
768 :
769 :
770 : static void
771 46 : subscribe_socket_on_connection_closed(struct netio_recv_socket* socket)
772 : {
773 46 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
774 46 : log_info("subscribe socket: connection closed");
775 46 : if(subscribe_socket->cb_connection_closed) {
776 46 : subscribe_socket->cb_connection_closed(subscribe_socket);
777 : }
778 46 : }
779 :
780 :
781 : static void
782 29569709 : subscribe_socket_on_msg_received(struct netio_recv_socket* rsocket, struct netio_buffer* buf, void* data, size_t len)
783 : {
784 29569709 : struct netio_unbuffered_subscribe_socket* socket = (struct netio_unbuffered_subscribe_socket*)rsocket->lsocket->usr;
785 :
786 29569709 : netio_tag_t tag;
787 29569709 : memcpy(&tag, data, sizeof(netio_tag_t));
788 29569709 : data = (char *)data + sizeof(netio_tag_t);
789 29569709 : len -= sizeof(netio_tag_t);
790 :
791 29569709 : log_trc("buffer received of length %lu for tag %lu", len, tag);
792 :
793 29569709 : if(socket->cb_msg_received) {
794 29569709 : socket->cb_msg_received(socket, tag, data, len);
795 : }
796 :
797 29569700 : netio_post_recv(rsocket, buf);
798 29569700 : }
799 :
800 :
801 : static void
802 99 : subsocket_on_connection_established(struct netio_send_socket* socket)
803 : {
804 99 : log_dbg("subsocket connection established");
805 99 : int ret;
806 99 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
807 :
808 99 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
809 60 : subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
810 60 : if((ret = fi_getname(&subscribe_socket->recv_socket.pep->fid,
811 60 : subscribe_socket->msg.addr,
812 : &subscribe_socket->msg.addrlen)) !=0) {
813 0 : log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
814 0 : exit(1);
815 : }
816 60 : subscribe_socket->buf.data = &subscribe_socket->msg;
817 60 : subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
818 60 : netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
819 : }
820 39 : else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
821 : // initialize msgs[0] with defaults
822 39 : struct sockaddr sock_addr;
823 39 : socklen_t addrlen=sizeof(sock_addr);
824 39 : getsockname(socket->eq_ev_ctx.fd, &sock_addr, &addrlen);
825 39 : getnameinfo(&sock_addr, addrlen,
826 39 : subscribe_socket->msgs[0].addr,NETIO_MAX_ADDRLEN,
827 : NULL, 0, NI_NUMERICHOST);
828 39 : addrlen=strlen(subscribe_socket->msgs[0].addr);
829 39 : subscribe_socket->msgs[0].addr[addrlen] = 0;
830 39 : subscribe_socket->msgs[0].addrlen = addrlen+1;
831 39 : subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
832 : }
833 :
834 99 : if(subscribe_socket->total_tags == 0){
835 0 : log_info("Closing send connection again because there is no tag to subscribe to.");
836 0 : netio_disconnect(socket);
837 : }
838 :
839 99 : if(subscribe_socket->total_tags == 0){
840 0 : log_info("Closing send connection again because there is no tag to subscribe to.");
841 0 : netio_disconnect(socket);
842 : }
843 :
844 : // send tags one by one
845 366 : while(subscribe_socket->total_tags > 0){
846 267 : size_t idx = subscribe_socket->total_tags - 1;
847 267 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
848 267 : log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
849 267 : ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
850 267 : if (ret == NETIO_STATUS_OK){
851 267 : subscribe_socket->total_tags--;
852 : } else {
853 0 : log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
854 0 : break;
855 : }
856 : }
857 99 : }
858 :
859 :
860 : static void
861 41 : subsocket_on_connection_closed(struct netio_send_socket* socket)
862 : {
863 41 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
864 41 : subscribe_socket->state = NONE;
865 41 : }
866 :
867 :
868 : static void
869 26 : subsocket_on_error_connection_refused(struct netio_send_socket* socket) {
870 26 : log_dbg("subsocket connection refused");
871 26 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
872 26 : subscribe_socket->state = NONE;
873 26 : handle_listen_socket_shutdown(&subscribe_socket->recv_socket);
874 26 : if(subscribe_socket->cb_error_connection_refused) {
875 26 : subscribe_socket->cb_error_connection_refused(subscribe_socket);
876 : }
877 26 : }
878 :
879 :
880 : static void
881 2246 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
882 : {
883 2246 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
884 :
885 : //check for remaining tags from on_connection_established
886 2246 : while(subscribe_socket->total_tags > 0){
887 0 : size_t idx = subscribe_socket->total_tags - 1;
888 0 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
889 0 : int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
890 0 : if (ret == NETIO_STATUS_OK){
891 0 : subscribe_socket->total_tags--;
892 : } else {
893 0 : log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
894 0 : break;
895 : }
896 : }
897 2246 : }
898 :
899 :
900 : /**
901 : * Initialize an unbuffered subscribe socket
902 : *
903 : * @param socket: An unbuffered subscribe socket
904 : * @param ctx: A netio context
905 : * @param hostname: A local hostname or IP to bind to
906 : * @param remote_host: Hostname or IP of the remote publish socket
907 : * @param remote_port: Port of the remote publish socket
908 : * @param count: Size of the buffer array
909 : */
910 86 : void netio_unbuffered_subscribe_libfabric_socket_init(struct netio_unbuffered_subscribe_socket* socket,
911 : struct netio_context* ctx,
912 : const char* hostname,
913 : const char* remote_host,
914 : unsigned remote_port,
915 : size_t buffer_size,
916 : size_t count)
917 : {
918 86 : memset(socket, 0, sizeof(*socket));
919 86 : socket->ctx = ctx;
920 86 : socket->state = NONE;
921 86 : socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
922 86 : struct netio_unbuffered_socket_attr attr = {count, buffer_size};
923 86 : netio_init_listen_socket(&socket->recv_socket, ctx, &attr);
924 86 : socket->recv_socket.usr = socket;
925 86 : socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
926 86 : socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
927 86 : socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
928 86 : netio_listen(&socket->recv_socket, hostname, 0);
929 :
930 86 : socket->remote_hostname = strdup(remote_host);
931 86 : socket->remote_port = remote_port;
932 :
933 86 : log_dbg("subscribe socket is listening");
934 :
935 86 : socket->total_tags = 0;
936 86 : }
937 :
938 :
939 : /* As above but with tcp instead of libfabric */
940 39 : void netio_unbuffered_subscribe_tcp_socket_init(struct netio_unbuffered_subscribe_socket* socket,
941 : struct netio_context* ctx,
942 : const char* hostname,
943 : const char* remote_host,
944 : unsigned remote_port,
945 : size_t buffer_size,
946 : size_t count)
947 : {
948 39 : log_dbg("INIT: netio_unbuffered_subscribe_tcp_socket_init");
949 39 : memset(socket, 0, sizeof(*socket));
950 39 : socket->ctx = ctx;
951 39 : socket->state = NONE;
952 39 : socket->tcp_fi_mode = NETIO_MODE_TCP;
953 39 : struct netio_unbuffered_socket_attr attr = {count, buffer_size};
954 39 : netio_init_listen_tcp_socket(&socket->recv_socket, ctx, &attr);
955 39 : socket->recv_socket.usr = socket;
956 39 : socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
957 39 : socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
958 39 : socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
959 39 : netio_listen_tcp(&socket->recv_socket, hostname, 0);
960 :
961 39 : log_dbg("socket=%p recv_socket.port=%d\n",socket,socket->recv_socket.port);
962 39 : socket->msg.port=socket->recv_socket.port;
963 :
964 39 : socket->remote_hostname = strdup(remote_host);
965 39 : socket->remote_port = remote_port;
966 :
967 39 : socket->total_tags = 0;
968 39 : }
969 :
/**
 * Initialize an unbuffered subscribe socket.
 *
 * Selects the transport from `hostname` via netio_tcp_mode and delegates to
 * the TCP or the libfabric initializer. Both host arguments are passed
 * through netio_hostname before being handed on.
 *
 * @param socket: An unbuffered subscribe socket
 * @param ctx: A netio context
 * @param hostname: A local hostname or IP to bind to
 * @param remote_host: Hostname or IP of the remote publish socket
 * @param remote_port: Port of the remote publish socket
 * @param buffer_size: Buffer size, forwarded to the selected initializer
 * @param count: Buffer count, forwarded to the selected initializer
 */
void netio_unbuffered_subscribe_socket_init(struct netio_unbuffered_subscribe_socket* socket,
                                            struct netio_context* ctx,
                                            const char* hostname,
                                            const char* remote_host,
                                            unsigned remote_port,
                                            size_t buffer_size,
                                            size_t count)
{
  if (!netio_tcp_mode(hostname)) {
    // libfabric is used whenever TCP mode is not selected
    netio_unbuffered_subscribe_libfabric_socket_init(socket, ctx, netio_hostname(hostname), netio_hostname(remote_host), remote_port, buffer_size, count);
    return;
  }

  netio_unbuffered_subscribe_tcp_socket_init(socket, ctx, netio_hostname(hostname), netio_hostname(remote_host), remote_port, buffer_size, count);
}
996 :
997 : /**
998 : * Initializes an unbuffered send socket.
999 : *
1000 : * @param socket: The socket to initialize
1001 : * @param ctx: The NetIO context object in which to initialize the socket
1002 : */
1003 : void
1004 137 : netio_unbuffered_send_socket_init(struct netio_send_socket* socket, struct netio_context* ctx)
1005 : {
1006 137 : netio_init_send_socket(socket, ctx);
1007 137 : socket->cb_internal_connection_closed = on_unbuffered_send_connection_closed;
1008 137 : }
1009 :
1010 :
1011 : /**
1012 : * Subscribe an unbuffered subscribe socket to a given tag.
1013 : *
1014 : * @param socket: An unbuffered subscribe socket
1015 : * @param tag: A netio tag
1016 : */
1017 : int
1018 1183 : netio_unbuffered_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
1019 : {
1020 1183 : if(socket->state == NONE) {
1021 125 : log_dbg("Creating and connecting a new socket");
1022 125 : if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
1023 86 : netio_init_send_socket(&socket->socket, socket->ctx);
1024 86 : socket->socket.usr = socket;
1025 86 : socket->socket.cb_connection_established = subsocket_on_connection_established;
1026 86 : socket->socket.cb_connection_closed = subsocket_on_connection_closed;
1027 86 : socket->socket.cb_send_completed = subsocket_on_send_completed;
1028 86 : socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
1029 86 : socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
1030 86 : netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
1031 : }
1032 39 : else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
1033 39 : netio_init_send_tcp_socket(&socket->socket, socket->ctx);
1034 39 : socket->socket.usr = socket;
1035 39 : socket->socket.cb_connection_established = subsocket_on_connection_established;
1036 39 : socket->socket.cb_connection_closed = subsocket_on_connection_closed;
1037 39 : socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
1038 39 : socket->socket.cb_send_completed = subsocket_on_send_completed;
1039 39 : socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
1040 39 : netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
1041 : } else {
1042 0 : log_error("Unsupported connection type %d", socket->tcp_fi_mode);
1043 0 : return 1;
1044 : }
1045 125 : socket->state = INITIALIZED;
1046 : }
1047 :
1048 3760 : for(unsigned i=0; i<socket->total_tags; i++) {
1049 2577 : if(socket->tags_to_subscribe[i] == tag) {
1050 : return 0;
1051 : }
1052 : }
1053 :
1054 : //if send socket connected send message
1055 : //otherwise on_connection_established will do it
1056 1183 : if (socket->socket.state){
1057 890 : log_info("Sending subscription message for tag 0x%lx", tag);
1058 890 : int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
1059 890 : return ret;
1060 : } else {
1061 293 : log_info("Queing subscription message for txg 0x%lx", tag);
1062 293 : socket->tags_to_subscribe[socket->total_tags] = tag;
1063 293 : socket->total_tags++;
1064 293 : return 0;
1065 : }
1066 :
1067 : return 0;
1068 : }
1069 :
1070 :
1071 : static int
1072 0 : remove_tag_to_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
1073 : {
1074 0 : int found = 0;
1075 0 : for(unsigned int i=0; i<socket->total_tags; ++i){
1076 0 : if(socket->tags_to_subscribe[i] == tag){
1077 0 : log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
1078 0 : for(unsigned int j = i; j < socket->total_tags-1; ++j){
1079 0 : socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
1080 : }
1081 0 : found = 1;
1082 0 : socket->total_tags--;
1083 0 : break;
1084 : }
1085 : }
1086 0 : if(found == 0){
1087 0 : log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
1088 : }
1089 0 : return NETIO_STATUS_OK;
1090 : }
1091 :
1092 :
1093 : /**
1094 : * Unsubscribe from a given message tag.
1095 : *
1096 : * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
1097 : *
1098 : * @param socket: The unbuffered subscribe socket.
1099 : * @param tag: The tag to unsubscribe from.
1100 : */
1101 : int
1102 1098 : netio_unbuffered_unsubscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
1103 : {
1104 1098 : int ret = NETIO_STATUS_OK;
1105 1098 : if(socket->state == INITIALIZED) {
1106 1098 : log_dbg("Subscribe socket initialised, can proceed with unsubscription");
1107 1098 : if (socket->socket.state) {
1108 1098 : ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
1109 1098 : log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
1110 : } else {
1111 0 : ret = remove_tag_to_subscribe(socket, tag);
1112 : }
1113 : } else {
1114 1098 : log_dbg("The connection has been already closed.");
1115 : }
1116 1098 : return ret;
1117 : }
1118 :
1119 :
1120 : /**
1121 : * Returns the available number of completion objects of an unbuffered publish
1122 : * socket
1123 : *
1124 : * @param socket: The unbuffered publish socket.
1125 : */
1126 : unsigned
1127 104 : netio_pubsocket_get_available_co(struct netio_unbuffered_publish_socket* socket)
1128 : {
1129 104 : if(socket != NULL){
1130 104 : return socket->completion_stack.available_objects;
1131 : } else {
1132 : return 0;
1133 : }
1134 : }
|