Program Listing for File unbufpubsub.c

Return to documentation for file (unbufpubsub.c)

#include <stdio.h>
#include <string.h>
#include <assert.h>
#include "log.h"
#include "netio/netio.h"
#include "netio/netio_tcp.h"

#include <sys/socket.h>
#include <netdb.h>


/* Capacity of the completion-object stack created for every unbuffered publish
 * socket (passed to netio_completion_stack_init by both socket_init variants). */
#define PUBLISH_SOCKET_MAX_COMPLETIONS (512)

/* Debug/trace logging is compiled in only when DEBUG or DEBUG_UPUB is defined.
 * Otherwise log_dbg/log_trc expand to nothing and their arguments are never
 * evaluated. */
#if defined DEBUG || defined DEBUG_UPUB
/* Basename of the current source file, used as the location in log lines. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif

/* Forward declaration: lookup_tag is defined further down but is already used
 * by table_add_subscription. It returns the number of table entries for `tag`
 * and, on a match, writes the index of the first one to *start. */
static unsigned lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start);

static int
cmp_subscription(const void* a, const void *b)
{
  struct netio_subscription* suba = (struct netio_subscription*)a;
  struct netio_subscription* subb = (struct netio_subscription*)b;

  if(suba->tag == subb->tag) {
    return 0;
  }
  return suba->tag > subb->tag ? 1 : -1;
}


/*
 * Add (tag, socket) to the subscription table, keeping it sorted by tag.
 *
 * Returns 0 when the subscription is present afterwards (newly added or
 * already there). Returns 1 when the table is full and the new subscription
 * was dropped. Every insertion bumps table->ts so that cached lookups
 * (lookup_tag_cached) are refreshed.
 */
static int
table_add_subscription(struct netio_subscription_table* table, netio_tag_t tag, struct netio_send_socket* socket)
{
  // Look for an existing entry first: a duplicate subscription is a no-op
  // success, even when the table happens to be full.
  unsigned start = 0;
  unsigned count = lookup_tag(table, tag, &start);
  for(unsigned i=0; i<count; i++) {
    if(table->subscriptions[start+i].socket == socket) {
      return 0;
    }
  }

  if(table->num_subscriptions == table->size) {
    log_error("Maximum number of subscriptions. New subscription for 0x%lx dropped.", tag);
    return 1;
  }

  struct netio_subscription* entry = &table->subscriptions[table->num_subscriptions];
  entry->tag = tag;
  entry->socket = socket;
  entry->again = 0;
  table->num_subscriptions++;
  table->ts++;
  log_info("New entry in subscription table, tag=%lu in hex 0x%lx, socket=%p, total #subscription=%zu", tag, tag, socket, table->num_subscriptions);
  // The new entry was appended at the end; re-sort so lookup_tag's bsearch stays valid.
  qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
  return 0;
}


/*
 * Remove subscriptions held by `socket` from the publish socket's table.
 *
 * With closed_connection == 0, only the entries whose tag equals `tag` are
 * removed (explicit unsubscribe). With closed_connection != 0, every entry of
 * `socket` is removed and `tag` is ignored (the remote went away).
 *
 * For every removed entry, the semaphore of each completion object in the
 * completion stack whose header tag equals the removed tag (and whose key is
 * not the all-ones value) is incremented once, as if the message had been
 * sent to the removed subscriber.
 *
 * When the socket has no subscriptions left, a libfabric connection is
 * disconnected unless the call comes from a closed connection. For TCP nothing
 * is done (the close call is disabled, see TODO).
 */
static void
table_remove_subscription(struct netio_unbuffered_publish_socket* pubsocket, struct netio_send_socket* socket, netio_tag_t tag, uint8_t closed_connection)
{
  struct netio_subscription_table* table = &pubsocket->subscription_table;
  log_dbg("Total subscriptions: %lu", table->num_subscriptions);

  unsigned i=0;
  // Number of entries `socket` holds. It is decremented per removal and used at
  // the end to decide whether the connection is still needed.
  unsigned remaining_subscriptions_of_socket=0;
  for(unsigned int k = 0; k < table->num_subscriptions; ++k) {
    if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
  }

  while(i<table->num_subscriptions) {
    log_dbg("Removing subscription tag  %lx table socket %p socket %p", table->subscriptions[i].tag, table->subscriptions[i].socket, socket);
    if(table->subscriptions[i].socket == socket && (table->subscriptions[i].tag == tag || closed_connection)) {
      //increment the completion semaphore counter as if messages were sent to the disconnected client
      struct netio_completion_stack* cs = &pubsocket->completion_stack;
      for(size_t j=0; j < cs->num_objects; ++j) {
        if(cs->objects[j].header.tag == table->subscriptions[i].tag && cs->objects[j].key != 0xFFFFFFFFFFFFFFFF){
          netio_semaphore_increment(&cs->objects[j].sem, 1);
        }
      }

      log_dbg("Available completion objects %lu / %lu",  cs->available_objects, cs->num_objects);
      log_dbg("Removing connection in subscription table, tag=%lu, socket=%p index %lu becomes %u",
                table->subscriptions[i].tag,
                table->subscriptions[i].socket,
                table->num_subscriptions-1,
                i);
      // Swap-remove: move the last entry into slot i and shrink the table.
      // i is not advanced, so the moved entry is examined on the next pass.
      table->subscriptions[i].tag = table->subscriptions[table->num_subscriptions-1].tag;
      table->subscriptions[i].socket = table->subscriptions[table->num_subscriptions-1].socket;
      table->subscriptions[i].again = table->subscriptions[table->num_subscriptions-1].again;
      table->num_subscriptions--;
      remaining_subscriptions_of_socket--;
      table->ts++;
    }
    else{
      // Log the skipped entry before advancing: after i++ the index can equal
      // num_subscriptions, one past the last valid entry.
      log_dbg("Skipping tag=%lu because the socket is %p", table->subscriptions[i].tag, table->subscriptions[i].socket);
      i++;
    }
  }

  log_dbg("At the end of table_remove_subscription available completion objects %lu / %lu",  pubsocket->completion_stack.available_objects, pubsocket->completion_stack.num_objects);
  // Swap-remove broke the tag order that lookup_tag's bsearch relies on.
  qsort(table->subscriptions, table->num_subscriptions, sizeof(struct netio_subscription), cmp_subscription);
  log_dbg("Remaining subscriptions: %lu",table->num_subscriptions);

  log_info("Removing entry from unbuffered subscription table, Fid=0x%lx, socket=%p, total #subscription=%lu", tag, socket, table->num_subscriptions);

  if(remaining_subscriptions_of_socket==0){
    log_info("No subscriptions remaining");
    if (socket->tcp_fi_mode == NETIO_MODE_TCP){
      // netio_close_socket(&socket->ctx->evloop,socket,USEND); // TODO prevents re-subscription
    } else if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC && closed_connection==0){
      netio_disconnect(socket);
    }
  }
}

