Line data Source code
1 : #include <stdio.h>
2 : #include <string.h>
3 : #include <assert.h>
4 : #include "log.h"
5 : #include "netio/netio.h"
6 :
7 : #define PUBLISH_SOCKET_MAX_COMPLETIONS (512)
8 :
9 : #if defined DEBUG || defined DEBUG_UPUB
10 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
11 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
12 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
13 : #else
14 : #define log_dbg(...)
15 : #define log_trc(...)
16 : #endif
17 :
18 : static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start);
19 :
20 : static int
21 1477454 : cmp_subscription(const void* a, const void *b)
22 : {
23 1477454 : struct netio_subscription* suba = (struct netio_subscription*)a;
24 1477454 : struct netio_subscription* subb = (struct netio_subscription*)b;
25 :
26 1477454 : if(suba->tag == subb->tag) {
27 : return 0;
28 : }
29 16 : return suba->tag > subb->tag ? 1 : -1;
30 : }
31 :
32 :
33 : static int
34 34 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_send_socket* socket)
35 : {
36 34 : if(table->num_subscriptions == table->size) {
37 0 : log_error("Maximum number of subscriptions. New subscription for 0x%lx dropped.", tag);
38 0 : return 1;
39 : }
40 :
41 : //Check if the subscription is already in the list
42 34 : unsigned start;
43 34 : unsigned count = lookup_tag(table, tag, &start);
44 38 : for(unsigned i=0; i<count; i++) {
45 4 : if(table->subscriptions[start+i].socket == socket) {
46 : return 0;
47 : }
48 : }
49 :
50 34 : table->subscriptions[table->num_subscriptions].tag = tag;
51 34 : table->subscriptions[table->num_subscriptions].socket = socket;
52 34 : table->subscriptions[table->num_subscriptions].again = 0;
53 34 : table->num_subscriptions++;
54 34 : table->ts++;
55 34 : log_info("New entry in subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
56 34 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
57 34 : return 0;
58 : }
59 :
60 :
61 : /**
62 : * @brief Handle an unsubscription or a client disconnection.
63 : *
64 : * @param pubsocket: netio_unbuffered_publish_socket to which all send sockets are associated.
65 : * @param socket: the netio_send_socket not needed anymore
66 : * @param tag: the tag for which an unsubscribe request has been received
67 : * @param closed_connection: a flag to enable the removal of all the subscriptions associated to
68 : * the send socket in response to a closed connection.
69 : */
70 : static void
71 22 : table_remove_subscription(struct netio_unbuffered_publish_socket* pubsocket, struct netio_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
72 : {
73 22 : struct netio_subscription_table* table = &pubsocket->subscription_table;
74 22 : log_dbg("Total subscriptions: %lu", table->num_subscriptions);
75 :
76 22 : unsigned i=0;
77 22 : unsigned remaining_subscriptions_of_socket=0;
78 45 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
79 23 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
80 : }
81 :
82 45 : while(i<table->num_subscriptions) {
83 23 : log_dbg("Removing subscription tag %lx table socket %p socket %p", table->subscriptions[i].tag, table->subscriptions[i].socket, socket);
84 23 : if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
85 : //increment the completion semaphore counter as if messages were sent to the disconnected client
86 : struct netio_completion_stack* cs = &pubsocket->completion_stack;
87 11286 : for(size_t j=0; j < cs->num_objects; ++j) {
88 11264 : if(cs->objects[j].header.tag == table->subscriptions[i].tag && cs->objects[j].key != 0xFFFFFFFFFFFFFFFF){
89 8523 : netio_semaphore_increment(&cs->objects[j].sem, 1);
90 : }
91 : }
92 :
93 22 : log_dbg("Available completion objects %lu / %lu", cs->available_objects, cs->num_objects);
94 : log_dbg("Removing connection in subscription table, tag=%lu, socket=%p index %lu becomes %u",
95 : table->subscriptions[i].tag,
96 : table->subscriptions[i].socket,
97 : table->num_subscriptions-1,
98 22 : i);
99 22 : table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
100 22 : table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
101 22 : table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
102 22 : table->num_subscriptions--;
103 22 : remaining_subscriptions_of_socket--;
104 22 : table->ts++;
105 : }
106 : else{
107 1 : i++;
108 45 : log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
109 : }
110 : }
111 :
112 22 : log_dbg("At the end of table_remove_subscription available completion objects %lu / %lu", pubsocket->completion_stack.available_objects, pubsocket->completion_stack.num_objects);
113 22 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
114 22 : log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
115 22 : log_info("Removing entry from unbuffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
116 :
117 22 : if(closed_connection==0 && remaining_subscriptions_of_socket==0){
118 13 : log_info("Disconnecting endpoint with zero subscriptions");
119 13 : netio_disconnect(socket);
120 : }
121 22 : }
122 :
123 : static int
124 22 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_send_socket* socket)
125 : {
126 22 : log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
127 22 : unsigned remaining_subscriptions_of_socket=0;
128 31 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
129 9 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
130 : }
131 22 : log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
132 22 : return remaining_subscriptions_of_socket;
133 : }
134 :
135 : static void
136 36808994 : release_completion_object(struct netio_completion_object* completion)
137 : {
138 36808994 : log_trc("releasing completion object");
139 36808994 : if(netio_completion_stack_push(&completion->socket->completion_stack, completion)) {
140 0 : log_error("Could not push completion object since stack was full. This should not happen.");
141 : }
142 36808994 : }
143 :
144 : static void
145 36808994 : on_completion_trigger(void* c)
146 : {
147 36808994 : struct netio_completion_object* completion = (struct netio_completion_object*)c;
148 36808994 : log_trc("calling cb_msg_published");
149 36808994 : if(completion->socket->cb_msg_published) {
150 36808992 : completion->socket->cb_msg_published(completion->socket, completion->key);
151 : }
152 36808994 : release_completion_object(completion);
153 36808994 : }
154 :
155 0 : void print_completion_objects(struct netio_unbuffered_publish_socket* socket){
156 0 : printf("Number of available completion objects: %zu \n", socket->completion_stack.available_objects);
157 0 : printf("===============================================\n");
158 0 : printf("CO# \t KEY \t \t TAG \n");
159 0 : printf("-----------------------------------------------\n");
160 0 : for(unsigned int i=0; i < socket->completion_stack.num_objects; ++i){
161 0 : uint32_t tag = (socket->completion_stack.key_array[i] >> 32);
162 0 : printf("%u \t 0x%lx \t %x \n", i, socket->completion_stack.key_array[i], tag);
163 : }
164 0 : printf("===============================================\n");
165 0 : printf("Subscription table: there are %zu elinks subscribed\n", socket->subscription_table.num_subscriptions);
166 0 : printf("FID \t \t \t SOCKET\n");
167 0 : printf("-----------------------------------------------\n");
168 0 : for (unsigned int i=0; i < socket->subscription_table.num_subscriptions; ++i){
169 0 : printf("0x%lx \t %p \n", socket->subscription_table.subscriptions[i].tag, socket->subscription_table.subscriptions[i].socket);
170 : }
171 0 : socket->completion_stack.printed = 1;
172 0 : }
173 :
174 :
175 :
176 : static struct netio_completion_object*
177 38747186 : fetch_completion_object(struct netio_unbuffered_publish_socket* socket)
178 : {
179 38747186 : struct netio_completion_object* completion;
180 38747186 : if(netio_completion_stack_pop(&socket->completion_stack, &completion)) {
181 : #if defined DEBUG || defined DEBUG_UPUB
182 : if (socket->completion_stack.printed == 0){
183 : print_completion_objects(socket);
184 : }
185 : #endif
186 : return NULL;
187 : }
188 :
189 36808994 : netio_semaphore_init(&completion->sem, 0);
190 36808994 : completion->sem.data = completion;
191 36808994 : completion->sem.cb = on_completion_trigger;
192 36808994 : completion->socket = socket;
193 36808994 : return completion;
194 : }
195 :
196 : static void
197 36805246 : increment_completion_object(struct netio_unbuffered_publish_socket* pubsocket, uint64_t key)
198 : {
199 36805246 : log_trc("incrementing completion object");
200 36805246 : struct netio_completion_object *completion = (struct netio_completion_object*)key;
201 36805246 : netio_semaphore_increment(&completion->sem, 1);
202 36805246 : log_trc("current: %d expected: %d", completion->sem.current, completion->sem.threshold);
203 : }
204 :
205 : static void
206 32 : pubsocket_on_connection_established(struct netio_send_socket* socket)
207 : {
208 32 : log_dbg("publish socket established connection to remote, can publish now");
209 32 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
210 : //add deferred subscriptions to the table
211 32 : struct deferred_subscription* sub = socket->deferred_subs;
212 66 : while(sub){
213 34 : int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
214 34 : if(pubsocket->cb_subscribe && ret == 0){
215 34 : pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
216 : }
217 34 : pop_subscription(&sub);
218 : }
219 : //user callback
220 32 : if(pubsocket->cb_connection_established) {
221 32 : pubsocket->cb_connection_established(pubsocket);
222 : }
223 32 : }
224 :
225 :
226 : static void
227 22 : pubsocket_on_connection_closed(struct netio_send_socket* socket)
228 : {
229 22 : log_info("published socket: connection to remote was closed");
230 22 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
231 22 : if(pubsocket->cb_connection_closed) {
232 22 : pubsocket->cb_connection_closed(pubsocket);
233 : }
234 44 : if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
235 9 : uint8_t closed_connection = 1;
236 9 : table_remove_subscription(pubsocket, socket, 0, closed_connection);
237 : }
238 22 : handle_send_socket_shutdown(socket);
239 22 : remove_socket(&(pubsocket->subscription_table.socket_list), socket);
240 22 : }
241 :
242 :
243 : static void
244 318 : on_unbuffered_send_connection_closed(struct netio_send_socket* socket)
245 : {
246 318 : log_info("Send socket: connection to remote was closed");
247 318 : if(socket->unbuf_pub_socket != NULL) {
248 0 : pubsocket_on_connection_closed(socket);
249 : } else{
250 318 : handle_send_socket_shutdown(socket);
251 : }
252 318 : }
253 :
254 : static void
255 36805246 : pubsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
256 : {
257 36805246 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
258 36805246 : increment_completion_object(pubsocket, key);
259 36805246 : log_trc("completion on pubsocket connection, key=%lu", key);
260 36805246 : }
261 :
262 :
263 : static struct netio_send_socket*
264 34 : socket_list_add_or_lookup(struct netio_unbuffered_publish_socket* pubsocket, struct netio_socket_list** list, void* addr, size_t addrlen, struct netio_context* ctx)
265 : {
266 34 : if(addrlen == 0) {
267 0 : log_error("Invalid zero-byte address");
268 0 : return NULL;
269 : }
270 :
271 34 : struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen);
272 :
273 34 : if ( entry == NULL ) {
274 32 : log_dbg("socket_list_add_or_lookup, adding new entry");
275 32 : entry = add_socket_with_address(list, USEND, addr, addrlen);
276 32 : struct netio_domain* domain = NULL; //domain is shared among sockets,
277 32 : if (entry->next != NULL && entry->next->socket != NULL) { //because we push to head of list
278 5 : domain = ((struct netio_send_socket*)(entry->next->socket))->domain;
279 : }
280 32 : struct netio_send_socket* socket = (struct netio_send_socket*)entry->socket;
281 32 : netio_init_send_socket(socket, ctx); //here we memset to zero and we loose domain
282 32 : socket->unbuf_pub_socket = pubsocket;
283 32 : socket->cb_connection_established = pubsocket_on_connection_established;
284 32 : socket->cb_connection_closed = pubsocket_on_connection_closed;
285 32 : socket->cb_send_completed = pubsocket_on_send_completed;
286 32 : netio_connect_rawaddr_domain(socket, entry->addr, entry->addrlen, domain);
287 32 : if ( domain == NULL ){ //check on local domain variable
288 27 : netio_register_send_buffer(socket, &pubsocket->buf, 0);
289 27 : netio_completion_stack_register_send_socket(&pubsocket->completion_stack, socket);
290 : }
291 : }
292 34 : struct netio_send_socket* ss = (struct netio_send_socket*)entry->socket;
293 34 : return ss;
294 : }
295 :
296 :
297 : static void
298 34 : subscribe(struct netio_unbuffered_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, netio_tag_t tag)
299 : {
300 34 : struct netio_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, pubsocket->ctx);
301 34 : if(socket->recv_socket == NULL){
302 32 : socket->recv_socket =recv_socket;
303 : }
304 :
305 34 : if (socket->state == CONNECTED){
306 0 : table_add_subscription(&pubsocket->subscription_table, tag, socket);
307 0 : if(pubsocket->cb_subscribe) {
308 0 : pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
309 : }
310 : } else {
311 34 : push_back_subscription(&socket->deferred_subs, tag);
312 : }
313 34 : }
314 :
315 : static void
316 13 : unsubscribe(struct netio_unbuffered_publish_socket* pubsocket, void* addr, size_t addrlen, netio_tag_t tag)
317 : {
318 13 : struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen);
319 13 : if(list == NULL){return;}
320 :
321 13 : struct netio_send_socket* socket = (struct netio_send_socket*)list->socket;
322 13 : uint8_t connection_closed = 0;
323 13 : table_remove_subscription(pubsocket, socket, tag, connection_closed);
324 13 : pubsocket->subscription_table.ts++;
325 :
326 13 : if(pubsocket->cb_unsubscribe) {
327 2 : pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
328 : }
329 : }
330 :
331 : static void
332 32 : lsocket_on_connection_established(struct netio_recv_socket* socket)
333 : {
334 32 : log_dbg("Buffered listen socket: on connection establsihed");
335 32 : }
336 :
337 :
338 : static void
339 47 : parse_subscription_message(struct netio_unbuffered_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
340 : {
341 47 : if (msg->action){
342 34 : log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
343 34 : subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->tag);
344 : }
345 : else{
346 13 : log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
347 13 : unsubscribe(socket, msg->addr, msg->addrlen, msg->tag);
348 : }
349 47 : }
350 :
351 :
352 : static void
353 47 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
354 : {
355 47 : log_trc("message received");
356 47 : struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->lsocket->usr;
357 47 : if(len != sizeof(struct netio_subscription_message)) {
358 0 : log_error("Illegal subscription message size %lu", len);
359 0 : netio_post_recv(socket, buf);
360 0 : return;
361 : }
362 47 : parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
363 47 : netio_post_recv(socket, buf);
364 : }
365 :
366 :
367 : static int
368 2243 : send_subscription_message(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, int action)
369 : {
370 2243 : if( action == NETIO_SUBSCRIBE ){
371 1134 : log_info("Sending subscription for tag 0x%lx", tag);
372 1109 : } else if ( action == NETIO_UNSUBSCRIBE ){
373 1109 : log_info("Sending unsubscription for tag 0x%lx", tag);
374 : } else {
375 0 : log_error("Invalid subscription action %d", action);
376 0 : return 0;
377 : }
378 2243 : socket->msg.tag = tag;
379 2243 : socket->msg.action = action;
380 2243 : socket->buf.data = &socket->msg;
381 2243 : socket->buf.size = sizeof(struct netio_subscription_message);
382 2243 : int ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
383 2243 : return ret;
384 : }
385 :
386 : /**
387 : * Initialize an unbuffered publish socket
388 : *
389 : * @param socket: An unbuffered publish socket
390 : * @param ctx: A netio context
391 : * @param hostname: Local hostname to bind to
392 : * @param port: Local port to bind to
393 : * @param buf: A registered send buffer
394 : */
395 : void
396 35 : netio_unbuffered_publish_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
397 : {
398 35 : log_dbg("netio_unbuffered_publish_socket_init");
399 35 : socket->ctx = ctx;
400 35 : memcpy(&socket->buf, buf, sizeof(struct netio_buffer));
401 35 : netio_subscription_table_init(&socket->subscription_table);
402 35 : netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);
403 35 : netio_init_listen_socket(&socket->lsocket, ctx, NULL);
404 35 : socket->lsocket.usr = socket;
405 35 : socket->lsocket.cb_connection_established = lsocket_on_connection_established;
406 35 : socket->lsocket.cb_msg_received = lsocket_on_msg_received;
407 35 : socket->lsocket.recv_sub_msg = 1;
408 35 : netio_listen(&socket->lsocket, hostname, port);
409 :
410 35 : socket->buf_array[0] = &socket->completion_stack.buf;
411 980 : for(unsigned i=1; i<NETIO_MAX_IOV_LEN; i++) {
412 945 : socket->buf_array[i] = &socket->buf;
413 : }
414 35 : }
415 :
416 :
417 :
418 : static unsigned
419 2501662 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
420 : {
421 2501662 : struct netio_subscription key;
422 2501662 : key.tag = tag;
423 5003324 : struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
424 2501662 : table->subscriptions,
425 : table->num_subscriptions,
426 : sizeof(struct netio_subscription),
427 : cmp_subscription);
428 :
429 2501662 : log_trc("found ptr=0x%p", ptr);
430 2501662 : if(ptr == NULL) {
431 : return 0;
432 : }
433 1477434 : unsigned start_idx = ptr - table->subscriptions;
434 1478466 : while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
435 : start_idx--;
436 : }
437 : log_trc("start_idx=%d", start_idx);
438 : unsigned count = 0;
439 2955900 : for(unsigned i=start_idx; i<table->num_subscriptions; i++) {
440 1478473 : if(table->subscriptions[i].tag == tag) {
441 1478466 : count++;
442 : } else {
443 : break;
444 : }
445 : }
446 1477434 : *start = start_idx;
447 1477434 : return count;
448 : }
449 :
450 : static unsigned
451 81567839 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
452 : {
453 81567839 : if(cache->ts != table->ts) {
454 42 : log_trc("subscription updating cache");
455 42 : cache->count = lookup_tag(table, tag, &cache->idx_start);
456 42 : cache->ts = table->ts;
457 : }
458 :
459 : #if defined DEBUG || defined DEBUG_UBUF
460 : for(unsigned i=0; i<table->num_subscriptions; i++) {
461 : printf("%u:%lu ", i, table->subscriptions[i].tag);
462 : }
463 : printf("\n");
464 : #endif
465 :
466 81567839 : *start = cache->idx_start;
467 81567839 : log_trc("cache count=%d for tag=%lu", cache->count, tag);
468 81567839 : return cache->count;
469 : }
470 :
471 : /**
472 : * @brief Publishes a message on an unbuffered publish socket.
473 : *
474 : * @see netio_unbuffered_publishv_usr
475 : *
476 : * Same as netio_unbuffered_publishv_usr, but without the user header data field.
477 : */
478 : int
479 2501584 : netio_unbuffered_publishv(struct netio_unbuffered_publish_socket* socket,
480 : netio_tag_t tag,
481 : struct iovec* iov,
482 : size_t count,
483 : uint64_t* key,
484 : int flags,
485 : struct netio_subscription_cache* cache)
486 : {
487 2501584 : return netio_unbuffered_publishv_usr(socket, tag, iov, count, key, flags, cache, 0, 0);
488 : }
489 :
490 : /**
491 : * @brief Publishes a message on an unbuffered publish socket.
492 : *
493 : * The message is given as a scatter/gather buffer (`struct iovec`). The caller
494 : * has to ensure the validity of the buffer until the transfer is complete.
495 : * A transfer is complete when the socket's `msg_published` callback has been
496 : * called. A key can be passed to the call to identify the publication.
497 : * The key will be passed in the `msg_published` callback.
498 : *
499 : * The `msg_published` callback will only be called if the message has been
500 : * sent successfully to all subscribed endpoints.
501 : *
502 : * The call may return `NETIO_STATUS_AGAIN` if one of the sockets connections
503 : * yields `NETIO_STATUS_AGAIN`. In this case it is the user's responsibility
504 : * to call `netio_unbuffered_publishv` again with the `NETIO_REENTRY` flag.
505 : *
506 : * @param socket: The socket to publish on
507 : * @param tag: The tag under which to publish
508 : * @param iov: Message data iov
509 : * @param count: IOV count
510 : * @param key: Key that will be passed to the callback on successful publish of the
511 : * message. This is an input-output parameter. In case the function
512 : * returns NETIO_STATUS_PARTIAL, 'key' is used as storage to track the
513 : * completion data for the given tag. If netio_unbuffered_publishv is
514 : * called again with the NETIO_REENTRY flag, 'key' must remain
515 : * unchanged. In other words, for a given tag, 'key' is only set by the
516 : * user before the initial call to netio_unbuffered_publishv without
517 : * the NETIO_REENTRY flag.
518 : * @param flags: NETIO_REENTRY publishing of this message was attempted before and
519 : * resulted in NETIO_STATUS_AGAIN. Calling publish with
520 : * this flag will only send on connections where the
521 : * message was previously unpublished.
522 : * @param cache: Optional user-supplied cache for the subsctiption table lookup.
523 : * @param usr: Up to 8 byte of data that are transmitted as beginning of the message. This
524 : * allows the user to add a short header to a message without having to allocate
525 : * bufferspace for it.
526 : * @param usr_size: Size of the usr header field. Set to 0 if no header is required. The maximum
527 : * header size is 8.
528 : *
529 : * @returns NETIO_STATUS_OK If the message was published successfully to all subscribed endpoints
530 : * @returns NETIO_STATUS_OK_NOSUB No ongoing subscriptions to publish the given message
531 : * @returns NETIO_STATUS_AGAIN If not enough resources are available to proceed with the operation.
532 : * No data were sent to any endpoint. The user should try again with the exact same parameters.
533 : * @returns NETIO_STATUS_PARTIAL The message was sent to some of the subscribed endpoints, but not all. The user should
534 : * try again, and additionally set the NETIO_REENTRY flag. Users must take care not to
535 : * overwrite the key parameter, which is used by the function call to track the operation
536 : * status.
537 : * @returns NETIO_ERROR_MAX_IOV_EXCEEDED
538 : * Too many iovec entries, try with less.
539 : */
540 : int
541 84069425 : netio_unbuffered_publishv_usr(struct netio_unbuffered_publish_socket* socket,
542 : netio_tag_t tag,
543 : struct iovec* iov,
544 : size_t count,
545 : uint64_t* key,
546 : int flags,
547 : struct netio_subscription_cache* cache,
548 : uint64_t usr,
549 : uint8_t usr_size)
550 : {
551 84069425 : int ret = NETIO_STATUS_OK;
552 84069425 : if(count > NETIO_MAX_IOV_LEN-1) {
553 : return NETIO_ERROR_MAX_IOV_EXCEEDED;
554 : }
555 :
556 84069425 : log_trc("unbuffered publishv, key=0x%p, tag=%lu", key, tag);
557 :
558 84069425 : unsigned start_idx;
559 84069425 : unsigned num_subscriptions;
560 :
561 84069425 : if(cache) {
562 81567881 : num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
563 : } else {
564 2501586 : num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
565 : }
566 :
567 84069425 : log_trc("unbuffered publishv: num_subscriptions for tag %lu: %d, start_idx: %d", tag, num_subscriptions, start_idx);
568 84069425 : if(num_subscriptions == 0) {
569 45322239 : if(socket->cb_msg_published) {
570 45322239 : socket->cb_msg_published(socket, *key);
571 : }
572 45322239 : return NETIO_STATUS_OK_NOSUB;
573 : }
574 :
575 38747186 : int used_completion = 0;
576 38747186 : struct netio_completion_object* completion = NULL;
577 38747186 : if(flags & NETIO_REENTRY) {
578 0 : log_trc("unbuffered publishv: REENTRY - fetching completion from user");
579 0 : completion = (struct netio_completion_object*)(*key);
580 0 : used_completion = 1;
581 : } else {
582 38747186 : log_trc("unbuffered publishv: fetching completion object");
583 38747186 : completion = fetch_completion_object(socket);
584 38747186 : if(completion) {
585 36808994 : netio_semphore_set_threshold(&completion->sem, num_subscriptions);
586 36808994 : completion->key = *key;
587 36808994 : completion->header.tag = tag;
588 36808994 : completion->header.usr = usr;
589 36808994 : completion->usr_size = usr_size;
590 36808994 : socket->completion_stack.key_array[socket->completion_stack.available_objects] = *key;
591 36808994 : *key = (uint64_t)completion;
592 36808994 : used_completion = 0;
593 36808994 : log_trc("fetched completion: %lu for tag 0%lx", completion->key, tag);
594 : } else {
595 : log_trc("unbuffered publishv: no completion available -> AGAIN");
596 : // When no completion is available, we return NETIO_STATUS_AGAIN
597 : // The user is supposed to call the same call again (no need to keep track of completion object)
598 : return NETIO_STATUS_AGAIN;
599 : }
600 : }
601 :
602 :
603 73619018 : for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
604 36810024 : struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
605 36810024 : log_trc("subscription %d has tag %lu", i, subscription->tag);
606 36810024 : if(subscription->tag == tag) {
607 : // skip connections that were already successful if we are in reentry mode
608 36810024 : if(flags & NETIO_REENTRY) {
609 0 : if(subscription->again == 0) {
610 0 : continue;
611 : }
612 : }
613 :
614 36810024 : struct netio_send_socket* ssocket = (struct netio_send_socket*)(subscription->socket);
615 36810024 : log_trc("unbuffered publishv: send message on subscribed connection, iov-count=%lu, iov-len[0]=%lu", count, iov[0].iov_len);
616 36810024 : int result;
617 36810024 : struct iovec hdr_iov[NETIO_MAX_IOV_LEN];
618 36810024 : hdr_iov[0].iov_base = &completion->header;
619 36810024 : hdr_iov[0].iov_len = sizeof(netio_tag_t) + completion->usr_size;
620 195350990 : for(unsigned j=0; j<count; j++) {
621 158540966 : hdr_iov[1+j].iov_base = iov[j].iov_base;
622 158540966 : hdr_iov[1+j].iov_len = iov[j].iov_len;
623 : }
624 36810024 : log_trc("iov: [0]: %lu - compl: %lu", *((netio_tag_t*)hdr_iov[0].iov_base), completion->key);
625 36810024 : result = netio_sendv_imm(ssocket, socket->buf_array, hdr_iov, count+1, (uint64_t)completion, 0);
626 36810024 : log_trc("unbuffered publishv: result=%d", result);
627 :
628 36810024 : if(result == NETIO_STATUS_OK) {
629 36810024 : used_completion = 1;
630 36810024 : subscription->again = 0;
631 0 : } else if(result == NETIO_STATUS_AGAIN) {
632 0 : subscription->again = 1;
633 0 : used_completion = 1;
634 0 : ret = NETIO_STATUS_PARTIAL;
635 : } else {
636 0 : return result;
637 : // some error occured and we return immediately
638 : // TODO we should handle the error and unsubscribe the faulty remote
639 : }
640 : }
641 : }
642 :
643 36808994 : if(used_completion == 0) {
644 0 : netio_completion_stack_push(&socket->completion_stack, completion);
645 : }
646 :
647 : return ret;
648 : }
649 :
650 :
651 : static void
652 116 : subscribe_socket_on_connection_established(struct netio_recv_socket* socket)
653 : {
654 116 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
655 116 : if(subscribe_socket->cb_connection_established) {
656 95 : subscribe_socket->cb_connection_established(subscribe_socket);
657 : }
658 116 : }
659 :
660 : static void
661 90 : subscribe_socket_on_connection_closed(struct netio_recv_socket* socket)
662 : {
663 90 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
664 90 : log_info("subscribe socket: connection closed");
665 90 : if(subscribe_socket->cb_connection_closed) {
666 90 : subscribe_socket->cb_connection_closed(subscribe_socket);
667 : }
668 90 : }
669 :
670 : static void
671 48681059 : subscribe_socket_on_msg_received(struct netio_recv_socket* rsocket, struct netio_buffer* buf, void* data, size_t len)
672 : {
673 48681059 : struct netio_unbuffered_subscribe_socket* socket = (struct netio_unbuffered_subscribe_socket*)rsocket->lsocket->usr;
674 :
675 48681059 : netio_tag_t tag;
676 48681059 : memcpy(&tag, data, sizeof(netio_tag_t));
677 48681059 : data = (char *)data + sizeof(netio_tag_t);
678 48681059 : len -= sizeof(netio_tag_t);
679 :
680 48681059 : log_trc("buffer received of length %lu for tag %lu", len, tag);
681 :
682 48681059 : if(socket->cb_msg_received) {
683 48681059 : socket->cb_msg_received(socket, tag, data, len);
684 : }
685 :
686 48681043 : netio_post_recv(rsocket, buf);
687 48681043 : }
688 :
689 :
690 : static void
691 114 : subsocket_on_connection_established(struct netio_send_socket* socket)
692 : {
693 114 : log_dbg("subsocket connection established");
694 114 : int ret = 0;
695 114 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
696 :
697 114 : subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
698 114 : if((ret = fi_getname(&subscribe_socket->recv_socket.pep->fid,
699 114 : subscribe_socket->msg.addr,
700 : &subscribe_socket->msg.addrlen)) !=0) {
701 0 : log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
702 0 : exit(1);
703 : }
704 114 : subscribe_socket->buf.data = &subscribe_socket->msg;
705 114 : subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
706 114 : netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
707 :
708 114 : if(subscribe_socket->total_tags == 0){
709 0 : log_info("Closing send connection again because there is no tag to subscribe to.");
710 0 : netio_disconnect(socket);
711 : }
712 :
713 : // send tags one by one
714 552 : while(subscribe_socket->total_tags > 0){
715 438 : size_t idx = subscribe_socket->total_tags - 1;
716 438 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
717 438 : log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
718 438 : ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
719 438 : if (ret == NETIO_STATUS_OK){
720 438 : subscribe_socket->total_tags--;
721 : } else {
722 0 : log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
723 0 : break;
724 : }
725 : }
726 114 : }
727 :
728 :
729 : static void
730 82 : subsocket_on_connection_closed(struct netio_send_socket* socket)
731 : {
732 82 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
733 82 : subscribe_socket->state = NONE;
734 82 : }
735 :
736 : static void
737 50 : subsocket_on_error_connection_refused(struct netio_send_socket* socket) {
738 50 : log_dbg("subsocket connection refused");
739 50 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
740 50 : subscribe_socket->state = NONE;
741 50 : handle_listen_socket_shutdown(&subscribe_socket->recv_socket);
742 50 : if(subscribe_socket->cb_error_connection_refused) {
743 50 : subscribe_socket->cb_error_connection_refused(subscribe_socket);
744 : }
745 50 : }
746 :
747 :
748 : static void
749 2236 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
750 : {
751 2236 : struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
752 :
753 : //check for remaining tags from on_connection_established
754 2236 : while(subscribe_socket->total_tags > 0){
755 0 : size_t idx = subscribe_socket->total_tags - 1;
756 0 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
757 0 : int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
758 0 : if (ret == NETIO_STATUS_OK){
759 0 : subscribe_socket->total_tags--;
760 : } else {
761 0 : log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
762 0 : break;
763 : }
764 : }
765 2236 : }
766 :
767 :
768 : /**
769 : * Initialize an unbuffered subscribe socket
770 : *
771 : * @param socket: An unbuffered subscribe socket
772 : * @param ctx: A netio context
773 : * @param hostname: A local hostname or IP to bind to
774 : * @param remote_host: Hostname or IP of the remote publish socket
775 : * @param remote_port: Port of the remote publish socket
776 : * @param count: Size of the buffer array
777 : */
778 164 : void netio_unbuffered_subscribe_socket_init(struct netio_unbuffered_subscribe_socket* socket,
779 : struct netio_context* ctx,
780 : const char* hostname,
781 : const char* remote_host,
782 : unsigned remote_port,
783 : size_t buffer_size,
784 : size_t count)
785 : {
786 164 : memset(socket, 0, sizeof(*socket));
787 164 : socket->ctx = ctx;
788 164 : socket->state = NONE;
789 164 : struct netio_unbuffered_socket_attr attr = {count, buffer_size};
790 164 : netio_init_listen_socket(&socket->recv_socket, ctx, &attr);
791 164 : socket->recv_socket.usr = socket;
792 164 : socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
793 164 : socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
794 164 : socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
795 164 : netio_listen(&socket->recv_socket, hostname, 0);
796 :
797 164 : socket->remote_hostname = strdup(remote_host);
798 164 : socket->remote_port = remote_port;
799 :
800 164 : log_dbg("subscribe socket is listening");
801 :
802 164 : socket->total_tags = 0;
803 164 : }
804 :
805 :
806 :
807 : /**
808 : * Initializes an unbuffered send socket.
809 : *
810 : * @param socket: The socket to initialize
811 : * @param ctx: The NetIO context object in which to initialize the socket
812 : */
813 : void
814 282 : netio_unbuffered_send_socket_init(struct netio_send_socket* socket, struct netio_context* ctx)
815 : {
816 282 : netio_init_send_socket(socket, ctx);
817 282 : socket->cb_internal_connection_closed = on_unbuffered_send_connection_closed;
818 282 : }
819 :
820 :
821 :
822 : /**
823 : * Subscribe an unbuffered subscribe socket to a given tag.
824 : *
825 : * @param socket: An unbuffered subscribe socket
826 : * @param tag: A netio tag
827 : */
828 : int
829 1184 : netio_unbuffered_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
830 : {
831 1184 : if(socket->state == NONE) {
832 164 : log_dbg("Creating and connecting a new socket");
833 164 : netio_init_send_socket(&socket->socket, socket->ctx);
834 164 : socket->socket.usr = socket;
835 164 : socket->socket.cb_connection_established = subsocket_on_connection_established;
836 164 : socket->socket.cb_connection_closed = subsocket_on_connection_closed;
837 164 : socket->socket.cb_send_completed = subsocket_on_send_completed;
838 164 : socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
839 164 : socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
840 164 : netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
841 164 : socket->state = INITIALIZED;
842 : }
843 :
844 6944 : for(unsigned i=0; i<socket->total_tags; i++) {
845 5760 : if(socket->tags_to_subscribe[i] == tag) {
846 : return 0;
847 : }
848 : }
849 :
850 : //if send socket connected send message
851 : //otherwise on_connection_established will do it
852 1184 : if (socket->socket.state){
853 696 : log_info("Sending subscription message for tag 0x%lx", tag);
854 696 : int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
855 696 : return ret;
856 : } else {
857 488 : log_info("Queing subscription message for txg 0x%lx", tag);
858 488 : socket->tags_to_subscribe[socket->total_tags] = tag;
859 488 : socket->total_tags++;
860 488 : return 0;
861 : }
862 :
863 : return 0;
864 : }
865 :
866 :
867 : static int
868 0 : remove_tag_to_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
869 : {
870 0 : int found = 0;
871 0 : for(unsigned int i=0; i<socket->total_tags; ++i){
872 0 : if(socket->tags_to_subscribe[i] == tag){
873 0 : log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
874 0 : for(unsigned int j = i; j < socket->total_tags-1; ++j){
875 0 : socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
876 : }
877 0 : found = 1;
878 0 : socket->total_tags--;
879 0 : break;
880 : }
881 : }
882 0 : if(found == 0){
883 0 : log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
884 : }
885 0 : return NETIO_STATUS_OK;
886 : }
887 :
888 :
889 : /**
890 : * Unsubscribe from a given message tag.
891 : *
892 : * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
893 : *
894 : * @param socket: The unbuffered subscribe socket.
895 : * @param tag: The tag to unsubscribe from.
896 : */
897 : int
898 1109 : netio_unbuffered_unsubscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
899 : {
900 1109 : int ret = NETIO_STATUS_OK;
901 1109 : if(socket->state == INITIALIZED) {
902 1109 : log_dbg("Subscribe socket initialised, can proceed with usubscription");
903 1109 : if (socket->socket.state) {
904 1109 : ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
905 1109 : log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
906 : } else {
907 0 : ret = remove_tag_to_subscribe(socket, tag);
908 : }
909 : } else {
910 1109 : log_dbg("The connection has been already closed.");
911 : }
912 1109 : return ret;
913 : }
|