Line data Source code
1 : #include <stdio.h>
2 : #include <string.h>
3 : #include <assert.h>
4 : #include <netdb.h>
5 : #include "log.h"
6 : #include "netio/netio.h"
7 :
8 : #if defined DEBUG || defined DEBUG_PUB
9 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
10 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
11 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
12 : #else
13 : #define log_dbg(...)
14 : #define log_trc(...)
15 : #endif
16 :
17 :
18 : char*
19 686 : netio_domain_name_lookup(const char* domain_name)
20 : {
21 686 : if(!domain_name) return NULL;
22 686 : struct sockaddr_in sock_address;
23 686 : char* ip_address = (char*)malloc(sizeof(char) * 17);
24 686 : int is_ip_address = inet_pton(AF_INET, domain_name, &(sock_address.sin_addr));
25 686 : if(!is_ip_address)
26 : {
27 8 : struct hostent* host = gethostbyname(domain_name);
28 8 : if(host)
29 : {
30 4 : strcpy(ip_address, inet_ntoa(*(struct in_addr *)host->h_addr));
31 : }
32 : else
33 : {
34 4 : char* _domain_name = malloc(sizeof(char) * (strlen(domain_name) + 1));
35 4 : strcpy(_domain_name, domain_name);
36 4 : log_error("The host cannot be resolved. Domain name set to %s", _domain_name);
37 4 : free(ip_address);
38 4 : return _domain_name;
39 : }
40 : }
41 : else
42 : {
43 678 : strcpy(ip_address, domain_name);
44 : }
45 :
46 : return ip_address;
47 : }
48 :
49 : static int
50 10144 : cmp_subscription(const void* a, const void *b)
51 : {
52 10144 : struct netio_subscription* suba = (struct netio_subscription*)a;
53 10144 : struct netio_subscription* subb = (struct netio_subscription*)b;
54 :
55 10144 : if(suba->tag == subb->tag) {
56 : return 0;
57 : }
58 1 : return suba->tag > subb->tag ? 1 : -1;
59 : }
60 :
61 : static int
62 44 : table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
63 : {
64 44 : if(table->num_subscriptions == table->size) {
65 0 : log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
66 0 : return 1;
67 : }
68 :
69 : // TODO need to keep the list sorted
70 44 : table->subscriptions[table->num_subscriptions].tag = tag;
71 44 : table->subscriptions[table->num_subscriptions].socket = socket;
72 44 : table->subscriptions[table->num_subscriptions].again = 0;
73 :
74 : log_dbg("Adding connection in subscription table, tag=%lu, socket=%p",
75 : table->subscriptions[table->num_subscriptions].tag,
76 : table->subscriptions[table->num_subscriptions].socket
77 44 : );
78 :
79 44 : table->num_subscriptions++;
80 44 : table->ts++;
81 44 : log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);
82 44 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
83 44 : return 0;
84 : }
85 :
86 :
87 : /**
88 : * @brief Handle an unsubscription or a client disconnection.
89 : *
90 : * @param netio_subscription_table: the table of active subscriptions
91 : * @param socket: the unbuffered send socket used to send the data to the subscriber
92 : * @param tag: the tag for which an unsubscribe request has been received
93 : * @param closed_connection: a flag to enable the removal of all the subscriptions associated to
94 : * the send socket in response to a closed connection.
95 : */
96 : static void
97 40 : table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
98 : {
99 40 : log_dbg("Total subscriptions: %lu", table->num_subscriptions);
100 40 : unsigned i=0;
101 40 : unsigned remaining_subscriptions_of_socket=0;
102 83 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
103 43 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
104 : }
105 :
106 :
107 83 : while(i<table->num_subscriptions) {
108 43 : if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
109 : log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u",
110 : table->subscriptions[i].tag,
111 : table->subscriptions[i].socket,
112 : table->num_subscriptions-1,
113 40 : i);
114 40 : table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
115 40 : table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
116 40 : table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
117 40 : table->num_subscriptions--;
118 40 : remaining_subscriptions_of_socket--;
119 40 : table->ts++;
120 : }
121 : else{
122 3 : i++;
123 83 : log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
124 : }
125 : }
126 40 : qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
127 40 : log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
128 40 : log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);
129 40 : if(closed_connection==0 && remaining_subscriptions_of_socket==0){
130 33 : log_info("Disconnecting endpoint with zero subscriptions");
131 33 : netio_disconnect(&socket->send_socket);
132 : }
133 40 : }
134 :
135 :
136 : static int
137 39 : table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
138 : {
139 39 : log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);
140 39 : unsigned remaining_subscriptions_of_socket=0;
141 45 : for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
142 6 : if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
143 : }
144 39 : log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket);
145 39 : return remaining_subscriptions_of_socket;
146 : }
147 :
148 : static void
149 214519 : on_buffer_available(void* ptr)
150 : {
151 214519 : log_trc("a buffer became available, calling callback");
152 214519 : struct netio_publish_socket* socket = (struct netio_publish_socket*)ptr;
153 214519 : if(socket->cb_buffer_available) {
154 214519 : socket->cb_buffer_available(socket);
155 : }
156 214519 : }
157 :
158 :
159 : static void
160 43 : pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
161 : {
162 43 : log_dbg("publish socket established connection to remote, can publish now");
163 43 : struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
164 : //add deferred subscriptions to the table
165 43 : struct deferred_subscription* sub = socket->send_socket.deferred_subs;
166 87 : while(sub){
167 44 : int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
168 44 : if(pubsocket->cb_subscribe && ret == 0){
169 44 : pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
170 : }
171 44 : pop_subscription(&sub);
172 : }
173 : //user callback
174 43 : if(pubsocket->cb_connection_established) {
175 8 : pubsocket->cb_connection_established(pubsocket);
176 : }
177 43 : }
178 :
179 :
180 : static void
181 39 : pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
182 : {
183 39 : log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
184 39 : struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
185 39 : if(pubsocket->cb_connection_closed) {
186 6 : pubsocket->cb_connection_closed(pubsocket);
187 : }
188 : //Only if the connection was closed without unsubscribing first.
189 78 : if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
190 6 : log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
191 6 : uint8_t connection_closed = 1;
192 6 : table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
193 : }
194 39 : }
195 :
196 :
197 : static struct netio_buffered_send_socket*
198 44 : socket_list_add_or_lookup(struct netio_publish_socket* pubsocket, struct netio_socket_list** list, void* addr, size_t addrlen, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
199 : {
200 44 : if(addrlen == 0) {
201 : return NULL;
202 : }
203 :
204 44 : struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen);
205 :
206 44 : if ( entry == NULL ) {
207 43 : entry = add_socket_with_address(list, BSEND, addr, addrlen);
208 43 : struct netio_buffered_send_socket* bufsocket = entry->socket;
209 43 : netio_buffered_send_socket_init(bufsocket, ctx, attr);
210 43 : bufsocket->pub_socket = pubsocket;
211 43 : bufsocket->cb_connection_established = pubsocket_on_connection_established;
212 43 : bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
213 :
214 43 : netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
215 43 : bufsocket->signal_buffer_available.data = pubsocket;
216 43 : bufsocket->signal_buffer_available.cb = on_buffer_available;
217 : }
218 44 : struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
219 44 : return ss;
220 : }
221 :
222 :
223 : static void
224 44 : subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, netio_tag_t tag)
225 : {
226 44 : struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, pubsocket->ctx, &pubsocket->attr);
227 :
228 44 : if(socket->send_socket.recv_socket == NULL){
229 43 : socket->send_socket.recv_socket = recv_socket;
230 : }
231 :
232 44 : if (socket->send_socket.state == CONNECTED){
233 0 : table_add_subscription(&pubsocket->subscription_table, tag, socket);
234 0 : if(pubsocket->cb_subscribe) {
235 0 : pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
236 : }
237 : } else {
238 44 : push_back_subscription(&socket->send_socket.deferred_subs, tag);
239 : }
240 44 : }
241 :
242 :
243 : static void
244 34 : unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, netio_tag_t tag)
245 : {
246 34 : struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen);
247 34 : if(list == NULL){return;}
248 :
249 34 : struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)list->socket;
250 34 : uint8_t connection_closed = 0;
251 34 : table_remove_subscription(&pubsocket->subscription_table, socket, tag, connection_closed);
252 34 : pubsocket->subscription_table.ts++;
253 :
254 34 : if(pubsocket->cb_unsubscribe) {
255 2 : pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
256 : }
257 :
258 : }
259 :
260 :
261 : static void
262 43 : lsocket_on_connection_established(struct netio_recv_socket* socket)
263 : {
264 43 : log_dbg("Buffered listen socket: on connection establsihed");
265 43 : }
266 :
267 :
268 : static void
269 78 : parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
270 : {
271 78 : if (msg->action){
272 44 : log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
273 44 : subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->tag);
274 : }
275 : else{
276 34 : log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
277 34 : unsubscribe(socket, msg->addr, msg->addrlen, msg->tag);
278 : }
279 78 : }
280 :
281 :
282 : static void
283 78 : lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
284 : {
285 78 : log_dbg("message received by recv socket %p", socket);
286 78 : struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;
287 78 : if(len != sizeof(struct netio_subscription_message)) {
288 0 : log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
289 0 : netio_post_recv(socket, buf);
290 0 : return;
291 : }
292 78 : parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
293 78 : netio_post_recv(socket, buf);
294 : }
295 :
296 :
297 : static int
298 2534 : send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
299 : {
300 2534 : if( action == NETIO_SUBSCRIBE ){
301 1296 : log_info("Sending subscription for tag 0x%lx", tag);
302 1238 : } else if ( action == NETIO_UNSUBSCRIBE ){
303 1238 : log_info("Sending unsubscription for tag 0x%lx", tag);
304 : } else {
305 0 : log_error("Invalid subscription action %d", action);
306 0 : return 0;
307 : }
308 2534 : socket->msg.tag = tag;
309 2534 : socket->msg.action = action;
310 2534 : socket->buf.data = &socket->msg;
311 2534 : socket->buf.size = sizeof(struct netio_subscription_message);
312 2534 : int ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
313 2534 : return ret;
314 : }
315 :
316 :
317 : static void
318 227 : subsocket_on_connection_established(struct netio_send_socket* socket)
319 : {
320 227 : log_dbg("subsocket connection established");
321 227 : int ret = 0;
322 227 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
323 :
324 227 : if(subscribe_socket->total_tags == 0){
325 0 : log_info("Closing send connection again because there is no tag to subscribe to.");
326 0 : netio_disconnect(socket);
327 : }
328 :
329 227 : subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
330 227 : if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
331 227 : subscribe_socket->msg.addr,
332 : &subscribe_socket->msg.addrlen)) != 0) {
333 0 : log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
334 0 : exit(1);
335 : }
336 :
337 227 : subscribe_socket->buf.data = &subscribe_socket->msg;
338 227 : subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
339 227 : netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
340 :
341 : // send tags one by one
342 809 : while(subscribe_socket->total_tags > 0){
343 582 : size_t idx = subscribe_socket->total_tags - 1;
344 582 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
345 582 : log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
346 582 : ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
347 582 : if (ret == NETIO_STATUS_OK){
348 582 : subscribe_socket->total_tags--;
349 : } else {
350 0 : log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
351 0 : break;
352 : }
353 : }
354 227 : }
355 :
356 : static void
357 136 : subsocket_on_send_connection_closed(struct netio_send_socket* socket)
358 : {
359 136 : log_dbg("subsocket_on_send_connection_closed callback");
360 136 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
361 136 : subscribe_socket->state = NONE;
362 136 : handle_send_socket_shutdown(socket);
363 136 : }
364 :
365 :
366 : static void
367 59 : subsocket_on_error_connection_refused(struct netio_send_socket* ssocket) {
368 59 : log_dbg("subsocket connection refused");
369 59 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)ssocket->usr;
370 59 : subscribe_socket->state = NONE;
371 :
372 59 : handle_listen_socket_shutdown(&subscribe_socket->recv_socket.listen_socket);
373 59 : if(subscribe_socket->cb_error_connection_refused) {
374 59 : subscribe_socket->cb_error_connection_refused(subscribe_socket);
375 : }
376 59 : }
377 :
378 :
379 : static void
380 2504 : subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
381 : {
382 2504 : struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
383 : //check for remaining tags from on_connection_established
384 2504 : while(subscribe_socket->total_tags > 0){
385 0 : size_t idx = subscribe_socket->total_tags - 1;
386 0 : netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
387 0 : int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
388 0 : if (ret == NETIO_STATUS_OK){
389 0 : subscribe_socket->total_tags--;
390 : } else {
391 0 : log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
392 0 : break;
393 : }
394 : }
395 2504 : }
396 :
397 :
398 : static void
399 229 : subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
400 : {
401 229 : log_dbg("connection to subscribe socket has been established");
402 229 : struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
403 229 : if(socket->cb_connection_established) {
404 195 : socket->cb_connection_established(socket);
405 : }
406 229 : }
407 :
408 :
409 : static void
410 168 : subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
411 : {
412 168 : log_info("connection to subscribe socket has been closed");
413 168 : struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
414 168 : if(socket->cb_connection_closed) {
415 168 : socket->cb_connection_closed(socket);
416 : }
417 168 : }
418 :
419 :
420 : static void
421 3406697491 : subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
422 : {
423 3406697491 : log_trc("buffer received");
424 3406697491 : struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
425 :
426 3406697491 : if(len <= sizeof(netio_tag_t)) {
427 0 : log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
428 0 : return;
429 : }
430 :
431 3406697491 : netio_tag_t tag = *((netio_tag_t*)data);
432 3406697491 : if(socket->cb_msg_received) {
433 3406697491 : socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
434 : }
435 : }
436 :
437 :
438 : void
439 149 : netio_subscription_table_init(struct netio_subscription_table* table)
440 : {
441 149 : table->socket_list = NULL;
442 149 : table->subscriptions = (struct netio_subscription*)malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
443 149 : table->num_subscriptions = 0;
444 149 : table->size = NETIO_INITIAL_SUBSCRIPTIONS;
445 149 : table->ts = 0;
446 149 : }
447 :
448 :
449 : /**
450 : * Initializes a buffered publish socket.
451 : *
452 : * @param socket: The buffered publish socket to initialize
453 : * @param ctx: The NetIO context in which to initialize the socket
454 : * @param hostname: Hostname or IP address to bind to
455 : * @param port: Port to bind to
456 : * @param attr: Buffered connection settings to be used for the underlying connections
457 : *
458 : * @see `netio_buffered_send_socket_init` for a description of the connection parameters
459 : */
460 : void
461 114 : netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
462 : {
463 114 : memset(socket, 0, sizeof(*socket));
464 114 : socket->ctx = ctx;
465 114 : netio_subscription_table_init(&socket->subscription_table);
466 114 : netio_init_listen_socket(&socket->lsocket, ctx, NULL);
467 114 : if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
468 0 : log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
469 0 : attr->num_pages = NETIO_DOMAIN_MAX_MR;
470 : }
471 114 : memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
472 114 : socket->lsocket.usr = socket;
473 114 : socket->lsocket.cb_connection_established = lsocket_on_connection_established;
474 114 : socket->lsocket.cb_msg_received = lsocket_on_msg_received;
475 114 : socket->lsocket.recv_sub_msg = 1;
476 114 : char* _hostname = netio_domain_name_lookup(hostname);
477 114 : netio_listen(&socket->lsocket, (const char*)_hostname, port);
478 114 : free(_hostname);
479 114 : }
480 :
481 :
482 : /**
483 : * Initializes a buffered subscribe socket.
484 : *
485 : * @param socket: The buffered subscribe socket to initialize
486 : * @param ctx: The NetIO context in which to initialize the socket
487 : * @param attr: Buffered connection settings to be used for the underlying connections
488 : * @param hostname: Hostname or IP address of the local interface to bind to
489 : * @param remote_host: Hostname or IP of the remote publish socket
490 : * @param remote_port: Port of the remote publish socket
491 : *
492 : * @see `netio_buffered_send_socket_init` for a description of the connection parameters
493 : */
494 : void
495 286 : netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
496 : struct netio_context* ctx,
497 : struct netio_buffered_socket_attr* attr,
498 : const char* hostname,
499 : const char* remote_host,
500 : unsigned remote_port)
501 : {
502 286 : memset(socket, 0, sizeof(*socket));
503 286 : socket->ctx = ctx;
504 286 : socket->state = NONE;
505 286 : memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));
506 286 : netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
507 286 : socket->recv_socket.listen_socket.usr = socket;
508 286 : socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
509 286 : socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
510 286 : socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
511 : //set cb_buf_received meant only for pileup measurement to NULL
512 286 : socket->cb_buf_received = NULL;
513 286 : char* lookedup_hostname = netio_domain_name_lookup(hostname);
514 286 : char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
515 286 : netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);
516 :
517 286 : socket->remote_hostname = strdup((const char*)lookedup_remote_hostname);
518 286 : socket->remote_port = remote_port;
519 286 : socket->total_tags = 0;
520 286 : free(lookedup_remote_hostname);
521 286 : free(lookedup_hostname);
522 286 : }
523 :
524 :
525 : /**
526 : * Initialize a netio_subscription_cache object.
527 : *
528 : * @param cache: The cache to be initialized
529 : */
530 : void
531 1081 : netio_subscription_cache_init(struct netio_subscription_cache* cache)
532 : {
533 1081 : cache->ts = 0;
534 1081 : cache->count = 0;
535 1081 : cache->idx_start = 0;
536 1081 : }
537 :
538 :
539 : static unsigned
540 10166 : lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
541 : {
542 10166 : struct netio_subscription key;
543 10166 : key.tag = tag;
544 20332 : struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key,
545 10166 : table->subscriptions,
546 : table->num_subscriptions,
547 : sizeof(struct netio_subscription),
548 : cmp_subscription);
549 10166 : if(ptr == NULL) {
550 : return 0;
551 : }
552 10141 : unsigned start_idx = ptr - table->subscriptions;
553 10143 : while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
554 : start_idx--;
555 : }
556 : unsigned count = 0;
557 20284 : for(unsigned int i=start_idx; i<table->num_subscriptions; i++) {
558 10143 : if(table->subscriptions[i].tag == tag) {
559 10143 : count++;
560 : } else {
561 : break;
562 : }
563 : }
564 10141 : *start = start_idx;
565 10141 : return count;
566 : }
567 :
568 : static unsigned
569 838293491 : lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
570 : {
571 838293491 : if(cache->ts != table->ts) {
572 62 : cache->count = lookup_tag(table, tag, &cache->idx_start);
573 62 : cache->ts = table->ts;
574 : }
575 838293491 : *start = cache->idx_start;
576 838293491 : return cache->count;
577 : }
578 :
579 : /**
580 : * Publishes a message under a given tag
581 : *
582 : * @param socket: The socket to publish on
583 : * @param tag: The tag under which to publish
584 : * @param data: Message data
585 : * @param len: Message size
586 : * @param flags: NETIO_REENTRY publishing of this message was attempted before and
587 : * resulted in NETIO_STATUS_AGAIN. Calling publish with
588 : * this flag will only send on connections where the
589 : * message was previously unpublished.
590 : * @param cache: Optional user-supplied cache for the subscription table lookup.
591 : */
592 : int
593 838303595 : netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
594 : {
595 838303595 : int ret = NETIO_STATUS_OK;
596 :
597 838303595 : log_trc("netio_buffered_publish (size=%lu)", len);
598 :
599 838303595 : struct iovec iov[2];
600 838303595 : iov[0].iov_base = &tag;
601 838303595 : iov[0].iov_len = sizeof(netio_tag_t);
602 838303595 : iov[1].iov_base = data;
603 838303595 : iov[1].iov_len = len;
604 :
605 838303595 : unsigned start_idx;
606 838303595 : unsigned num_subscriptions;
607 :
608 838303595 : if(cache) {
609 838293553 : num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
610 : } else {
611 10104 : num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
612 : }
613 838303595 : if(num_subscriptions == 0) return NETIO_STATUS_OK_NOSUB;
614 :
615 165641420 : for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
616 87573513 : struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
617 87573513 : if(subscription->tag == tag) {
618 :
619 : // skip connections that were already successful if we are in reentry mode
620 87573513 : if(flags & NETIO_REENTRY) {
621 431703 : if(subscription->again == 0) {
622 3042 : continue;
623 : }
624 : }
625 :
626 87570471 : int result = netio_buffered_sendv(subscription->socket, iov, 2);
627 :
628 87570471 : if(result == NETIO_STATUS_OK) {
629 87141790 : subscription->again = 0;
630 428681 : } else if(result == NETIO_STATUS_AGAIN) {
631 428681 : subscription->again = 1;
632 428681 : ret = NETIO_STATUS_AGAIN;
633 : }
634 0 : else if(result == NETIO_STATUS_TOO_BIG) {
635 0 : subscription->again = 0;
636 0 : ret = NETIO_STATUS_TOO_BIG;
637 : }
638 : else {
639 0 : return result; // some error occured and we return immediately
640 : }
641 : }
642 : }
643 : return ret;
644 : }
645 :
646 :
647 : /**
648 : * Flushes buffers on all connections of a given publish socket for a certain tag.
649 : *
650 : * @param socket The buffered publish socket
651 : * @param tag The message tag
652 : * @param cache An optional subscription cache object
653 : */
654 : void
655 6639790 : netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
656 : {
657 7440435 : for(unsigned i=0; i<socket->subscription_table.num_subscriptions; i++) {
658 800645 : struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
659 800645 : netio_buffered_flush(subscription->socket);
660 : }
661 6639790 : }
662 :
663 :
664 : /**
665 : * Subscribe to a given message tag.
666 : *
667 : * For a given subscribe socket, `netio_subscribe` can be called multiple times.
668 : *
669 : * @param socket: The buffered subscribe socket.
670 : * @param tag: The subscription tag.
671 : */
672 : int
673 1355 : netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
674 : {
675 1355 : if(socket->state == NONE) {
676 286 : log_dbg("Creating and connecting a new send_socket");
677 286 : netio_init_send_socket(&socket->socket, socket->ctx);
678 286 : socket->socket.usr = socket;
679 286 : socket->socket.cb_connection_established = subsocket_on_connection_established;
680 286 : socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
681 286 : socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
682 286 : socket->socket.cb_send_completed = subsocket_on_send_completed;
683 286 : netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
684 286 : socket->state = INITIALIZED;
685 : }
686 :
687 7290 : for(unsigned i=0; i<socket->total_tags; i++) {
688 5935 : if(socket->tags_to_subscribe[i] == tag) {
689 : return 0;
690 : }
691 : }
692 :
693 : //if send socket connected send message
694 : //otherwise on_connection_established will do it
695 1355 : if (socket->socket.state){
696 714 : log_info("Sending subscription message for tag 0x%lx", tag);
697 714 : int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
698 714 : return ret;
699 : } else {
700 641 : log_info("Queing subscription message for txg 0x%lx", tag);
701 641 : socket->tags_to_subscribe[socket->total_tags] = tag;
702 641 : socket->total_tags++;
703 641 : return 0;
704 : }
705 : }
706 :
707 :
708 : static int
709 0 : remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
710 : {
711 0 : int found = 0;
712 0 : for(unsigned int i=0; i<socket->total_tags; ++i){
713 0 : if(socket->tags_to_subscribe[i] == tag){
714 0 : log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket);
715 0 : for(unsigned int j = i; j < socket->total_tags-1; ++j){
716 0 : socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
717 : }
718 0 : found = 1;
719 0 : socket->total_tags--;
720 0 : break;
721 : }
722 : }
723 0 : if(found == 0){
724 0 : log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag);
725 : }
726 0 : return NETIO_STATUS_OK;
727 : }
728 :
729 :
730 : /**
731 : * Unsubscribe from a given message tag.
732 : *
733 : * For a given subscribe socket, `netio_unsubscribe` can be called multiple times.
734 : *
735 : * @param socket: The subscribe socket.
736 : * @param tag: The tag to unsubscribe from.
737 : */
738 : int
739 1238 : netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
740 : {
741 1238 : int ret = NETIO_STATUS_OK;
742 1238 : if(socket->state == INITIALIZED) {
743 1238 : log_dbg("Subscribe socket initialised, can proceed with usubscription");
744 1238 : if (socket->socket.state) {
745 1238 : ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
746 1238 : log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
747 : } else {
748 0 : ret = remove_tag_to_subscribe(socket, tag);
749 : }
750 : } else {
751 1238 : log_dbg("The connection has been already closed.");
752 : }
753 1238 : return ret;
754 : }
|