/*
 * Count the subscription-table entries that belong to `socket`.
 * Returns the count (0 when the socket holds no subscriptions).
 */
static int
table_get_subs_by_socket(struct netio_subscription_table* table, struct netio_send_socket* socket)
{
  log_dbg("Subscription table lookup: Total subscriptions: %zu", table->num_subscriptions);
  unsigned remaining_subscriptions_of_socket=0;
  for(size_t k = 0; k < table->num_subscriptions; ++k) {
    if(table->subscriptions[k].socket == socket){remaining_subscriptions_of_socket++;}
  }
  // %u matches the unsigned counter.
  log_dbg("Subscription table lookup: for socket %p: %u", (void*)socket, remaining_subscriptions_of_socket);
  return remaining_subscriptions_of_socket;
}

/*
 * Hand a completion object back to the completion stack of the publish
 * socket it belongs to. A non-zero result from netio_completion_stack_push
 * (reported as a full stack) is logged as an error.
 */
static void
release_completion_object(struct netio_completion_object* completion)
{
  log_trc("releasing completion object");
  const int push_failed = netio_completion_stack_push(&completion->socket->completion_stack, completion) != 0;
  if(push_failed) {
    log_error("Could not push completion object since stack was full. This should not happen.");
  }
}

static void
on_completion_trigger(void* c)
{
  struct netio_completion_object* completion = (struct netio_completion_object*)c;
  log_trc("calling cb_msg_published");
  if(completion->socket->cb_msg_published) {
    completion->socket->cb_msg_published(completion->socket, completion->key);
  }
  release_completion_object(completion);
}

void print_completion_objects(struct netio_unbuffered_publish_socket* socket){
  printf("Number of available completion objects: %zu \n", socket->completion_stack.available_objects);
  printf("===============================================\n");
  printf("CO# \t KEY \t \t TAG \n");
  printf("-----------------------------------------------\n");
  for(unsigned int i=0; i < socket->completion_stack.num_objects; ++i){
    uint32_t tag = (socket->completion_stack.key_array[i] >> 32);
    printf("%u \t 0x%lx \t %x \n", i, socket->completion_stack.key_array[i], tag);
  }
  printf("===============================================\n");
  printf("Subscription table: there are %zu elinks subscribed\n", socket->subscription_table.num_subscriptions);
  printf("FID \t \t \t SOCKET\n");
  printf("-----------------------------------------------\n");
  for (unsigned int i=0; i < socket->subscription_table.num_subscriptions; ++i){
    printf("0x%lx \t %p \n", socket->subscription_table.subscriptions[i].tag, socket->subscription_table.subscriptions[i].socket);
  }
  socket->completion_stack.printed = 1;
}



/*
 * Take a completion object from the socket's completion stack and prepare
 * it for a new publish. Its semaphore is reset to 0 and wired to
 * on_completion_trigger, and the object is bound to `socket`.
 * Returns NULL when netio_completion_stack_pop fails. In DEBUG/DEBUG_UPUB
 * builds the stack state is then dumped (once).
 */
static struct netio_completion_object*
fetch_completion_object(struct netio_unbuffered_publish_socket* socket)
{
  struct netio_completion_object* obj;
  if(!netio_completion_stack_pop(&socket->completion_stack, &obj)) {
    netio_semaphore_init(&obj->sem, 0);
    obj->sem.data = obj;
    obj->sem.cb = on_completion_trigger;
    obj->socket = socket;
    return obj;
  }

#if defined DEBUG || defined DEBUG_UPUB
  if (socket->completion_stack.printed == 0){
    print_completion_objects(socket);
  }
#endif
  return NULL;
}

/*
 * Record one finished send for the completion object whose address is
 * `key` (publishv passes the completion object's address as the send key).
 * `pubsocket` is currently unused.
 */
static void
increment_completion_object(struct netio_unbuffered_publish_socket* pubsocket, uint64_t key)
{
  (void)pubsocket;
  struct netio_completion_object* const obj = (struct netio_completion_object*)key;
  log_trc("incrementing completion object");
  netio_semaphore_increment(&obj->sem, 1);
  log_trc("current: %d     expected: %d", obj->sem.current, obj->sem.threshold);
}

/*
 * Send-socket callback: the connection to a subscriber is up.
 *
 * Moves every subscription queued on socket->deferred_subs while connecting
 * into the subscription table. cb_subscribe is called for each one the table
 * accepted. Afterwards the user's cb_connection_established is invoked.
 */
static void
pubsocket_on_connection_established(struct netio_send_socket* socket)
{
  log_dbg("publish socket established connection to remote, can publish now");
  struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
  // Add deferred subscriptions to the table. Pop through socket->deferred_subs
  // itself rather than a local copy of the head, so the socket ends up with an
  // empty list instead of a pointer to entries pop_subscription has already
  // consumed (a later push_back_subscription on this list would start there).
  while(socket->deferred_subs){
    int ret = table_add_subscription(&pubsocket->subscription_table, socket->deferred_subs->tag, socket);
    if(pubsocket->cb_subscribe && ret == 0){
      pubsocket->cb_subscribe(pubsocket, socket->deferred_subs->tag, NULL, 0);
    }
    pop_subscription(&socket->deferred_subs);
  }
  //user callback
  if(pubsocket->cb_connection_established) {
    pubsocket->cb_connection_established(pubsocket);
  }
}


static void
pubsocket_on_connection_closed(struct netio_send_socket* socket)
{
  log_info("published socket: connection to remote was closed");
  struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket;
  if(pubsocket->cb_connection_closed) {
    pubsocket->cb_connection_closed(pubsocket);
  }
  if (table_get_subs_by_socket(&pubsocket->subscription_table, socket)){
    uint8_t closed_connection = 1;
    table_remove_subscription(pubsocket, socket, 0, closed_connection);
  }
  handle_send_socket_shutdown(socket);
  remove_socket(&(pubsocket->subscription_table.socket_list), socket);
}


/*
 * Connection-closed handler for unbuffered send sockets. Sockets owned by an
 * unbuffered publish socket go through pubsocket_on_connection_closed; any
 * other send socket is simply shut down.
 */
