.. _program_listing_file_pubsub.c:

Program Listing for File pubsub.c
=================================

|exhale_lsh| :ref:`Return to documentation for file <file_pubsub.c>` (``pubsub.c``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   /* NOTE(review): the system header names were lost when this listing was
    * extracted; the list below is reconstructed from the library calls made
    * in this file. Confirm against the original pubsub.c. */
   #include <stdlib.h>
   #include <string.h>
   #include <sys/socket.h>
   #include <netdb.h>
   #include <arpa/inet.h>

   #include "log.h"
   #include "netio/netio.h"
   #include "netio/netio_tcp.h"

   /* log_dbg / log_trc expand to nothing unless DEBUG or DEBUG_PUB is defined,
    * so their arguments are not evaluated in regular builds. */
   #if defined DEBUG || defined DEBUG_PUB
   #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
   #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
   #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
   #else
   #define log_dbg(...)
   #define log_trc(...)
   #endif


   /*
    * Translates a host name into a dotted-quad IPv4 string.
    *
    * @param domain_name: Host name or IPv4 address; may be NULL
    *
    * @return A newly allocated string owned by the caller (release with free()):
    *         the input itself if it already parses as an IPv4 address, the
    *         address reported by gethostbyname() if the name resolves, or a copy
    *         of the unresolved name otherwise. NULL if domain_name is NULL or
    *         the allocation fails.
    */
   char*
   netio_domain_name_lookup(const char* domain_name)
   {
     if(!domain_name) {
       return NULL;
     }

     struct in_addr parsed;
     if(inet_pton(AF_INET, domain_name, &parsed) == 1) {
       return strdup(domain_name);
     }

     struct hostent* host = gethostbyname(domain_name);
     if(host) {
       return strdup(inet_ntoa(*(struct in_addr *)host->h_addr));
     }

     log_error("The host cannot be resolved. Domain name set to %s", domain_name);
     return strdup(domain_name);
   }


   /* qsort/bsearch comparator: orders subscriptions by ascending tag. */
   static int
   cmp_subscription(const void* a, const void *b)
   {
     const struct netio_subscription* suba = (const struct netio_subscription*)a;
     const struct netio_subscription* subb = (const struct netio_subscription*)b;

     if(suba->tag == subb->tag) {
       return 0;
     }
     return suba->tag > subb->tag ? 1 : -1;
   }


   /*
    * Appends a (tag, socket) pair to the subscription table and re-sorts it.
    *
    * @return 0 on success, 1 if the table is full and the subscription was dropped
    */
   static int
   table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_buffered_send_socket* socket)
   {
     if(table->num_subscriptions == table->size) {
       log_error("Maximum number of subscriptions. New subscription for %lx dropped.", tag);
       return 1;
     }

     struct netio_subscription* slot = &table->subscriptions[table->num_subscriptions];
     slot->tag = tag;
     slot->socket = socket;
     slot->again = 0;

     log_dbg("Adding connection in subscription table, tag=%lu, socket=%p", slot->tag, slot->socket);
     table->num_subscriptions++;
     table->ts++;  // invalidates every netio_subscription_cache built on the old table
     log_info("New entry in buffered subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%lu", tag, tag, socket, table->num_subscriptions);

     // lookup_tag() uses bsearch, so the array has to stay sorted by tag
     qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     return 0;
   }


   /*
    * Removes subscriptions of `socket` from the table.
    *
    * If closed_connection is non-zero every entry of the socket is removed,
    * otherwise only the entries matching `tag`. When an unsubscription
    * (closed_connection == 0) leaves the socket without entries, a libfabric
    * endpoint is disconnected; TCP endpoints are left connected.
    */
   static void
   table_remove_subscription(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
   {
     log_dbg("Total subscriptions: %lu", table->num_subscriptions);

     unsigned remaining_subscriptions_of_socket = 0;
     for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
       if(table->subscriptions[k].socket == socket) {
         remaining_subscriptions_of_socket++;
       }
     }

     unsigned i = 0;
     while(i < table->num_subscriptions) {
       if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
         log_dbg("Removing entry in subscription table, tag=%lu, socket=%p index %lu becomes %u", table->subscriptions[i].tag, table->subscriptions[i].socket, table->num_subscriptions-1, i);
         // Overwrite with the last entry; i is not advanced so the entry that
         // was moved into this slot gets examined on the next iteration.
         table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
         table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
         table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
         table->num_subscriptions--;
         remaining_subscriptions_of_socket--;
         table->ts++;
       } else {
         // Log before advancing: logging after i++ reported the wrong entry and
         // read one element past the valid range on the last iteration.
         log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
         i++;
       }
     }

     qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
     log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);
     log_info("Removing entry from buffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);

     if(closed_connection==0 && remaining_subscriptions_of_socket==0){
       log_warn("Disconnecting endpoint with zero subscriptions");
       if (socket->send_socket.tcp_fi_mode == NETIO_MODE_TCP){
         // netio_disconnect(&socket->send_socket); // TODO prevents re-subscription
       } else if (socket->send_socket.tcp_fi_mode == NETIO_MODE_LIBFABRIC){
         netio_disconnect(&socket->send_socket);
       }
     }
   }


   /* Returns the number of table entries that refer to `socket`. */
   static int
   table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_buffered_send_socket* socket)
   {
     log_dbg("Subscription table lookup: Total subscriptions: %lu", table->num_subscriptions);

     unsigned remaining_subscriptions_of_socket = 0;
     for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
       if(table->subscriptions[k].socket == socket) {
         remaining_subscriptions_of_socket++;
       }
     }

     log_dbg("Subscription table lookup: for socket %p: %u", socket, remaining_subscriptions_of_socket);
     return remaining_subscriptions_of_socket;
   }


   /* Signal callback: forwards "buffer available" to the publish socket's user callback. */
   static void
   on_buffer_available(void* ptr)
   {
     log_trc("a buffer became available, calling callback");
     struct netio_publish_socket* socket = (struct netio_publish_socket*)ptr;
     if(socket->cb_buffer_available) {
       socket->cb_buffer_available(socket);
     }
   }


   /*
    * Called when a buffered send socket towards a subscriber is connected:
    * moves the subscriptions that were deferred while connecting into the
    * table, then invokes the user callback.
    */
   static void
   pubsocket_on_connection_established(struct netio_buffered_send_socket* socket)
   {
     log_dbg("publish socket established connection to remote, can publish now");
     struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;

     //add deferred subscriptions to the table
     // NOTE(review): the list is popped through the local copy `sub`, so
     // socket->send_socket.deferred_subs is not updated here; confirm against
     // pop_subscription() that this cannot leave a stale pointer behind.
     struct deferred_subscription* sub = socket->send_socket.deferred_subs;
     while(sub){
       int ret = table_add_subscription(&pubsocket->subscription_table, sub->tag, socket);
       if(pubsocket->cb_subscribe && ret == 0){
         pubsocket->cb_subscribe(pubsocket, sub->tag, NULL, 0);
       }
       pop_subscription(&sub);
     }

     //user callback
     if(pubsocket->cb_connection_established) {
       pubsocket->cb_connection_established(pubsocket);
     }
   }


   /*
    * Called when the connection of a buffered send socket is closed: invokes
    * the user callback and purges any subscription still held by the socket.
    */
   static void
   pubsocket_on_connection_closed(struct netio_buffered_send_socket* socket)
   {
     log_dbg("published socket with buffered send socket %p and send socket %p: connection to remote was closed", socket, &socket->send_socket);
     struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->pub_socket;
     if(pubsocket->cb_connection_closed) {
       pubsocket->cb_connection_closed(pubsocket);
     }

     //Only if the connection was closed without unsubscribing first.
     if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
       log_dbg("Removing a send socket for which a connection was closed without unsubscribing first");
       uint8_t connection_closed = 1;
       table_remove_subscription(&pubsocket->subscription_table, socket, 0, connection_closed);
     }
   }


   /*
    * Returns the buffered send socket for (addr, port), creating, initialising
    * and connecting a new one if the address is not in the list yet.
    *
    * @return The socket, or NULL if addrlen is 0 or the publish socket has an
    *         unknown connection mode
    */
   static struct netio_buffered_send_socket*
   socket_list_add_or_lookup(struct netio_publish_socket* pubsocket,
                             struct netio_socket_list** list,
                             void* addr, size_t addrlen,
                             int port,
                             struct netio_context* ctx,
                             struct netio_buffered_socket_attr* attr)
   {
     if(addrlen == 0) {
       return NULL;
     }

     struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);

     if ( entry == NULL ) {
       entry = add_socket_with_address(list, BSEND, addr, addrlen, port);
       struct netio_buffered_send_socket* bufsocket = entry->socket;

       if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
         netio_buffered_send_socket_init(bufsocket, ctx, attr);
         bufsocket->pub_socket = pubsocket;
         bufsocket->cb_connection_established = pubsocket_on_connection_established;
         bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
         netio_buffered_connect_rawaddr(bufsocket, entry->addr, entry->addrlen);
       } else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
         netio_buffered_send_tcp_socket_init(bufsocket, ctx, attr);
         bufsocket->pub_socket = pubsocket;
         bufsocket->cb_connection_established = pubsocket_on_connection_established;
         bufsocket->cb_connection_closed = pubsocket_on_connection_closed;
         log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
         netio_connect_tcp(&bufsocket->send_socket, entry->addr, port);
       } else {
         log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
         remove_socket(list, bufsocket);
         return NULL;
       }

       bufsocket->signal_buffer_available.data = pubsocket;
       bufsocket->signal_buffer_available.cb = on_buffer_available;
     }

     struct netio_buffered_send_socket* ss = (struct netio_buffered_send_socket*)entry->socket;
     return ss;
   }


   /*
    * Handles a subscription request: the tag is added to the table right away
    * if the send socket towards the subscriber is connected, otherwise it is
    * queued until pubsocket_on_connection_established runs.
    */
   static void
   subscribe(struct netio_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
   {
     struct netio_buffered_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx, &pubsocket->attr);
     if(socket == NULL) {
       // addrlen comes from the received message; a zero length or an unknown
       // connection mode yields no socket and must not be dereferenced.
       log_error("Cannot subscribe tag 0x%lx: no send socket for the given address (addrlen=%zu)", tag, addrlen);
       return;
     }

     if(socket->send_socket.recv_socket == NULL){
       socket->send_socket.recv_socket = recv_socket;
     }

     if (socket->send_socket.state == CONNECTED){
       int ret = table_add_subscription(&pubsocket->subscription_table, tag, socket);
       // Same rule as the deferred path: only report subscriptions that were stored.
       if(pubsocket->cb_subscribe && ret == 0) {
         pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
       }
     } else {
       push_back_subscription(&socket->send_socket.deferred_subs, tag);
     }
   }


   /* Handles an unsubscription request; unknown addresses are silently ignored. */
   static void
   unsubscribe(struct netio_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
   {
     struct netio_socket_list* list = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
     if(list == NULL) {
       return;
     }

     struct netio_buffered_send_socket* socket = (struct netio_buffered_send_socket*)list->socket;
     uint8_t connection_closed = 0;
     table_remove_subscription(&pubsocket->subscription_table, socket, tag, connection_closed);
     pubsocket->subscription_table.ts++;

     if(pubsocket->cb_unsubscribe) {
       pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
     }
   }


   /*
    * Called when a subscriber connects to the listen socket. In TCP mode this
    * allocates and posts the receive buffers for subscription messages.
    */
   static void
   lsocket_on_connection_established(struct netio_recv_socket* socket)
   {
     log_dbg("Buffered listen socket: on connection established");
     if(socket->tcp_fi_mode != NETIO_MODE_TCP) {
       //libfabric buffers posted in on_listen_socket_cm_event
       return;
     }

     enum { num_sub_msg_buffers = 32 };
     socket->sub_msg_buffers = malloc(num_sub_msg_buffers*sizeof(struct netio_buffer*));
     if(socket->sub_msg_buffers == NULL) {
       log_fatal("Cannot allocate subscription message buffer list");
       exit(1);
     }

     for (int i = 0; i < num_sub_msg_buffers; i++){
       struct netio_buffer* buf = malloc(sizeof(struct netio_buffer));
       if(buf == NULL) {
         log_fatal("Cannot allocate subscription message buffer %d", i);
         exit(1);
       }
       buf->size = sizeof(struct netio_subscription_message);
       buf->data = malloc(buf->size);
       if(buf->data == NULL) {
         log_fatal("Cannot allocate subscription message buffer %d", i);
         exit(1);
       }
       socket->sub_msg_buffers[i] = buf;
       netio_post_recv(socket, buf);
     }
   }


   /* Dispatches a subscription message: non-zero action subscribes, zero unsubscribes. */
   static void
   parse_subscription_message(struct netio_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
   {
     // NOTE(review): msg->addrlen is taken from the wire as-is; consider
     // rejecting values larger than NETIO_MAX_ADDRLEN before using it.
     if (msg->action){
       log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
       subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     } else {
       log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
       unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
     }
   }


   /*
    * Receive callback of the listen socket: validates the message size, handles
    * the (un)subscription and re-posts the buffer in every case.
    */
   static void
   lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
   {
     log_dbg("message received by recv socket %p", socket);
     struct netio_publish_socket* pubsocket = (struct netio_publish_socket*)socket->lsocket->usr;

     if(len != sizeof(struct netio_subscription_message)) {
       log_error("Illegal subscription message size %lu. Did you confuse tohost and from host links", len);
       netio_post_recv(socket, buf);
       return;
     }

     parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
     netio_post_recv(socket, buf);
   }


   /*
    * Sends a subscribe or unsubscribe message for `tag` to the publisher.
    *
    * @param action: NETIO_SUBSCRIBE or NETIO_UNSUBSCRIBE
    *
    * @return The status of the send call; 0 for an invalid action (nothing is
    *         sent); 1 for an unsupported connection mode
    */
   static int
   send_subscription_message(struct netio_subscribe_socket* socket, netio_tag_t tag, int action)
   {
     if( action == NETIO_SUBSCRIBE ){
       log_info("Sending subscription for tag 0x%lx", tag);
     } else if ( action == NETIO_UNSUBSCRIBE ){
       log_info("Sending unsubscription for tag 0x%lx", tag);
     } else {
       log_error("Invalid subscription action %d", action);
       return 0;
     }

     int ret = 0;
     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
       socket->msg.tag = tag;
       socket->msg.action = action;
       socket->buf.data = &socket->msg;
       socket->buf.size = sizeof(struct netio_subscription_message);
       ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
     } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
       // look for available buf
       // NOTE(review): the scan has no upper bound; it relies on at least one
       // entry of bufs[] being free. Confirm against the array size.
       size_t id = 0;
       while (socket->bufs[id].to_send) {
         id++;
       }
       log_info("tag id %zu", id);
       socket->msgs[id] = socket->msgs[0]; // initialize msg to default
       socket->msgs[id].tag = tag;
       socket->msgs[id].action = action;
       socket->bufs[id].data = &socket->msgs[id];
       socket->bufs[id].size = sizeof(struct netio_subscription_message);
       socket->bufs[id].to_send = 1;
       ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
     } else {
       log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
       ret = 1;
     }

     log_info("send_subscription_message done");
     return ret;
   }


   /*
    * Called when the send socket towards the publisher is connected: fills in
    * the local address that is advertised in subscription messages and sends
    * the queued tags until one send does not return NETIO_STATUS_OK.
    */
   static void
   subsocket_on_connection_established(struct netio_send_socket* socket)
   {
     log_dbg("subsocket connection established");
     int ret;
     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;

     // NOTE(review): execution continues after this disconnect; confirm that
     // registering the send buffer below on a closing socket is intended.
     if(subscribe_socket->total_tags == 0){
       log_info("Closing send connection again because there is no tag to subscribe to.");
       netio_disconnect(socket);
     }

     if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
       subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
       if((ret = fi_getname(&subscribe_socket->recv_socket.listen_socket.pep->fid,
                            subscribe_socket->msg.addr,
                            &subscribe_socket->msg.addrlen)) != 0) {
         log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
         exit(1);
       }
       subscribe_socket->buf.data = &subscribe_socket->msg;
       subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
       netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
     } else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
       // initialize msgs[0] with defaults
       // sockaddr_storage is large enough for any address family
       struct sockaddr_storage sock_addr;
       socklen_t sock_addrlen = sizeof(sock_addr);
       if(getsockname(socket->eq_ev_ctx.fd, (struct sockaddr*)&sock_addr, &sock_addrlen) != 0
          || getnameinfo((struct sockaddr*)&sock_addr, sock_addrlen,
                         subscribe_socket->msgs[0].addr, NETIO_MAX_ADDRLEN,
                         NULL, 0, NI_NUMERICHOST) != 0) {
         log_error("Cannot determine the local address of the subscribe socket");
         subscribe_socket->msgs[0].addr[0] = 0; // keep the strlen below well defined
       }
       size_t addrlen = strlen(subscribe_socket->msgs[0].addr);
       subscribe_socket->msgs[0].addrlen = addrlen+1; // the terminator is part of the message
       subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
     }

     // send tags one by one
     while(subscribe_socket->total_tags > 0){
       size_t idx = subscribe_socket->total_tags - 1;
       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
       log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
       ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
       if (ret == NETIO_STATUS_OK){
         subscribe_socket->total_tags--;
       } else {
         log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
         break;
       }
     }
   }


   /* Called when the send socket is closed: resets the state and shuts the socket down. */
   static void
   subsocket_on_send_connection_closed(struct netio_send_socket* socket)
   {
     log_dbg("subsocket_on_send_connection_closed callback");
     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;
     subscribe_socket->state = NONE;

     if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
       handle_send_socket_shutdown(socket);
     } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
       handle_tcp_send_socket_shutdown(socket);
     }
   }


   /*
    * Called when the publisher refuses the connection: resets the state, shuts
    * down the local listen socket and informs the user.
    */
   static void
   subsocket_on_error_connection_refused(struct netio_send_socket* ssocket)
   {
     log_dbg("subsocket connection refused");
     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)ssocket->usr;
     subscribe_socket->state = NONE;
     handle_listen_socket_shutdown(&subscribe_socket->recv_socket.listen_socket);

     if(subscribe_socket->cb_error_connection_refused) {
       subscribe_socket->cb_error_connection_refused(subscribe_socket);
     }
   }


   /* Send-completion callback: continues sending tags that are still queued. `key` is unused. */
   static void
   subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
   {
     struct netio_subscribe_socket* subscribe_socket = (struct netio_subscribe_socket*)socket->usr;

     //check for remaining tags from on_connection_established
     while(subscribe_socket->total_tags > 0){
       size_t idx = subscribe_socket->total_tags - 1;
       netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
       int ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
       if (ret == NETIO_STATUS_OK){
         subscribe_socket->total_tags--;
       } else {
         log_warn("subsocket_on_send_completed: send_subscription_message returned %d", ret);
         break;
       }
     }
   }


   /* Forwards "connection established" of the buffered receive socket to the user. */
   static void
   subscribe_socket_on_connection_established(struct netio_buffered_recv_socket* rsocket)
   {
     log_dbg("connection to subscribe socket has been established");
     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     if(socket->cb_connection_established) {
       socket->cb_connection_established(socket);
     }
   }


   /* Forwards "connection closed" of the buffered receive socket to the user. */
   static void
   subscribe_socket_on_connection_closed(struct netio_buffered_recv_socket* rsocket)
   {
     log_info("connection to subscribe socket has been closed");
     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;
     if(socket->cb_connection_closed) {
       socket->cb_connection_closed(socket);
     }
   }


   /*
    * Receive callback for published data: the message starts with the tag,
    * followed by at least one byte of payload. Splits the two and hands them
    * to the user callback.
    */
   static void
   subscribe_socket_on_msg_received(struct netio_buffered_recv_socket* rsocket, void* data, size_t len)
   {
     log_trc("buffer received");
     struct netio_subscribe_socket* socket = (struct netio_subscribe_socket*)rsocket->lsocket->listen_socket.usr;

     if(len <= sizeof(netio_tag_t)) {
       log_error("Received invalid message of size %lu. Need %lu bytes for tag, at least 1 of payload", len, sizeof(netio_tag_t));
       return;
     }

     // memcpy instead of a pointer cast: `data` is not guaranteed to be aligned for netio_tag_t
     netio_tag_t tag;
     memcpy(&tag, data, sizeof(tag));

     if(socket->cb_msg_received) {
       socket->cb_msg_received(socket, tag, (char *)data + sizeof(netio_tag_t), len - sizeof(netio_tag_t));
     }
   }


   /*
    * Initializes an empty subscription table with room for
    * NETIO_INITIAL_SUBSCRIPTIONS entries. If the allocation fails the table is
    * left with capacity 0, so every later subscription is rejected as "full".
    */
   void
   netio_subscription_table_init(struct netio_subscription_table* table)
   {
     table->socket_list = NULL;
     table->subscriptions = (struct netio_subscription*)malloc(NETIO_INITIAL_SUBSCRIPTIONS*sizeof(struct netio_subscription));
     table->num_subscriptions = 0;
     table->size = NETIO_INITIAL_SUBSCRIPTIONS;
     table->ts = 0;

     if(table->subscriptions == NULL) {
       log_error("Cannot allocate subscription table");
       table->size = 0;
     }
   }


   /*
    * Initializes a buffered publish socket (libfabric mode).
    *
    * @param socket: The buffered publish socket to initialize
    * @param ctx: The NetIO context in which to initialize the socket
    * @param hostname: Hostname or IP address to bind to
    * @param port: Port to bind to
    * @param attr: Buffered connection settings to be used for the underlying connections.
    *              attr->num_pages is clamped in place to NETIO_DOMAIN_MAX_MR.
    *
    * @see `netio_buffered_send_socket_init` for a description of the connection parameters
    */
   void
   netio_publish_libfabric_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
   {
     memset(socket, 0, sizeof(*socket));
     socket->ctx = ctx;
     socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
     netio_subscription_table_init(&socket->subscription_table);
     netio_init_listen_socket(&socket->lsocket, ctx, NULL);

     if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
       log_error("Number of requested publisher netio pages %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
       attr->num_pages = NETIO_DOMAIN_MAX_MR;
     }
     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));

     socket->lsocket.usr = socket;
     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     socket->lsocket.recv_sub_msg = 1;

     char* _hostname = netio_domain_name_lookup(hostname);
     netio_listen(&socket->lsocket, (const char*)_hostname, port);
     free(_hostname);
   }


   /* Same as netio_publish_libfabric_socket_init, for TCP; num_pages is not clamped here. */
   void
   netio_publish_tcp_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
   {
     memset(socket, 0, sizeof(*socket));
     socket->ctx = ctx;
     socket->tcp_fi_mode = NETIO_MODE_TCP;
     netio_subscription_table_init(&socket->subscription_table);
     netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));

     socket->lsocket.usr = socket;
     socket->lsocket.cb_connection_established = lsocket_on_connection_established;
     socket->lsocket.cb_msg_received = lsocket_on_msg_received;
     socket->lsocket.recv_sub_msg = 1;

     char* _hostname = netio_domain_name_lookup(hostname);
     netio_listen_tcp(&socket->lsocket, (const char*)_hostname, port);
     free(_hostname);
   }


   /*
    * Initializes a buffered publish socket.
    *
    * The transport (TCP or libfabric) is chosen by netio_tcp_mode(hostname).
    *
    * @param socket: The buffered publish socket to initialize
    * @param ctx: The NetIO context in which to initialize the socket
    * @param hostname: Hostname or IP address to bind to
    * @param port: Port to bind to
    * @param attr: Buffered connection settings to be used for the underlying connections
    *
    * @see `netio_buffered_send_socket_init` for a description of the connection parameters
    */
   void
   netio_publish_socket_init(struct netio_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr)
   {
     if (netio_tcp_mode(hostname)) {
       netio_publish_tcp_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
     } else {
       netio_publish_libfabric_socket_init(socket, ctx, netio_hostname(hostname), port, attr);
     }
   }


   /*
    * Initializes a subscribe socket (libfabric mode): sets up the buffered
    * listen socket that receives published data and records the publisher's
    * address for the later netio_subscribe() call.
    *
    * @param hostname: Local hostname or IP address to listen on
    * @param remote_host: Hostname or IP address of the publisher
    * @param remote_port: Port of the publisher
    */
   void
   netio_subscribe_libfabric_socket_init(struct netio_subscribe_socket* socket,
                                         struct netio_context* ctx,
                                         struct netio_buffered_socket_attr* attr,
                                         const char* hostname,
                                         const char* remote_host,
                                         unsigned remote_port)
   {
     memset(socket, 0, sizeof(*socket));
     socket->ctx = ctx;
     socket->state = NONE;
     socket->tcp_fi_mode=NETIO_MODE_LIBFABRIC;
     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));

     netio_buffered_listen_socket_init(&socket->recv_socket, ctx, &socket->attr);
     socket->recv_socket.listen_socket.usr = socket;
     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     //set cb_buf_received meant only for pileup measurement to NULL
     socket->cb_buf_received = NULL;

     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     netio_buffered_listen(&socket->recv_socket, (const char*)lookedup_hostname, 0);

     // The lookup result is already a private heap copy: hand it over instead
     // of duplicating it (strdup on a NULL lookup result would be undefined).
     socket->remote_hostname = lookedup_remote_hostname;
     socket->remote_port = remote_port;
     socket->total_tags = 0;
     free(lookedup_hostname);
   }


   /* Same as above except tcp instead of libfabric */
   void
   netio_subscribe_tcp_socket_init(struct netio_subscribe_socket* socket,
                                   struct netio_context* ctx,
                                   struct netio_buffered_socket_attr* attr,
                                   const char* hostname,
                                   const char* remote_host,
                                   unsigned remote_port)
   {
     log_info("subscribe_tcp from <%s> to <%s>",hostname,remote_host);
     memset(socket, 0, sizeof(*socket));
     socket->ctx = ctx;
     socket->state = NONE;
     socket->tcp_fi_mode=NETIO_MODE_TCP;
     memcpy(&socket->attr, attr, sizeof(struct netio_buffered_socket_attr));

     log_dbg("Prepare recv socket (buffered_listen_tcp_socket)");
     netio_buffered_listen_tcp_socket_init(&socket->recv_socket, ctx, &socket->attr);
     socket->recv_socket.listen_socket.usr = socket;
     socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
     socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
     socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
     //set cb_buf_received meant only for pileup measurement to NULL
     socket->cb_buf_received = NULL;

     char* lookedup_hostname = netio_domain_name_lookup(hostname);
     char* lookedup_remote_hostname = netio_domain_name_lookup(remote_host);
     netio_buffered_listen_tcp(&socket->recv_socket, (const char*)lookedup_hostname, 0);
     // port advertised to the publisher in subscription messages
     socket->msg.port=socket->recv_socket.listen_socket.port;

     // Ownership of the lookup result is transferred, see the libfabric variant.
     socket->remote_hostname = lookedup_remote_hostname;
     socket->remote_port = remote_port;
     socket->total_tags = 0;
     free(lookedup_hostname);
   }


   /* Initializes a subscribe socket; the transport is chosen by netio_tcp_mode(hostname). */
   void
   netio_subscribe_socket_init(struct netio_subscribe_socket* socket,
                               struct netio_context* ctx,
                               struct netio_buffered_socket_attr* attr,
                               const char* hostname,
                               const char* remote_host,
                               unsigned remote_port)
   {
     if (netio_tcp_mode(hostname)) {
       netio_subscribe_tcp_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
     } else {
       netio_subscribe_libfabric_socket_init(socket, ctx, attr, netio_hostname(hostname), netio_hostname(remote_host), remote_port);
     }
   }


   /* Resets a subscription cache to timestamp 0 with no cached entries. */
   void
   netio_subscription_cache_init(struct netio_subscription_cache* cache)
   {
     cache->ts = 0;
     cache->count = 0;
     cache->idx_start = 0;
   }


   /*
    * Finds the contiguous run of table entries with the given tag.
    *
    * @param start: Receives the index of the first matching entry; left
    *               untouched when nothing matches
    *
    * @return Number of matching entries (0 if none)
    */
   static unsigned
   lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
   {
     struct netio_subscription key;
     key.tag = tag;
     struct netio_subscription* ptr = (struct netio_subscription*)bsearch(&key, table->subscriptions,
                                                                          table->num_subscriptions,
                                                                          sizeof(struct netio_subscription),
                                                                          cmp_subscription);
     if(ptr == NULL) {
       return 0;
     }

     // bsearch may land anywhere inside the run of equal tags: rewind to its start
     unsigned start_idx = ptr - table->subscriptions;
     while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
       start_idx--;
     }

     unsigned count = 0;
     for(unsigned int i=start_idx; i<table->num_subscriptions; i++) {
       if(table->subscriptions[i].tag == tag) {
         count++;
       } else {
         break;
       }
     }

     *start = start_idx;
     return count;
   }


   /*
    * Like lookup_tag, but reuses the cached result while the table timestamp
    * is unchanged. The cache is keyed on the timestamp only, not on the tag.
    */
   static unsigned
   lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
   {
     if(cache->ts != table->ts) {
       cache->count = lookup_tag(table, tag, &cache->idx_start);
       cache->ts = table->ts;
     }
     *start = cache->idx_start;
     return cache->count;
   }


   /*
    * Publishes an iovec whose first entry holds the tag to every subscriber of
    * that tag.
    *
    * @param flags: With NETIO_REENTRY set, subscribers whose previous send did
    *               not return NETIO_STATUS_AGAIN are skipped
    * @param cache: Optional lookup cache, may be NULL
    *
    * @return NETIO_STATUS_OK_NOSUB if nobody subscribed to the tag;
    *         NETIO_STATUS_AGAIN or NETIO_STATUS_TOO_BIG if a subscriber
    *         reported it; any other send error immediately; otherwise
    *         NETIO_STATUS_OK
    */
   int
   netio_buffered_publishi(struct netio_publish_socket* socket, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
   {
     int ret = NETIO_STATUS_OK;
     netio_tag_t tag = *(netio_tag_t*)iov[0].iov_base;
     unsigned start_idx;
     unsigned num_subscriptions;

     if(cache) {
       num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
     } else {
       num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
     }

     if(num_subscriptions == 0) {
       return NETIO_STATUS_OK_NOSUB;
     }

     // NOTE(review): loop header and declaration reconstructed after extraction
     // stripped them; confirm against the original pubsub.c.
     for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
       struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
       if(subscription->tag == tag) {
         // skip connections that were already successful if we are in reentry mode
         if(flags & NETIO_REENTRY) {
           if(subscription->again == 0) {
             continue;
           }
         }

         int result = netio_buffered_sendv(subscription->socket, iov, len);
         log_dbg("Sending iov on subscription->socket, result=%d",result);

         if(result == NETIO_STATUS_OK) {
           subscription->again = 0;
         } else if(result == NETIO_STATUS_AGAIN) {
           subscription->again = 1;
           ret = NETIO_STATUS_AGAIN;
         } else if(result == NETIO_STATUS_TOO_BIG) {
           subscription->again = 0;
           ret = NETIO_STATUS_TOO_BIG;
         } else {
           return result; // some error occured and we return immediately
         }
       }
     }
     return ret;
   }


   /* Publishes one contiguous buffer under `tag`; see netio_buffered_publishi for the return values. */
   int
   netio_buffered_publish(struct netio_publish_socket* socket, netio_tag_t tag, void* data, size_t len, int flags, struct netio_subscription_cache* cache)
   {
     log_trc("netio_buffered_publish (size=%lu)", len);
     struct iovec iov[2];
     iov[0].iov_base = &tag;
     iov[0].iov_len = sizeof(netio_tag_t);
     iov[1].iov_base = data;
     iov[1].iov_len = len;
     return netio_buffered_publishi(socket, iov, 2, flags, cache);
   }


   /*
    * Publishes a scatter list under `tag`. At most NETIO_MAX_IOV_LEN entries of
    * `iov` are sent; further entries are ignored.
    */
   int
   netio_buffered_publishv(struct netio_publish_socket* socket, netio_tag_t tag, struct iovec * iov, size_t len, int flags, struct netio_subscription_cache* cache)
   {
     struct iovec iovv[NETIO_MAX_IOV_LEN + 1]; //NETIO_MAX_IOV_LEN is not a limititation, as all entries will be copied in one in netio_buffered_sendv
     iovv[0].iov_base = &tag;
     iovv[0].iov_len = sizeof(netio_tag_t);
     size_t size = (len > NETIO_MAX_IOV_LEN) ? NETIO_MAX_IOV_LEN : len;
     memcpy(iovv + 1, iov, size * sizeof(struct iovec));
     return netio_buffered_publishi(socket, iovv, (size + 1), flags, cache);
   }


   /*
    * Flushes the send socket of every table entry. `tag` and `cache` are not
    * used: all subscriptions are flushed, not only those of the given tag.
    */
   void
   netio_buffered_publish_flush(struct netio_publish_socket* socket, netio_tag_t tag, struct netio_subscription_cache* cache)
   {
     for(unsigned i=0; i<socket->subscription_table.num_subscriptions; i++) {
       struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
       netio_buffered_flush(subscription->socket);
     }
   }


   /*
    * Subscribes to `tag`. On first use the send socket towards the publisher is
    * created and connected. The subscription message is sent immediately if
    * that socket is connected, otherwise the tag is queued.
    *
    * @return 0 if the tag was queued or is already queued, otherwise the result
    *         of send_subscription_message
    */
   int
   netio_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
   {
     if(socket->state == NONE) {
       //A new socket is created and the on subsocket_on_connection_established
       //will trigger the actual subscriptions.
       log_dbg("Creating and connecting a new send_socket");
       if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
         netio_init_send_socket(&socket->socket, socket->ctx);
         socket->socket.usr = socket;
         socket->socket.cb_connection_established = subsocket_on_connection_established;
         socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
         socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
         socket->socket.cb_send_completed = subsocket_on_send_completed;
         netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
       } else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
         netio_init_send_tcp_socket(&socket->socket, socket->ctx);
         socket->socket.usr = socket;
         socket->socket.cb_connection_established = subsocket_on_connection_established;
         socket->socket.cb_connection_closed = subsocket_on_send_connection_closed;
         socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
         socket->socket.cb_send_completed = subsocket_on_send_completed;
         netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
       }
       socket->state = INITIALIZED;
     }

     for(unsigned i=0; i<socket->total_tags; i++) {
       if(socket->tags_to_subscribe[i] == tag) {
         return 0;
       }
     }

     //if send socket connected send message
     //otherwise on_connection_established will do it
     if (socket->socket.state){
       log_info("Sending subscription message for tag 0x%lx", tag);
       int ret = send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
       return ret;
     } else {
       // NOTE(review): no capacity check against the size of tags_to_subscribe
       // (not visible here); confirm the array cannot overflow.
       log_info("Queing subscription message for txg 0x%lx", tag);
       socket->tags_to_subscribe[socket->total_tags] = tag;
       socket->total_tags++;
       return 0;
     }
   }


   /*
    * Removes `tag` from the queue of not yet sent subscriptions, keeping the
    * order of the remaining tags.
    *
    * @return Always NETIO_STATUS_OK; a missing tag is only logged
    */
   static int
   remove_tag_to_subscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
   {
     int found = 0;
     for(unsigned int i=0; i<socket->total_tags; ++i){
       if(socket->tags_to_subscribe[i] == tag){
         log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", tag, (void*)socket);
         for(unsigned int j = i; j < socket->total_tags-1; ++j){
           socket->tags_to_subscribe[j] = socket->tags_to_subscribe[j+1];
         }
         found = 1;
         socket->total_tags--;
         break;
       }
     }
     if(found == 0){
       log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag, (void*)socket);
     }
     return NETIO_STATUS_OK;
   }


   /*
    * Unsubscribes from `tag`: sends an unsubscription message if the send
    * socket is connected, otherwise drops the tag from the pending queue.
    * Nothing is done if the subscribe socket is not in state INITIALIZED.
    */
   int
   netio_unsubscribe(struct netio_subscribe_socket* socket, netio_tag_t tag)
   {
     int ret = NETIO_STATUS_OK;
     if(socket->state == INITIALIZED) {
       log_dbg("Subscribe socket initialised, can proceed with usubscription");
       if (socket->socket.state) {
         ret = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
         log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, ret);
       } else {
         ret = remove_tag_to_subscribe(socket, tag);
       }
     } else {
       log_dbg("The connection has been already closed.");
     }
     return ret;
   }


   /*
    * Returns the smallest number of available buffers over all send sockets of
    * the publish socket, capped by the configured attr.num_pages.
    * Returns 0 if `socket` is NULL.
    */
   unsigned
   netio_pubsocket_get_minimum_pages(struct netio_publish_socket* socket)
   {
     if (!socket) {
       return 0;
     }

     size_t pages = socket->attr.num_pages;
     struct netio_socket_list* itr = socket->subscription_table.socket_list;
     while(itr != NULL){
       struct netio_buffered_send_socket* buf_send_socket = (struct netio_buffered_send_socket*)itr->socket;
       uint64_t socket_pages = buf_send_socket->buffers.available_buffers;
       if(socket_pages < pages){
         pages = socket_pages;
       }
       itr = itr->next;
     }
     return pages;
   }