.. _program_listing_file_unbufpubsub.c: Program Listing for File unbufpubsub.c ====================================== |exhale_lsh| :ref:`Return to documentation for file ` (``unbufpubsub.c``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include #include #include #include "log.h" #include "netio/netio.h" #include "netio/netio_tcp.h" #include #include #define PUBLISH_SOCKET_MAX_COMPLETIONS (512) #if defined DEBUG || defined DEBUG_UPUB #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__) #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__) #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__) #else #define log_dbg(...) #define log_trc(...) #endif static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start); static int cmp_subscription(const void* a, const void *b) { struct netio_subscription* suba = (struct netio_subscription*)a; struct netio_subscription* subb = (struct netio_subscription*)b; if(suba->tag == subb->tag) { return 0; } return suba->tag > subb->tag ? 1 : -1; } static int table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_send_socket* socket) { if(table->num_subscriptions == table->size) { log_error("Maximum number of subscriptions. New subscription for 0x%lx dropped.", tag); return 1; } //Check if the subscription is already in the list unsigned start; unsigned count = lookup_tag(table, tag, &start); for(unsigned i=0; isubscriptions[start+i].socket == socket) { return 0; } } table->subscriptions[table->num_subscriptions].tag = tag; table->subscriptions[table->num_subscriptions].socket = socket; table->subscriptions[table->num_subscriptions].again = 0; table->num_subscriptions++; table->ts++; log_info("New entry in subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions); qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription); return 0; } static void table_remove_subscription(struct netio_unbuffered_publish_socket* pubsocket, struct netio_send_socket* socket, netio_tag_t tag, uint8_t closed_connection) { struct netio_subscription_table* table = &pubsocket->subscription_table; log_dbg("Total subscriptions: %lu", table->num_subscriptions); unsigned i=0; unsigned remaining_subscriptions_of_socket=0; for(unsigned int k = 0; k < table->num_subscriptions; ++k) { if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;} } while(inum_subscriptions) { log_dbg("Removing subscription tag %lx table socket %p socket %p", table->subscriptions[i].tag, table->subscriptions[i].socket, socket); if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) { //increment the completion semaphore counter as if messages were sent to the disconnected client struct netio_completion_stack* cs = &pubsocket->completion_stack; for(size_t j=0; j < cs->num_objects; ++j) { if(cs->objects[j].header.tag == table->subscriptions[i].tag && cs->objects[j].key != 0xFFFFFFFFFFFFFFFF){ netio_semaphore_increment(&cs->objects[j].sem, 1); } } log_dbg("Available completion objects %lu / %lu", cs->available_objects, cs->num_objects); log_dbg("Removing connection in subscription table, tag=%lu, socket=%p index %lu becomes %u", table->subscriptions[i].tag, table->subscriptions[i].socket, table->num_subscriptions-1, i); table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag; table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket; table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again; table->num_subscriptions--; remaining_subscriptions_of_socket--; table->ts++; } else{ i++; log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket); } } log_dbg("At the end of table_remove_subscription available completion objects %lu / %lu", pubsocket->completion_stack.available_objects, pubsocket->completion_stack.num_objects); qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription); log_dbg("Remaining subscriptions: %lu",table->num_subscriptions); log_info("Removing entry from unbuffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions); if(remaining_subscriptions_of_socket==0){ log_info("No subscriptions remaining"); if (socket->tcp_fi_mode == NETIO_MODE_TCP){ // netio_close_socket(&socket->ctx->evloop,socket,USEND); // TODO prevents re-subscription } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC && closed_connection==0){ netio_disconnect(socket); } } } static int table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_send_socket* socket) { log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions); unsigned remaining_subscriptions_of_socket=0; for(unsigned int k = 0; k < table->num_subscriptions; ++k) { if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;} } log_dbg("Subscription table lookup: for socket %p: %lu", socket, remaining_subscriptions_of_socket); return remaining_subscriptions_of_socket; } static void release_completion_object(struct netio_completion_object* completion) { log_trc("releasing completion object"); if(netio_completion_stack_push(&completion->socket->completion_stack, completion)) { log_error("Could not push completion object since stack was full. This should not happen."); } } static void on_completion_trigger(void* c) { struct netio_completion_object* completion = (struct netio_completion_object*)c; log_trc("calling cb_msg_published"); if(completion->socket->cb_msg_published) { completion->socket->cb_msg_published(completion->socket, completion->key); } release_completion_object(completion); } void print_completion_objects(struct netio_unbuffered_publish_socket* socket){ printf("Number of available completion objects: %zu \n", socket->completion_stack.available_objects); printf("===============================================\n"); printf("CO# \t KEY \t \t TAG \n"); printf("-----------------------------------------------\n"); for(unsigned int i=0; i < socket->completion_stack.num_objects; ++i){ uint32_t tag = (socket->completion_stack.key_array[i] >> 32); printf("%u \t 0x%lx \t %x \n", i, socket->completion_stack.key_array[i], tag); } printf("===============================================\n"); printf("Subscription table: there are %zu elinks subscribed\n", socket->subscription_table.num_subscriptions); printf("FID \t \t \t SOCKET\n"); printf("-----------------------------------------------\n"); for (unsigned int i=0; i < socket->subscription_table.num_subscriptions; ++i){ printf("0x%lx \t %p \n", socket->subscription_table.subscriptions[i].tag, socket->subscription_table.subscriptions[i].socket); } socket->completion_stack.printed = 1; } static struct netio_completion_object* fetch_completion_object(struct netio_unbuffered_publish_socket* socket) { struct netio_completion_object* completion; if(netio_completion_stack_pop(&socket->completion_stack, &completion)) { #if defined DEBUG || defined DEBUG_UPUB if (socket->completion_stack.printed == 0){ print_completion_objects(socket); } #endif return NULL; } netio_semaphore_init(&completion->sem, 0); completion->sem.data = completion; completion->sem.cb = on_completion_trigger; completion->socket = socket; return completion; } static void increment_completion_object(struct netio_unbuffered_publish_socket* pubsocket, uint64_t key) { log_trc("incrementing completion object"); struct netio_completion_object *completion = (struct netio_completion_object*)key; netio_semaphore_increment(&completion->sem, 1); log_trc("current: %d expected: %d", completion->sem.current, completion->sem.threshold); } static void pubsocket_on_connection_established(struct netio_send_socket* socket) { log_dbg("publish socket established connection to remote, can publish now"); struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket; //add deferred subscriptions to the table struct deferred_subscription* sub = socket->deferred_subs; while(sub){ int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket); if(pubsocket->cb_subscribe && ret == 0){ pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0); } pop_subscription(&sub); } //user callback if(pubsocket->cb_connection_established) { pubsocket->cb_connection_established(pubsocket); } } static void pubsocket_on_connection_closed(struct netio_send_socket* socket) { log_info("published socket: connection to remote was closed"); struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket; if(pubsocket->cb_connection_closed) { pubsocket->cb_connection_closed(pubsocket); } if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){ uint8_t closed_connection = 1; table_remove_subscription(pubsocket, socket, 0, closed_connection); } handle_send_socket_shutdown(socket); remove_socket(&(pubsocket->subscription_table.socket_list), socket); } static void on_unbuffered_send_connection_closed(struct netio_send_socket* socket) { log_info("Send socket: connection to remote was closed"); if(socket->unbuf_pub_socket != NULL) { pubsocket_on_connection_closed(socket); } else{ handle_send_socket_shutdown(socket); } } static void pubsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key) { struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket; increment_completion_object(pubsocket, key); log_trc("completion on pubsocket connection, key=%lu", key); } static struct netio_send_socket* socket_list_add_or_lookup(struct netio_unbuffered_publish_socket* pubsocket, struct netio_socket_list** list, void* addr, size_t addrlen, int port, struct netio_context* ctx) { if(addrlen == 0) { log_error("Invalid zero-byte address"); return NULL; } struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port); if ( entry == NULL ) { entry = add_socket_with_address(list, USEND, addr, addrlen, port); struct netio_send_socket* socket = (struct netio_send_socket*)entry->socket; if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) { struct netio_domain* domain = NULL; //domain is shared among sockets, if (entry->next != NULL && entry->next->socket != NULL) { domain = ((struct netio_send_socket*)(entry->next->socket))->domain; } netio_init_send_socket(socket, ctx); //here we memset to zero and we loose domain socket->unbuf_pub_socket = pubsocket; socket->cb_connection_established = pubsocket_on_connection_established; socket->cb_connection_closed = pubsocket_on_connection_closed; socket->cb_send_completed = pubsocket_on_send_completed; netio_connect_rawaddr_domain(socket, entry->addr, entry->addrlen, domain); if ( domain == NULL ){ //check on local domain variable netio_register_send_buffer(socket, &pubsocket->buf, 0); netio_completion_stack_register_send_socket(&pubsocket->completion_stack, socket); } } else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){ netio_init_send_tcp_socket(socket, ctx); socket->unbuf_pub_socket = pubsocket; socket->cb_connection_established = pubsocket_on_connection_established; socket->cb_connection_closed = pubsocket_on_connection_closed; socket->cb_send_completed = pubsocket_on_send_completed; log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port); netio_connect_tcp(socket, entry->addr, port); } else { log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode); remove_socket(list, socket); return NULL; } } struct netio_send_socket* ss = (struct netio_send_socket*)entry->socket; return ss; } static void subscribe(struct netio_unbuffered_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag) { struct netio_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx); if(socket->recv_socket == NULL){ socket->recv_socket =recv_socket; } if (socket->state == CONNECTED){ table_add_subscription(&pubsocket->subscription_table, tag, socket); if(pubsocket->cb_subscribe) { pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen); } } else { push_back_subscription(&socket->deferred_subs, tag); } } static void unsubscribe(struct netio_unbuffered_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag) { struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port); if(list == NULL){return;} struct netio_send_socket* socket = (struct netio_send_socket*)list->socket; uint8_t connection_closed = 0; table_remove_subscription(pubsocket, socket, tag, connection_closed); pubsocket->subscription_table.ts++; if(pubsocket->cb_unsubscribe) { pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen); } } static void lsocket_on_connection_established(struct netio_recv_socket* socket) { log_dbg("Buffered listen socket: on connection established"); if(socket->tcp_fi_mode == NETIO_MODE_TCP){ struct netio_buffer* buf = malloc(sizeof(struct netio_buffer)); buf->size = sizeof(struct netio_subscription_message); buf->data = malloc(buf->size); netio_post_recv(socket, buf); socket->usr = buf; } } static void parse_subscription_message(struct netio_unbuffered_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket) { if (msg->action){ log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen); subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag); } else{ log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen); unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag); } } static void lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len) { log_trc("message received"); struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->lsocket->usr; if(len != sizeof(struct netio_subscription_message)) { log_error("Illegal subscription message size %lu", len); netio_post_recv(socket, buf); return; } parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket); netio_post_recv(socket, buf); } static int send_subscription_message(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, int action) { if( action == NETIO_SUBSCRIBE ){ log_info("Sending subscription for tag 0x%lx", tag); } else if ( action == NETIO_UNSUBSCRIBE ){ log_info("Sending unsubscription for tag 0x%lx", tag); } else { log_error("Invalid subscription action %d", action); return 0; } int ret = 0; if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){ socket->msg.tag = tag; socket->msg.action = action; socket->buf.data = &socket->msg; socket->buf.size = sizeof(struct netio_subscription_message); ret = netio_send_inline_buffer(&socket->socket, &socket->buf); } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){ // look for available buf size_t id = 0; while (socket->bufs[id].to_send) { id++; } log_dbg("tag id %d", id); socket->msgs[id] = socket->msgs[0]; // initialize msg to default socket->msgs[id].tag = tag; socket->msgs[id].action = action; socket->bufs[id].data = &socket->msgs[id]; socket->bufs[id].size = sizeof(struct netio_subscription_message); socket->bufs[id].to_send = 1; ret = netio_send_buffer(&socket->socket, &socket->bufs[id]); } else { log_error("Socket connection type unsupported %d", socket->tcp_fi_mode); ret = 1; } return ret; } void netio_unbuffered_publish_libfabric_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf) { log_dbg("netio_unbuffered_publish_libfabric_socket_init"); socket->ctx = ctx; socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC; memcpy(&socket->buf, buf, sizeof(struct netio_buffer)); netio_subscription_table_init(&socket->subscription_table); netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS); netio_init_listen_socket(&socket->lsocket, ctx, NULL); socket->lsocket.usr = socket; socket->lsocket.cb_connection_established = lsocket_on_connection_established; socket->lsocket.cb_msg_received = lsocket_on_msg_received; socket->lsocket.recv_sub_msg = 1; netio_listen(&socket->lsocket, hostname, port); socket->buf_array[0] = &socket->completion_stack.buf; for(unsigned i=1; ibuf_array[i] = &socket->buf; } } /* tcp versio of above */ void netio_unbuffered_publish_tcp_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf) { log_dbg("INIT: netio_unbuffered_publish_tcp_socket_init"); socket->ctx = ctx; socket->tcp_fi_mode = NETIO_MODE_TCP; memcpy(&socket->buf, buf, sizeof(struct netio_buffer)); netio_subscription_table_init(&socket->subscription_table); netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS); netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL); socket->lsocket.usr = socket; socket->lsocket.cb_connection_established = lsocket_on_connection_established; socket->lsocket.cb_msg_received = lsocket_on_msg_received; socket->lsocket.recv_sub_msg = 1; netio_listen_tcp(&socket->lsocket, hostname, port); socket->buf_array[0] = &socket->completion_stack.buf; for(unsigned i=1; ibuf_array[i] = &socket->buf; } } void netio_unbuffered_publish_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf) { if (netio_tcp_mode(hostname)) { netio_unbuffered_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, buf); } else { netio_unbuffered_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, buf); } } static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start) { struct netio_subscription key; key.tag = tag; struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key, table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription); log_trc("found ptr=0x%p", ptr); if(ptr == NULL) { return 0; } unsigned start_idx = ptr - table->subscriptions; while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) { start_idx--; } log_trc("start_idx=%d", start_idx); unsigned count = 0; for(unsigned i=start_idx; inum_subscriptions; i++) { if(table->subscriptions[i].tag == tag) { count++; } else { break; } } *start = start_idx; return count; } static unsigned lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start) { if(cache->ts != table->ts) { log_trc("subscription updating cache"); cache->count = lookup_tag(table, tag, &cache->idx_start); cache->ts = table->ts; } #if defined DEBUG || defined DEBUG_UBUF printf("Subscription table:"); for(unsigned i=0; inum_subscriptions; i++) { printf("%u:%lu ", i, table->subscriptions[i].tag); } printf("\n"); #endif *start = cache->idx_start; log_trc("cache count=%d for tag=%lu", cache->count, tag); return cache->count; } int netio_unbuffered_publishv(struct netio_unbuffered_publish_socket* socket, netio_tag_t tag, struct iovec* iov, size_t count, uint64_t* key, int flags, struct netio_subscription_cache* cache) { return netio_unbuffered_publishv_usr(socket, tag, iov, count, key, flags, cache, 0, 0); } int netio_unbuffered_publishv_usr(struct netio_unbuffered_publish_socket* socket, netio_tag_t tag, struct iovec* iov, size_t count, uint64_t* key, int flags, struct netio_subscription_cache* cache, uint64_t usr, uint8_t usr_size) { int ret = NETIO_STATUS_OK; if(count > NETIO_MAX_IOV_LEN-1) { return NETIO_ERROR_MAX_IOV_EXCEEDED; } log_trc("unbuffered publishv, key=0x%p, tag=%lu", key, tag); unsigned start_idx; unsigned num_subscriptions; if(cache) { num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx); } else { num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx); } log_trc("unbuffered publishv: num_subscriptions for tag %lu: %d, start_idx: %d", tag, num_subscriptions, start_idx); if(num_subscriptions == 0) { if(socket->cb_msg_published) { socket->cb_msg_published(socket, *key); } return NETIO_STATUS_OK_NOSUB; } int used_completion = 0; struct netio_completion_object* completion = NULL; if(flags & NETIO_REENTRY) { log_trc("unbuffered publishv: REENTRY - fetching completion from user"); completion = (struct netio_completion_object*)(*key); used_completion = 1; } else { log_trc("unbuffered publishv: fetching completion object"); completion = fetch_completion_object(socket); if(completion) { netio_semphore_set_threshold(&completion->sem, num_subscriptions); completion->key = *key; completion->header.tag = tag; completion->header.usr = usr; completion->usr_size = usr_size; socket->completion_stack.key_array[socket->completion_stack.available_objects] = *key; *key = (uint64_t)completion; used_completion = 0; log_trc("fetched completion: %lu for tag 0%lx", completion->key, tag); } else { log_trc("unbuffered publishv: no completion available -> AGAIN"); // When no completion is available, we return NETIO_STATUS_AGAIN // The user is supposed to call the same call again (no need to keep track of completion object) return NETIO_STATUS_AGAIN; } } for(unsigned i=start_idx; isubscription_table.subscriptions[i]; log_trc("subscription %d has tag %lu", i, subscription->tag); if(subscription->tag == tag) { // skip connections that were already successful if we are in reentry mode if(flags & NETIO_REENTRY) { if(subscription->again == 0) { continue; } } struct netio_send_socket* ssocket = (struct netio_send_socket*)(subscription->socket); log_trc("unbuffered publishv: send message on subscribed connection, iov-count=%lu, iov-len[0]=%lu", count, iov[0].iov_len); int result; struct iovec hdr_iov[NETIO_MAX_IOV_LEN]; hdr_iov[0].iov_base = &completion->header; hdr_iov[0].iov_len = sizeof(netio_tag_t) + completion->usr_size; uint32_t total_size=hdr_iov[0].iov_len; for(unsigned j=0; jkey); if (socket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) { result = netio_sendv_imm(ssocket, socket->buf_array, hdr_iov, count+1, (uint64_t)completion, 0); } else { result = netio_tcp_sendv_imm(ssocket, total_size, hdr_iov, count+1, (uint64_t)completion, 0); } log_trc("unbuffered publishv: result=%d", result); if(result == NETIO_STATUS_OK) { used_completion = 1; subscription->again = 0; } else if(result == NETIO_STATUS_AGAIN) { subscription->again = 1; used_completion = 1; ret = NETIO_STATUS_PARTIAL; } else { return result; // some error occured and we return immediately // TODO we should handle the error and unsubscribe the faulty remote } } } if(used_completion == 0) { netio_completion_stack_push(&socket->completion_stack, completion); } return ret; } static void subscribe_socket_on_connection_established(struct netio_recv_socket* socket) { struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr; if(subscribe_socket->cb_connection_established) { subscribe_socket->cb_connection_established(subscribe_socket); } } static void subscribe_socket_on_connection_closed(struct netio_recv_socket* socket) { struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr; log_info("subscribe socket: connection closed"); if(subscribe_socket->cb_connection_closed) { subscribe_socket->cb_connection_closed(subscribe_socket); } } static void subscribe_socket_on_msg_received(struct netio_recv_socket* rsocket, struct netio_buffer* buf, void* data, size_t len) { struct netio_unbuffered_subscribe_socket* socket = (struct netio_unbuffered_subscribe_socket*)rsocket->lsocket->usr; netio_tag_t tag; memcpy(&tag, data, sizeof(netio_tag_t)); data = (char *)data + sizeof(netio_tag_t); len -= sizeof(netio_tag_t); log_trc("buffer received of length %lu for tag %lu", len, tag); if(socket->cb_msg_received) { socket->cb_msg_received(socket, tag, data, len); } netio_post_recv(rsocket, buf); } static void subsocket_on_connection_established(struct netio_send_socket* socket) { log_dbg("subsocket connection established"); int ret; struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr; if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){ subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN; if((ret = fi_getname(&subscribe_socket->recv_socket.pep->fid, subscribe_socket->msg.addr, &subscribe_socket->msg.addrlen)) !=0) { log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret)); exit(1); } subscribe_socket->buf.data = &subscribe_socket->msg; subscribe_socket->buf.size = sizeof(struct netio_subscription_message); netio_register_send_buffer(socket, &subscribe_socket->buf, 0); } else if (socket->tcp_fi_mode == NETIO_MODE_TCP) { // initialize msgs[0] with defaults struct sockaddr sock_addr; socklen_t addrlen=sizeof(sock_addr); getsockname(socket->eq_ev_ctx.fd, &sock_addr, &addrlen); getnameinfo(&sock_addr, addrlen, subscribe_socket->msgs[0].addr,NETIO_MAX_ADDRLEN, NULL, 0, NI_NUMERICHOST); addrlen=strlen(subscribe_socket->msgs[0].addr); subscribe_socket->msgs[0].addr[addrlen] = 0; subscribe_socket->msgs[0].addrlen = addrlen+1; subscribe_socket->msgs[0].port = subscribe_socket->msg.port; } if(subscribe_socket->total_tags == 0){ log_info("Closing send connection again because there is no tag to subscribe to."); netio_disconnect(socket); } if(subscribe_socket->total_tags == 0){ log_info("Closing send connection again because there is no tag to subscribe to."); netio_disconnect(socket); } // send tags one by one while(subscribe_socket->total_tags > 0){ size_t idx = subscribe_socket->total_tags - 1; netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx]; log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx); ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE); if (ret == NETIO_STATUS_OK){ subscribe_socket->total_tags--; } else { log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret); break; } } } static void subsocket_on_connection_closed(struct netio_send_socket* socket) { struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr; subscribe_socket->state = NONE; } static void subsocket_on_error_connection_refused(struct netio_send_socket* socket) { log_dbg("subsocket connection refused"); struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr; subscribe_socket->state = NONE; handle_listen_socket_shutdown(&subscribe_socket->recv_socket); if(subscribe_socket->cb_error_connection_refused) { subscribe_socket->cb_error_connection_refused(subscribe_socket); } } static void subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key) { struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr; //check for remaining tags from on_connection_established while(subscribe_socket->total_tags > 0){ size_t idx = subscribe_socket->total_tags - 1; netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx]; int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE); if (ret == NETIO_STATUS_OK){ subscribe_socket->total_tags--; } else { log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret); break; } } } void netio_unbuffered_subscribe_libfabric_socket_init(struct netio_unbuffered_subscribe_socket* socket, struct netio_context* ctx, const char* hostname, const char* remote_host, unsigned remote_port, size_t buffer_size, size_t count) { memset(socket, 0, sizeof(*socket)); socket->ctx = ctx; socket->state = NONE; socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC; struct netio_unbuffered_socket_attr attr = {count, buffer_size}; netio_init_listen_socket(&socket->recv_socket, ctx, &attr); socket->recv_socket.usr = socket; socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received; socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established; socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed; netio_listen(&socket->recv_socket, hostname, 0); socket->remote_hostname = strdup(remote_host); socket->remote_port = remote_port; log_dbg("subscribe socket is listening"); socket->total_tags = 0; } /* As above but with tcp instead of libfabric */ void netio_unbuffered_subscribe_tcp_socket_init(struct netio_unbuffered_subscribe_socket* socket, struct netio_context* ctx, const char* hostname, const char* remote_host, unsigned remote_port, size_t buffer_size, size_t count) { log_dbg("INIT: netio_unbuffered_subscribe_tcp_socket_init"); memset(socket, 0, sizeof(*socket)); socket->ctx = ctx; socket->state = NONE; socket->tcp_fi_mode = NETIO_MODE_TCP; struct netio_unbuffered_socket_attr attr = {count, buffer_size}; netio_init_listen_tcp_socket(&socket->recv_socket, ctx, &attr); socket->recv_socket.usr = socket; socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received; socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established; socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed; netio_listen_tcp(&socket->recv_socket, hostname, 0); log_dbg("socket=%p recv_socket.port=%d\n",socket,socket->recv_socket.port); socket->msg.port=socket->recv_socket.port; socket->remote_hostname = strdup(remote_host); socket->remote_port = remote_port; socket->total_tags = 0; } void netio_unbuffered_subscribe_socket_init(struct netio_unbuffered_subscribe_socket* socket, struct netio_context* ctx, const char* hostname, const char* remote_host, unsigned remote_port, //struct netio_buffer* buffers, size_t buffer_size, size_t count) { if (netio_tcp_mode(hostname)) { netio_unbuffered_subscribe_tcp_socket_init(socket, ctx, netio_hostname(hostname), netio_hostname(remote_host), remote_port, buffer_size, count); } else { netio_unbuffered_subscribe_libfabric_socket_init(socket, ctx, netio_hostname(hostname), netio_hostname(remote_host), remote_port, buffer_size, count); } } void netio_unbuffered_send_socket_init(struct netio_send_socket* socket, struct netio_context* ctx) { netio_init_send_socket(socket, ctx); socket->cb_internal_connection_closed = on_unbuffered_send_connection_closed; } int netio_unbuffered_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag) { if(socket->state == NONE) { log_dbg("Creating and connecting a new socket"); if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) { netio_init_send_socket(&socket->socket, socket->ctx); socket->socket.usr = socket; socket->socket.cb_connection_established = subsocket_on_connection_established; socket->socket.cb_connection_closed = subsocket_on_connection_closed; socket->socket.cb_send_completed = subsocket_on_send_completed; socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused; socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed; netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port); } else if (socket->tcp_fi_mode == NETIO_MODE_TCP) { netio_init_send_tcp_socket(&socket->socket, socket->ctx); socket->socket.usr = socket; socket->socket.cb_connection_established = subsocket_on_connection_established; socket->socket.cb_connection_closed = subsocket_on_connection_closed; socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused; socket->socket.cb_send_completed = subsocket_on_send_completed; socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed; netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port); } else { log_error("Unsupported connection type %d", socket->tcp_fi_mode); return 1; } socket->state = INITIALIZED; } for(unsigned i=0; itotal_tags; i++) { if(socket->tags_to_subscribe[i] == tag) { return 0; } } //if send socket connected send message //otherwise on_connection_established will do it if (socket->socket.state){ log_info("Sending subscription message for tag 0x%lx", tag); int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE); return ret; } else { log_info("Queing subscription message for txg 0x%lx", tag); socket->tags_to_subscribe[socket->total_tags] = tag; socket->total_tags++; return 0; } return 0; } static int remove_tag_to_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag) { int found = 0; for(unsigned int i=0; itotal_tags; ++i){ if(socket->tags_to_subscribe[i] == tag){ log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", socket); for(unsigned int j = i; j < socket->total_tags-1; ++j){ socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1]; } found = 1; socket->total_tags--; break; } } if(found == 0){ log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag); } return NETIO_STATUS_OK; } int netio_unbuffered_unsubscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag) { int ret = NETIO_STATUS_OK; if(socket->state == INITIALIZED) { log_dbg("Subscribe socket initialised, can proceed with unsubscription"); if (socket->socket.state) { ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE); log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret); } else { ret = remove_tag_to_subscribe(socket, tag); } } else { log_dbg("The connection has been already closed."); } return ret; } unsigned netio_pubsocket_get_available_co(struct netio_unbuffered_publish_socket* socket) { if(socket != NULL){ return socket->completion_stack.available_objects; } else { return 0; } }