static void
on_unbuffered_send_connection_closed(struct netio_send_socket* socket)
{
    log_info("Send socket: connection to remote was closed");
    if(socket->unbuf_pub_socket == NULL) {
      handle_send_socket_shutdown(socket);
      return;
    }
    pubsocket_on_connection_closed(socket);
}

/*
 * Send-completion callback of a subscriber connection. `key` is the
 * completion-object address passed to the send call; each completed send
 * bumps that object's semaphore by one.
 */
static void
pubsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
{
  increment_completion_object((struct netio_unbuffered_publish_socket*)socket->unbuf_pub_socket, key);
  log_trc("completion on pubsocket connection, key=%lu", key);
}


static struct netio_send_socket*
socket_list_add_or_lookup(struct netio_unbuffered_publish_socket* pubsocket,
                          struct netio_socket_list** list,
                          void* addr, size_t addrlen,
                          int port,
                          struct netio_context* ctx)
{
  if(addrlen == 0) {
    log_error("Invalid zero-byte address");
    return NULL;
  }

  struct netio_socket_list* entry = find_socket_by_address(*list, addr, addrlen, port);

  if ( entry == NULL ) {
    entry = add_socket_with_address(list, USEND, addr, addrlen, port);
    struct netio_send_socket* socket = (struct netio_send_socket*)entry->socket;

    if (pubsocket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
      struct netio_domain* domain = NULL;  //domain is shared among sockets,
      if (entry->next != NULL && entry->next->socket != NULL) {
        domain = ((struct netio_send_socket*)(entry->next->socket))->domain;
      }
      netio_init_send_socket(socket, ctx); //here we memset to zero and we loose domain
      socket->unbuf_pub_socket = pubsocket;
      socket->cb_connection_established = pubsocket_on_connection_established;
      socket->cb_connection_closed = pubsocket_on_connection_closed;
      socket->cb_send_completed = pubsocket_on_send_completed;
      netio_connect_rawaddr_domain(socket, entry->addr, entry->addrlen, domain);
    if ( domain == NULL ){  //check on local domain variable
      netio_register_send_buffer(socket, &pubsocket->buf, 0);
      netio_completion_stack_register_send_socket(&pubsocket->completion_stack, socket);
      }
    }
    else if (pubsocket->tcp_fi_mode==NETIO_MODE_TCP){
      netio_init_send_tcp_socket(socket, ctx);
      socket->unbuf_pub_socket = pubsocket;
      socket->cb_connection_established = pubsocket_on_connection_established;
      socket->cb_connection_closed = pubsocket_on_connection_closed;
      socket->cb_send_completed = pubsocket_on_send_completed;
      log_dbg("Calling netio_connect_tcp for addr=<%s> port=%d", (char*)entry->addr, port);
      netio_connect_tcp(socket, entry->addr, port);
    }
    else {
      log_error("socket_list_add_or_lookup called for unkown connection mode %d", pubsocket->tcp_fi_mode);
      remove_socket(list, socket);
      return NULL;
    }
  }

  struct netio_send_socket* ss = (struct netio_send_socket*)entry->socket;
  return ss;
}


/*
 * Handle a subscription request for `tag` from the subscriber at
 * (addr, addrlen, port), received over `recv_socket`.
 *
 * If the send connection to the subscriber is already CONNECTED, the
 * subscription goes straight into the table, and cb_subscribe is notified
 * when the table accepted it. Otherwise the tag is queued on the socket's
 * deferred list, which pubsocket_on_connection_established drains.
 */
static void
subscribe(struct netio_unbuffered_publish_socket* pubsocket, struct netio_recv_socket* recv_socket, void* addr, size_t addrlen, int port, netio_tag_t tag)
{
  struct netio_send_socket* socket = socket_list_add_or_lookup(pubsocket, &pubsocket->subscription_table.socket_list, addr, addrlen, port, pubsocket->ctx);
  if(socket == NULL){
    // Invalid (remote-supplied) address or unsupported mode; already logged by
    // socket_list_add_or_lookup.
    log_error("Dropping subscription for tag 0x%lx", tag);
    return;
  }
  if(socket->recv_socket == NULL){
    socket->recv_socket = recv_socket;
  }

  if (socket->state == CONNECTED){
    // Notify only accepted subscriptions, as pubsocket_on_connection_established does.
    if(table_add_subscription(&pubsocket->subscription_table, tag, socket) == 0 && pubsocket->cb_subscribe) {
      pubsocket->cb_subscribe(pubsocket, tag, addr, addrlen);
    }
  } else {
    push_back_subscription(&socket->deferred_subs, tag);
  }
}

/*
 * Handle an unsubscription request for `tag` from the subscriber at
 * (addr, addrlen, port). Requests from unknown addresses are ignored.
 * Otherwise the entry is removed from the table, the table timestamp is
 * bumped, and cb_unsubscribe is notified.
 */
static void
unsubscribe(struct netio_unbuffered_publish_socket* pubsocket, void* addr, size_t addrlen, int port, netio_tag_t tag)
{
  struct netio_socket_list* entry = find_socket_by_address(pubsocket->subscription_table.socket_list, addr, addrlen, port);
  if(entry != NULL){
    // closed_connection = 0: only the entry for `tag` is removed.
    table_remove_subscription(pubsocket, (struct netio_send_socket*)entry->socket, tag, 0);
    pubsocket->subscription_table.ts++;

    if(pubsocket->cb_unsubscribe) {
      pubsocket->cb_unsubscribe(pubsocket, tag, addr, addrlen);
    }
  }
}

/*
 * Listen-socket callback for a new subscriber control connection.
 * In TCP mode, a receive buffer holding exactly one netio_subscription_message
 * is allocated, posted, and remembered in socket->usr. Nothing is done for
 * libfabric. On allocation failure the error is logged and no buffer is
 * posted.
 */
static void
lsocket_on_connection_established(struct netio_recv_socket* socket)
{
  log_dbg("Buffered listen socket: on connection established");
  if(socket->tcp_fi_mode != NETIO_MODE_TCP){
    return;
  }

  // calloc: fields of netio_buffer not set below start zeroed, not as garbage.
  struct netio_buffer* buf = calloc(1, sizeof(*buf));
  if(buf == NULL){
    log_error("Cannot allocate receive buffer for subscription messages");
    return;
  }
  buf->size = sizeof(struct netio_subscription_message);
  buf->data = malloc(buf->size);
  if(buf->data == NULL){
    log_error("Cannot allocate receive buffer for subscription messages");
    free(buf);
    return;
  }
  netio_post_recv(socket, buf);
  socket->usr = buf;
}


static void
parse_subscription_message(struct netio_unbuffered_publish_socket* socket, struct netio_subscription_message* msg, struct netio_recv_socket* recv_socket)
{
  if (msg->action){
    log_dbg("subscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
    subscribe(socket, recv_socket, msg->addr, msg->addrlen, msg->port, msg->tag);
  }
  else{
    log_dbg("unsubscription request for tag %lu, addrlen=%lu", msg->tag, msg->addrlen);
    unsubscribe(socket, msg->addr, msg->addrlen, msg->port, msg->tag);
  }
}


/*
 * Receive callback of the publish socket's listen socket. A message must be
 * exactly one netio_subscription_message, read from buf->data (the `data`
 * argument is not used). Messages of any other size are logged and ignored.
 * The buffer is re-posted in both cases.
 */
static void
lsocket_on_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
{
    log_trc("message received");
    struct netio_unbuffered_publish_socket* pubsocket = (struct netio_unbuffered_publish_socket*)socket->lsocket->usr;

    if(len == sizeof(struct netio_subscription_message)) {
      parse_subscription_message(pubsocket, (struct netio_subscription_message*)buf->data, socket);
    } else {
      log_error("Illegal subscription message size %lu", len);
    }

    netio_post_recv(socket, buf);
}


/*
 * Send one (un)subscription control message for `tag` to the publisher.
 *
 * libfabric: the single message/buffer embedded in the socket is reused and
 * sent inline.
 * TCP: the first entry of socket->bufs whose to_send flag is clear is used.
 * Its message is first copied from msgs[0] (local address/port, filled in by
 * subsocket_on_connection_established), then tag and action are set.
 *
 * Returns the status of the underlying send call, 1 for an unsupported
 * connection mode, and 0 for an invalid `action` (nothing is sent).
 */
static int
send_subscription_message(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag, int action)
{
    if( action == NETIO_SUBSCRIBE ){
      log_info("Sending subscription for tag 0x%lx", tag);
    } else if ( action == NETIO_UNSUBSCRIBE ){
      log_info("Sending unsubscription for tag 0x%lx", tag);
    } else {
      log_error("Invalid subscription action %d", action);
      // NOTE(review): 0 is what callers compare against NETIO_STATUS_OK, so an
      // invalid action may be treated as a successful send — confirm intent.
      return 0;
    }
    int ret = 0;
    if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
      socket->msg.tag = tag;
      socket->msg.action = action;
      socket->buf.data = &socket->msg;
      socket->buf.size = sizeof(struct netio_subscription_message);
      ret = netio_send_inline_buffer(&socket->socket, &socket->buf);
    } else if (socket->tcp_fi_mode == NETIO_MODE_TCP){
      // Look for an available buf (to_send clear).
      // NOTE(review): this search has no upper bound. If every entry of
      // socket->bufs is still in flight, it runs past the end of the array.
      // TODO bound it by the capacity of bufs/msgs and return an error.
      size_t id = 0;
      while (socket->bufs[id].to_send) {
        id++;
      }
      log_dbg("tag id %zu", id);
      socket->msgs[id] = socket->msgs[0]; // initialize msg to default
      socket->msgs[id].tag = tag;
      socket->msgs[id].action = action;
      socket->bufs[id].data = &socket->msgs[id];
      socket->bufs[id].size = sizeof(struct netio_subscription_message);
      socket->bufs[id].to_send = 1;
      ret = netio_send_buffer(&socket->socket, &socket->bufs[id]);
    } else {
      log_error("Socket connection type unsupported %d", socket->tcp_fi_mode);
      ret = 1;
    }
    return ret;
}

/*
 * Initialise an unbuffered publish socket that uses libfabric.
 *
 * Copies the descriptor pointed to by `buf` into the socket, sets up the
 * subscription table and a completion stack of PUBLISH_SOCKET_MAX_COMPLETIONS
 * objects, and listens on hostname:port for subscription messages (handled by
 * lsocket_on_connection_established / lsocket_on_msg_received).
 * Finally fills buf_array, the buffer list passed to netio_sendv_imm: slot 0
 * is the completion stack's buffer, every other slot the publish buffer.
 */
void
netio_unbuffered_publish_libfabric_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
{
    log_dbg("netio_unbuffered_publish_libfabric_socket_init");
    socket->ctx = ctx;
    socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
    memcpy(&socket->buf, buf, sizeof(*buf));
    netio_subscription_table_init(&socket->subscription_table);
    netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);

    // Listen socket that receives subscription messages from subscribers.
    netio_init_listen_socket(&socket->lsocket, ctx, NULL);
    socket->lsocket.usr = socket;
    socket->lsocket.recv_sub_msg = 1;
    socket->lsocket.cb_msg_received = lsocket_on_msg_received;
    socket->lsocket.cb_connection_established = lsocket_on_connection_established;
    netio_listen(&socket->lsocket, hostname, port);

    socket->buf_array[0] = &socket->completion_stack.buf;
    for(size_t slot = 1; slot < NETIO_MAX_IOV_LEN; ++slot) {
      socket->buf_array[slot] = &socket->buf;
    }
}

/* TCP version of netio_unbuffered_publish_libfabric_socket_init above. */
/*
 * Initialise an unbuffered publish socket that uses TCP.
 *
 * Copies the descriptor pointed to by `buf` into the socket, sets up the
 * subscription table and a completion stack of PUBLISH_SOCKET_MAX_COMPLETIONS
 * objects, and listens (TCP) on hostname:port for subscription messages.
 * buf_array slot 0 refers to the completion stack's buffer, every other slot
 * to the publish buffer.
 */
void
netio_unbuffered_publish_tcp_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
{
    log_dbg("INIT: netio_unbuffered_publish_tcp_socket_init");
    socket->ctx = ctx;
    socket->tcp_fi_mode = NETIO_MODE_TCP;
    memcpy(&socket->buf, buf, sizeof(*buf));
    netio_subscription_table_init(&socket->subscription_table);
    netio_completion_stack_init(&socket->completion_stack, PUBLISH_SOCKET_MAX_COMPLETIONS);

    // Listen socket that receives subscription messages from subscribers.
    netio_init_listen_tcp_socket(&socket->lsocket, ctx, NULL);
    socket->lsocket.usr = socket;
    socket->lsocket.recv_sub_msg = 1;
    socket->lsocket.cb_msg_received = lsocket_on_msg_received;
    socket->lsocket.cb_connection_established = lsocket_on_connection_established;
    netio_listen_tcp(&socket->lsocket, hostname, port);

    socket->buf_array[0] = &socket->completion_stack.buf;
    for(size_t slot = 1; slot < NETIO_MAX_IOV_LEN; ++slot) {
      socket->buf_array[slot] = &socket->buf;
    }
}

/*
 * Initialise an unbuffered publish socket, choosing the transport from
 * `hostname`: TCP when netio_tcp_mode() reports it, libfabric otherwise.
 * The host name returned by netio_hostname() is passed to the
 * transport-specific initialiser.
 */
void
netio_unbuffered_publish_socket_init(struct netio_unbuffered_publish_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffer* buf)
{
    const int use_tcp = netio_tcp_mode(hostname);
    const char* host = netio_hostname(hostname);
    if (!use_tcp) {
        netio_unbuffered_publish_libfabric_socket_init(socket, ctx, host, port, buf);
        return;
    }
    netio_unbuffered_publish_tcp_socket_init(socket, ctx, host, port, buf);
}

/*
 * Find all subscriptions for `tag` in the tag-sorted subscription table.
 *
 * Returns the number of matching entries (they are contiguous). *start
 * receives the index of the first match, or 0 when there is none, so callers
 * never read an uninitialised index.
 */
static unsigned
lookup_tag(struct netio_subscription_table* table, netio_tag_t tag, unsigned* start)
{
  *start = 0;

  struct netio_subscription key;
  key.tag = tag;
  struct netio_subscription* ptr = bsearch(&key,
                                           table->subscriptions,
                                           table->num_subscriptions,
                                           sizeof(struct netio_subscription),
                                           cmp_subscription);

  log_trc("found ptr=%p", (void*)ptr);
  if(ptr == NULL) {
    return 0;
  }

  // bsearch may land on any of several entries with this tag: rewind to the first.
  unsigned start_idx = (unsigned)(ptr - table->subscriptions);
  while(start_idx > 0 && table->subscriptions[start_idx-1].tag == tag) {
    start_idx--;
  }
  log_trc("start_idx=%u", start_idx);

  unsigned count = 0;
  while(start_idx + count < table->num_subscriptions &&
        table->subscriptions[start_idx + count].tag == tag) {
    count++;
  }
  *start = start_idx;
  return count;
}

/*
 * lookup_tag() with a caller-owned cache. The table is searched again only
 * when its timestamp (table->ts, bumped on every add/remove) differs from
 * the one stored in the cache. The cache does not record which tag it was
 * computed for; the cached result is returned whatever `tag` is passed, so
 * each cache should be dedicated to a single tag.
 */
static unsigned
lookup_tag_cached(struct netio_subscription_table* table, netio_tag_t tag, struct netio_subscription_cache* cache, unsigned* start)
{
  const int stale = (cache->ts != table->ts);
  if(stale) {
    log_trc("subscription updating cache");
    cache->count = lookup_tag(table, tag, &cache->idx_start);
    cache->ts = table->ts;
  }

  /* NOTE(review): the rest of this file gates debug output on DEBUG_UPUB;
   * DEBUG_UBUF here may be a typo — confirm. */
#if defined DEBUG || defined DEBUG_UBUF
  printf("Subscription table:");
  for(unsigned n=0; n<table->num_subscriptions; n++) {
    printf("%u:%lu ", n, table->subscriptions[n].tag);
  }
  printf("\n");
#endif

  *start = cache->idx_start;
  log_trc("cache count=%d for tag=%lu", cache->count, tag);
  return cache->count;
}

/*
 * Publish the message iov[0..count) under `tag` to all subscribers.
 * Same as netio_unbuffered_publishv_usr() with no user header data
 * (usr = 0, usr_size = 0); see there for key/flags/cache and return codes.
 */
int
netio_unbuffered_publishv(struct netio_unbuffered_publish_socket* socket,
                              netio_tag_t tag,
                              struct iovec* iov,
                              size_t count,
                              uint64_t* key,
                              int flags,
                              struct netio_subscription_cache* cache)
{
    const uint64_t no_usr = 0;
    const uint8_t no_usr_size = 0;
    return netio_unbuffered_publishv_usr(socket, tag, iov, count, key, flags, cache, no_usr, no_usr_size);
}

/*
 * Publish the message iov[0..count) under `tag` to every subscriber of `tag`.
 * Each send is prefixed with the completion object's header (the tag followed
 * by `usr_size` bytes of `usr`).
 *
 * key    in: the user key reported through cb_msg_published once the publish
 *        completes. When a completion object is fetched (non-reentry), *key is
 *        overwritten with its address; pass that back with NETIO_REENTRY after
 *        NETIO_STATUS_PARTIAL.
 * flags  NETIO_REENTRY: *key is the completion object of an earlier PARTIAL
 *        call; only subscriptions whose last send returned AGAIN are retried.
 * cache  optional lookup cache for `tag` (NULL searches the table directly).
 *
 * Returns:
 *   NETIO_ERROR_MAX_IOV_EXCEEDED  count > NETIO_MAX_IOV_LEN-1 (one slot holds the header)
 *   NETIO_STATUS_OK_NOSUB         no subscriber; cb_msg_published is called right away
 *   NETIO_STATUS_AGAIN            no free completion object; repeat the same call
 *   NETIO_STATUS_PARTIAL          some sends returned AGAIN; retry with NETIO_REENTRY
 *   NETIO_STATUS_OK               all sends were posted
 *   any other send error, returned as-is
 */
int
netio_unbuffered_publishv_usr(struct netio_unbuffered_publish_socket* socket,
                              netio_tag_t tag,
                              struct iovec* iov,
                              size_t count,
                              uint64_t* key,
                              int flags,
                              struct netio_subscription_cache* cache,
                              uint64_t usr,
                              uint8_t usr_size)
{
  int ret = NETIO_STATUS_OK;
  if(count > NETIO_MAX_IOV_LEN-1) {
    return NETIO_ERROR_MAX_IOV_EXCEEDED;
  }

  log_trc("unbuffered publishv, key=0x%p, tag=%lu", key, tag);

  // Initialised so the trace below never reads an indeterminate value when the
  // tag has no subscribers.
  unsigned start_idx = 0;
  unsigned num_subscriptions;

  if(cache) {
    num_subscriptions = lookup_tag_cached(&socket->subscription_table, tag, cache, &start_idx);
  } else {
    num_subscriptions = lookup_tag(&socket->subscription_table, tag, &start_idx);
  }

  log_trc("unbuffered publishv: num_subscriptions for tag %lu: %d, start_idx: %d", tag, num_subscriptions, start_idx);
  if(num_subscriptions == 0) {
    if(socket->cb_msg_published) {
      socket->cb_msg_published(socket, *key);
    }
    return NETIO_STATUS_OK_NOSUB;
  }

  // Non-zero once a posted send references `completion`. From then on the
  // object is released through its semaphore callback, not here.
  int used_completion = 0;
  struct netio_completion_object* completion = NULL;
  if(flags & NETIO_REENTRY) {
    log_trc("unbuffered publishv: REENTRY - fetching completion from user");
    completion = (struct netio_completion_object*)(*key);
    used_completion = 1;
  } else {
    log_trc("unbuffered publishv: fetching completion object");
    completion = fetch_completion_object(socket);
    if(completion) {
      netio_semphore_set_threshold(&completion->sem, num_subscriptions);
      completion->key = *key;
      completion->header.tag = tag;
      completion->header.usr = usr;
      completion->usr_size = usr_size;
      socket->completion_stack.key_array[socket->completion_stack.available_objects] = *key;
      *key = (uint64_t)completion;
      used_completion = 0;
      log_trc("fetched completion: %lu for tag 0x%lx", completion->key, tag);
    } else {
      log_trc("unbuffered publishv: no completion available -> AGAIN");
      // When no completion is available, we return NETIO_STATUS_AGAIN
      // The user is supposed to call the same call again (no need to keep track of completion object)
      return NETIO_STATUS_AGAIN;
    }
  }


  for(unsigned i=start_idx; i<start_idx+num_subscriptions; i++) {
    struct netio_subscription* subscription = &socket->subscription_table.subscriptions[i];
    log_trc("subscription %d has tag %lu", i, subscription->tag);
    if(subscription->tag == tag) {
      // skip connections that were already successful if we are in reentry mode
      if(flags & NETIO_REENTRY) {
        if(subscription->again == 0) {
          continue;
        }
      }

      struct netio_send_socket* ssocket = (struct netio_send_socket*)(subscription->socket);
      log_trc("unbuffered publishv: send message on subscribed connection, iov-count=%lu, iov-len[0]=%lu", count, iov[0].iov_len);
      int result;
      // Slot 0 carries the header; the user's iovecs follow.
      struct iovec hdr_iov[NETIO_MAX_IOV_LEN];
      hdr_iov[0].iov_base = &completion->header;
      hdr_iov[0].iov_len = sizeof(netio_tag_t) + completion->usr_size;
      uint32_t total_size=hdr_iov[0].iov_len;

      for(unsigned j=0; j<count; j++) {
        hdr_iov[1+j].iov_base = iov[j].iov_base;
        hdr_iov[1+j].iov_len = iov[j].iov_len;
        total_size+=iov[j].iov_len;
      }
      log_trc("iov: [0]: %lu - compl: %lu", *((netio_tag_t*)hdr_iov[0].iov_base), completion->key);
      if (socket->tcp_fi_mode==NETIO_MODE_LIBFABRIC) {
        result = netio_sendv_imm(ssocket, socket->buf_array, hdr_iov, count+1, (uint64_t)completion, 0);
      }
      else {
        result = netio_tcp_sendv_imm(ssocket, total_size,
                                     hdr_iov, count+1, (uint64_t)completion, 0);
      }
      log_trc("unbuffered publishv: result=%d", result);

      if(result == NETIO_STATUS_OK) {
        used_completion = 1;
        subscription->again = 0;
      } else if(result == NETIO_STATUS_AGAIN) {
        subscription->again = 1;
        used_completion = 1;
        ret = NETIO_STATUS_PARTIAL;
      } else {
        // Some error occurred and we return immediately.
        // TODO we should handle the error and unsubscribe the faulty remote
        if(used_completion == 0) {
          // No posted send references the completion object, so no send
          // completion will ever release it: return it to the stack now
          // instead of leaking it.
          netio_completion_stack_push(&socket->completion_stack, completion);
        }
        return result;
      }
    }
  }

  if(used_completion == 0) {
    netio_completion_stack_push(&socket->completion_stack, completion);
  }

  return ret;
}


static void
subscribe_socket_on_connection_established(struct netio_recv_socket* socket)
{
    struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
  if(subscribe_socket->cb_connection_established) {
    subscribe_socket->cb_connection_established(subscribe_socket);
  }
}


static void
subscribe_socket_on_connection_closed(struct netio_recv_socket* socket)
{
    struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->lsocket->usr;
    log_info("subscribe socket: connection closed");
  if(subscribe_socket->cb_connection_closed)  {
    subscribe_socket->cb_connection_closed(subscribe_socket);
  }
}


static void
subscribe_socket_on_msg_received(struct netio_recv_socket* rsocket, struct netio_buffer* buf, void* data, size_t len)
{
    struct netio_unbuffered_subscribe_socket* socket = (struct netio_unbuffered_subscribe_socket*)rsocket->lsocket->usr;

    netio_tag_t tag;
    memcpy(&tag, data, sizeof(netio_tag_t));
    data = (char *)data + sizeof(netio_tag_t);
    len -= sizeof(netio_tag_t);

    log_trc("buffer received of length %lu for tag %lu", len, tag);

    if(socket->cb_msg_received) {
      socket->cb_msg_received(socket, tag, data, len);
    }

    netio_post_recv(rsocket, buf);
}


/*
 * Send-socket callback: the control connection to the publisher is up.
 *
 * Fills in the local address that subscription messages carry.
 * libfabric: the endpoint name of our listen socket (fi_getname on its pep).
 * TCP: msgs[0] is set to the numeric host of this connection's local end plus
 * msg.port.
 * If no tags are queued, the connection is closed again. Otherwise
 * subscriptions are sent for the queued tags, last queued first; sending
 * stops at the first failure and subsocket_on_send_completed resumes it.
 */
static void
subsocket_on_connection_established(struct netio_send_socket* socket)
{
    log_dbg("subsocket connection established");
    int ret;
    struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;

    if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
      subscribe_socket->msg.addrlen = NETIO_MAX_ADDRLEN;
      if((ret = fi_getname(&subscribe_socket->recv_socket.pep->fid,
                          subscribe_socket->msg.addr,
                          &subscribe_socket->msg.addrlen)) !=0) {
        log_fatal("Cannot get information on local interface %d: %s", ret, fi_strerror(-ret));
        exit(1);
      }
      subscribe_socket->buf.data = &subscribe_socket->msg;
      subscribe_socket->buf.size = sizeof(struct netio_subscription_message);
      netio_register_send_buffer(socket, &subscribe_socket->buf, 0);
    }
    else if (socket->tcp_fi_mode == NETIO_MODE_TCP) {
      // Initialise msgs[0], the template copied by send_subscription_message.
      // sockaddr_storage fits both IPv4 and IPv6. A plain struct sockaddr
      // truncates an IPv6 address, and getnameinfo would then read past it.
      struct sockaddr_storage sock_addr;
      socklen_t addrlen = sizeof(sock_addr);
      if(getsockname(socket->eq_ev_ctx.fd, (struct sockaddr*)&sock_addr, &addrlen) != 0) {
        log_error("getsockname failed on subscription connection");
      } else if((ret = getnameinfo((struct sockaddr*)&sock_addr, addrlen,
                                   subscribe_socket->msgs[0].addr, NETIO_MAX_ADDRLEN,
                                   NULL, 0, NI_NUMERICHOST)) != 0) {
        log_error("getnameinfo failed: %s", gai_strerror(ret));
      }
      size_t hostlen = strlen(subscribe_socket->msgs[0].addr);
      subscribe_socket->msgs[0].addr[hostlen] = 0;
      subscribe_socket->msgs[0].addrlen = hostlen+1;
      subscribe_socket->msgs[0].port = subscribe_socket->msg.port;
    }

    if(subscribe_socket->total_tags == 0){
      log_info("Closing send connection again because there is no tag to subscribe to.");
      // Disconnect exactly once; there is nothing to send.
      netio_disconnect(socket);
      return;
    }

    // send tags one by one
    while(subscribe_socket->total_tags > 0){
      size_t idx = subscribe_socket->total_tags - 1;
      netio_tag_t tag = subscribe_socket->tags_to_subscribe[idx];
      log_info("subsocket_on_connection_established, sending subscription for tag 0x%lx pos %lu", tag, idx);
      ret = send_subscription_message(subscribe_socket, tag, NETIO_SUBSCRIBE);
      if (ret == NETIO_STATUS_OK){
        subscribe_socket->total_tags--;
      } else {
        log_warn("subsocket_on_connection_established: send_subscription_message returned %d", ret);
        break;
      }
    }
}


/*
 * Send-socket callback: the control connection to the publisher was closed.
 * Resets the subscribe socket's state to NONE.
 */
static void
subsocket_on_connection_closed(struct netio_send_socket* socket)
{
    ((struct netio_unbuffered_subscribe_socket*)socket->usr)->state = NONE;
}


static void
subsocket_on_error_connection_refused(struct netio_send_socket* socket) {
    log_dbg("subsocket connection refused");
    struct netio_unbuffered_subscribe_socket* subscribe_socket = (struct netio_unbuffered_subscribe_socket*)socket->usr;
    subscribe_socket->state = NONE;
    handle_listen_socket_shutdown(&subscribe_socket->recv_socket);
    if(subscribe_socket->cb_error_connection_refused) {
      subscribe_socket->cb_error_connection_refused(subscribe_socket);
    }
}


/*
 * Send-completion callback of the subscription control connection. Resumes
 * sending subscriptions for tags still queued in tags_to_subscribe (left there
 * when a send in subsocket_on_connection_established failed). Tags are sent
 * last-queued first, and sending stops at the first failure.
 * `key` is not used.
 */
static void
subsocket_on_send_completed(struct netio_send_socket* socket, uint64_t key)
{
    (void)key;
    struct netio_unbuffered_subscribe_socket* sub = (struct netio_unbuffered_subscribe_socket*)socket->usr;

    while(sub->total_tags > 0){
      const netio_tag_t next = sub->tags_to_subscribe[sub->total_tags - 1];
      const int status = send_subscription_message(sub, next, NETIO_SUBSCRIBE);
      if (status != NETIO_STATUS_OK){
        log_warn("subsocket_on_send_completed: send_subscription_message returned %d", status);
        return;
      }
      sub->total_tags--;
    }
}


void netio_unbuffered_subscribe_libfabric_socket_init(struct netio_unbuffered_subscribe_socket* socket,
                                                      struct netio_context* ctx,
                                                      const char* hostname,
                                                      const char* remote_host,
                                                      unsigned remote_port,
                                                      size_t buffer_size,
                                                      size_t count)
{
    memset(socket, 0, sizeof(*socket));
    socket->ctx = ctx;
    socket->state = NONE;
  socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
  struct netio_unbuffered_socket_attr attr = {count, buffer_size};
    netio_init_listen_socket(&socket->recv_socket, ctx, &attr);
    socket->recv_socket.usr = socket;
    socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
    socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
  socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
    netio_listen(&socket->recv_socket, hostname, 0);

    socket->remote_hostname = strdup(remote_host);
    socket->remote_port = remote_port;

    log_dbg("subscribe socket is listening");

    socket->total_tags = 0;
}


/* As above but with tcp instead of libfabric   */
void netio_unbuffered_subscribe_tcp_socket_init(struct netio_unbuffered_subscribe_socket* socket,
                                            struct netio_context* ctx,
                                            const char* hostname,
                                            const char* remote_host,
                                            unsigned remote_port,
                                            size_t buffer_size,
                                            size_t count)
{
  log_dbg("INIT: netio_unbuffered_subscribe_tcp_socket_init");
    memset(socket, 0, sizeof(*socket));
    socket->ctx = ctx;
    socket->state = NONE;
  socket->tcp_fi_mode = NETIO_MODE_TCP;
  struct netio_unbuffered_socket_attr attr = {count, buffer_size};
    netio_init_listen_tcp_socket(&socket->recv_socket, ctx, &attr);
    socket->recv_socket.usr = socket;
    socket->recv_socket.cb_msg_received = subscribe_socket_on_msg_received;
    socket->recv_socket.cb_connection_established = subscribe_socket_on_connection_established;
  socket->recv_socket.cb_connection_closed = subscribe_socket_on_connection_closed;
    netio_listen_tcp(&socket->recv_socket, hostname, 0);

  log_dbg("socket=%p   recv_socket.port=%d\n",socket,socket->recv_socket.port);
  socket->msg.port=socket->recv_socket.port;

    socket->remote_hostname = strdup(remote_host);
    socket->remote_port = remote_port;

    socket->total_tags = 0;
}

/*
 * Initialise an unbuffered subscribe socket, choosing the transport from the
 * local `hostname`.
 *
 * If netio_tcp_mode(hostname) is true, the TCP initialiser is used; otherwise
 * the libfabric one is. Both host strings are passed through netio_hostname()
 * before use.
 */
void netio_unbuffered_subscribe_socket_init(struct netio_unbuffered_subscribe_socket* socket,
                                            struct netio_context* ctx,
                                            const char* hostname,
                                            const char* remote_host,
                                            unsigned remote_port,
                                            size_t buffer_size,
                                            size_t count)
{
  if(!netio_tcp_mode(hostname)) {
    netio_unbuffered_subscribe_libfabric_socket_init(socket, ctx,
                                                     netio_hostname(hostname),
                                                     netio_hostname(remote_host),
                                                     remote_port, buffer_size, count);
    return;
  }

  netio_unbuffered_subscribe_tcp_socket_init(socket, ctx,
                                             netio_hostname(hostname),
                                             netio_hostname(remote_host),
                                             remote_port, buffer_size, count);
}

/*
 * Initialise a send socket for unbuffered publishing.
 *
 * This is a plain netio_init_send_socket() plus the internal
 * connection-closed hook used by the unbuffered pub/sub layer.
 */
void
netio_unbuffered_send_socket_init(struct netio_send_socket* socket, struct netio_context* ctx)
{
  netio_init_send_socket(socket, ctx);

  /* Let the unbuffered layer observe connection teardown. */
  socket->cb_internal_connection_closed = on_unbuffered_send_connection_closed;
}


/*
 * Subscribe to `tag` on the remote publisher.
 *
 * On first use (state == NONE), a send socket towards
 * remote_hostname:remote_port is created with the transport chosen at init
 * time (libfabric or TCP), its callbacks are wired back to `socket`, and a
 * connection is started; state then becomes INITIALIZED.
 *
 * If the send socket is already connected, the subscription message is sent
 * immediately and its status is returned. Otherwise the tag is queued in
 * tags_to_subscribe. The queue is drained by the connection-established and
 * send-completed callbacks of the send socket. A tag that is already queued
 * is not queued a second time.
 *
 * Returns 0 when the tag was queued or already queued, 1 for an unsupported
 * transport mode, or otherwise the status of send_subscription_message().
 */
int
netio_unbuffered_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
{
  if(socket->state == NONE) {
    log_dbg("Creating and connecting a new socket");
    if(socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC) {
      netio_init_send_socket(&socket->socket, socket->ctx);
      socket->socket.usr = socket;
      socket->socket.cb_connection_established = subsocket_on_connection_established;
      socket->socket.cb_connection_closed = subsocket_on_connection_closed;
      socket->socket.cb_send_completed = subsocket_on_send_completed;
      socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
      socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
      netio_connect(&socket->socket, socket->remote_hostname, socket->remote_port);
    } else if(socket->tcp_fi_mode == NETIO_MODE_TCP) {
      netio_init_send_tcp_socket(&socket->socket, socket->ctx);
      socket->socket.usr = socket;
      socket->socket.cb_connection_established = subsocket_on_connection_established;
      socket->socket.cb_connection_closed = subsocket_on_connection_closed;
      socket->socket.cb_error_connection_refused = subsocket_on_error_connection_refused;
      socket->socket.cb_send_completed = subsocket_on_send_completed;
      socket->socket.cb_internal_connection_closed = on_unbuffered_send_connection_closed;
      netio_connect_tcp(&socket->socket, socket->remote_hostname, socket->remote_port);
    } else {
      log_error("Unsupported connection type %d", socket->tcp_fi_mode);
      return 1;
    }
    socket->state = INITIALIZED;
  }

  /* Already waiting to be sent: nothing more to do. */
  for(size_t i = 0; i < socket->total_tags; i++) {
    if(socket->tags_to_subscribe[i] == tag) {
      return 0;
    }
  }

  /* If the send socket is connected, send now; otherwise the
   * connection-established callback will send the queued tags. */
  if(socket->socket.state) {
    log_info("Sending subscription message for tag 0x%lx", tag);
    return send_subscription_message(socket, tag, NETIO_SUBSCRIBE);
  }

  /* NOTE(review): the capacity of tags_to_subscribe is not visible here and
   * is not checked before appending; confirm that callers cannot overflow it. */
  log_info("Queueing subscription message for tag 0x%lx", tag);
  socket->tags_to_subscribe[socket->total_tags] = tag;
  socket->total_tags++;
  return 0;
}


/*
 * Remove `tag` from the queue of subscriptions still waiting to be sent
 * (socket->tags_to_subscribe).
 *
 * Only the first matching entry is removed. Later entries are shifted down
 * so the queue stays contiguous, and total_tags is decremented. A tag that is
 * not in the queue is logged as a warning but is not treated as an error.
 *
 * Returns NETIO_STATUS_OK in all cases.
 */
static int
remove_tag_to_subscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
{
  for(size_t i = 0; i < socket->total_tags; ++i) {
    if(socket->tags_to_subscribe[i] != tag) {
      continue;
    }
    /* Both format arguments must be supplied; %p takes a void*. */
    log_info("Removing tag 0x%lx from tags_to_subscribe of socket %p", tag, (void*)socket);
    /* Close the gap left by entry i. Source and destination overlap, so use
     * memmove rather than memcpy. */
    memmove(&socket->tags_to_subscribe[i],
            &socket->tags_to_subscribe[i + 1],
            (socket->total_tags - i - 1) * sizeof(socket->tags_to_subscribe[0]));
    socket->total_tags--;
    return NETIO_STATUS_OK;
  }

  log_warn("tag 0x%lx expected in tags_to_subscribe of socket %p not found", tag, (void*)socket);
  return NETIO_STATUS_OK;
}


/*
 * Unsubscribe from `tag` on the remote publisher.
 *
 * If the subscribe socket was never set up (state != INITIALIZED), nothing is
 * done and NETIO_STATUS_OK is returned. If the send socket is connected, an
 * unsubscribe message is sent and its status is returned. Otherwise the tag is
 * simply dropped from the queue of pending subscriptions.
 */
int
netio_unbuffered_unsubscribe(struct netio_unbuffered_subscribe_socket* socket, netio_tag_t tag)
{
  if(socket->state != INITIALIZED) {
    log_dbg("The connection has been already closed.");
    return NETIO_STATUS_OK;
  }

  log_dbg("Subscribe socket initialised, can proceed with unsubscription");

  /* Not connected yet: the subscription only exists in the local queue. */
  if(!socket->socket.state) {
    return remove_tag_to_subscribe(socket, tag);
  }

  int status = send_subscription_message(socket, tag, NETIO_UNSUBSCRIBE);
  log_dbg("Unsubscription message for tag 0x%lx sent, return code %d", tag, status);
  return status;
}


/*
 * Return how many completion objects are currently available on a publish
 * socket, as recorded in completion_stack.available_objects. A NULL socket
 * yields 0.
 */
unsigned
netio_pubsocket_get_available_co(struct netio_unbuffered_publish_socket* socket)
{
  return (socket == NULL) ? 0u : (unsigned)socket->completion_stack.available_objects;
}