Program Listing for File netio.c

Return to documentation for file (netio.c)

#include <unistd.h>
#include <stdio.h>
#include <sys/uio.h>
#include <string.h>
#include "netio/netio.h"
#include "netio/netio_tcp.h"
#include "connection_event.h"
#include "completion_event.h"
#include <stdlib.h>
#include <sys/types.h>
#include <errno.h>
#include <netdb.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "log.h"

/*
 * Debug/trace logging helpers.
 * When DEBUG or DEBUG_IO is defined, log_dbg()/log_trc() forward to log_log()
 * with the LOG_DEBUG/LOG_TRACE level, the basename of the current source file
 * and the current line. Otherwise both macros expand to nothing, so their
 * arguments are never evaluated.
 */
#if defined DEBUG || defined DEBUG_IO
// Basename of __FILE__: the text after the last '/', or __FILE__ itself when it contains no '/'.
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif

// Fatal-error helper: logs the message via log_fatal(), flushes stdout and terminates the process with exit(-2).
# define ERROR_LOG( ... ) do { log_fatal(__VA_ARGS__); fflush(stdout); exit(-2);} while(0)

/*
 * Records a failure on socket `s` and fires the connection-refused event.
 *
 *   s   - pointer to the socket; evaluated more than once, so pass a plain
 *         pointer expression without side effects.
 *   msg - C string describing the failure; a strdup()'ed copy is stored in
 *         s->fi_message (this macro does not free a previously stored message).
 *   c   - error code; it is negated before being stored in s->fi_errno.
 *
 * The macro expands to a single statement WITHOUT a trailing semicolon, so it
 * must be used as `ON_ERROR_CONNECTION_REFUSED(...);` and is safe inside
 * unbraced if/else bodies.
 */
#define ON_ERROR_CONNECTION_REFUSED(s, msg, c) \
do { \
  (s)->fi_errno = -(c); \
  (s)->fi_message = strdup(msg); \
  netio_error_connection_refused_fire(s); \
} while(0)

/*
 * Records a failure on socket `s` and fires the bind-refused event.
 *
 *   s   - pointer to the socket; evaluated more than once, so pass a plain
 *         pointer expression without side effects.
 *   msg - C string describing the failure; a strdup()'ed copy is stored in
 *         s->fi_message (this macro does not free a previously stored message).
 *   c   - error code; it is negated before being stored in s->fi_errno.
 *
 * The macro expands to a single statement WITHOUT a trailing semicolon, so it
 * must be used as `ON_ERROR_BIND_REFUSED(...);` and is safe inside unbraced
 * if/else bodies.
 */
#define ON_ERROR_BIND_REFUSED(s, msg, c) \
do { \
  (s)->fi_errno = -(c); \
  (s)->fi_message = strdup(msg); \
  netio_error_bind_refused_fire(s); \
} while(0)


//Globals


// STATIC FUNCTIONS ////////////////////////////////////////////////////////////

static int
_socket_init_info(struct netio_send_socket* socket, const char* hostname, unsigned port, void* addr, size_t addrlen)
{
  log_dbg("Going trough _socket_init_info");
  int ret=0;
  struct fi_info* hints;

  hints = fi_allocinfo();
  hints->addr_format = FI_FORMAT_UNSPEC;
  hints->ep_attr->type  = FI_EP_MSG;
  hints->caps = FI_MSG;
  hints->mode   = FI_LOCAL_MR;
  // As of libfabric 1.10, the tcp provider only support FI_PROGRESS_MANUAL
  // So the following will not allow the tcp provider to be used
  hints->domain_attr->data_progress = FI_PROGRESS_AUTO;
  hints->domain_attr->resource_mgmt = FI_RM_ENABLED;

  char port_addr[32];
  snprintf(port_addr, 32, "%u", port);
  log_dbg("connecting to endpoint %s:%u", hostname, port);

  uint64_t flags = 0;
  if(hostname == NULL) {
    hostname = "127.0.0.1";
    flags = FI_SOURCE;
  }

  if(addr) {
    // TODO we have to convert the sockaddr_in into hostname port - why can't we directly supply the addr+addrlen in hints->dest_addr?
    struct sockaddr_in* sockaddr = (struct sockaddr_in*)addr;
    char* str_addr = inet_ntoa(sockaddr->sin_addr);
    log_dbg("sockaddr: %s:%d", str_addr, ntohs(sockaddr->sin_port));

    hostname  = str_addr;
    snprintf(port_addr, 32, "%d", ntohs(sockaddr->sin_port));
    flags = 0;
  }

  if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, flags, hints, &socket->fi)))
  {
    fi_freeinfo(hints);
    log_error("Failed to initialise socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_getinfo failed", ret);
    return -1;
  }

  log_dbg("addr format: %x", socket->fi->addr_format);
  log_dbg("fi_freeinfo");
  fi_freeinfo(hints);
  return 0;
}


static int
_socket_init_domain(struct netio_send_socket* socket)
{
  int ret=0;
  struct netio_domain *domain = malloc(sizeof(struct netio_domain));
  domain->reg_mr = 0;
  domain->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
  domain->nb_sockets = 1;
  socket->domain = domain;

  if((ret = fi_fabric(socket->fi->fabric_attr, &domain->fabric, NULL)))
  {
    log_error("Failed to initialise fabric, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_fabric failed", ret);
    return -1;
  }

  if((ret = fi_domain(domain->fabric, socket->fi, &domain->domain, NULL)))
  {
    log_error("Failed to initialise domain, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_domain failed", ret);
    return -1;
  }
  return 0;
}

/*
 * Opens the libfabric resources of a send socket (event queue, endpoint,
 * transmit and receive completion queues), starts the connection to
 * socket->fi->dest_addr and registers the event queue's file descriptor with
 * the event loop so connection-management events are delivered to
 * on_send_socket_libfabric_cm_event.
 *
 * Requires socket->fi and socket->domain to be set up beforehand.
 *
 * Returns 0 when the connection request was issued, -1 on the first failing
 * step (after firing the connection-refused event). Resources opened before
 * the failing step are not closed here.
 */
static int
_socket_connect(struct netio_send_socket* socket)
{
  int ret=0;
  // Positional initialiser; first field 10000, third field FI_WAIT_FD
  // (presumably size and wait_obj of struct fi_eq_attr - TODO confirm against the libfabric header).
  struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};

  //Resources initialisation
  socket->eqfd = -1;
  socket->cqfd = -1;
  socket->ep = NULL;
  socket->eq = NULL;
  socket->cq = NULL;
  socket->rcq = NULL;

  if((ret = fi_eq_open(socket->domain->fabric, &eq_attr, &socket->eq, NULL)))
  {
    log_error("Failed to open Event Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_eq_open failed", ret);
    return -1;
  }

  if((ret = fi_endpoint(socket->domain->domain, socket->fi, &socket->ep, NULL)))
  {
    log_error("Failed to open Endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_endpoint failed", ret);
    return -1;
  }

  if((ret = fi_ep_bind((socket->ep), &socket->eq->fid, 0)))
  {
    log_error("Failed to bind endpoint, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
    return -1;
  }

  struct fi_cq_attr cq_attr;
  cq_attr.size = NETIO_MAX_CQ_ENTRIES;      /* # entries for CQ */
  cq_attr.flags = 0;     /* operation flags */
  cq_attr.format = FI_CQ_FORMAT_DATA; //FI_CQ_FORMAT_CONTEXT;    /* completion format */
  cq_attr.wait_obj= FI_WAIT_FD;  /* requested wait object */
  cq_attr.signaling_vector = 0; /* interrupt affinity */
  cq_attr.wait_cond = FI_CQ_COND_NONE; /* wait condition format */ // The threshold indicates the number of entries that are to be queued before at the CQ before the wait is satisfied.
  cq_attr.wait_set = NULL;  /* optional wait set */

  //FI_TRANSMIT CQ
  if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->cq, NULL)) != 0)
  {
    log_error("Failed to open Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
    return -1;
  }

  if((ret = fi_ep_bind((socket->ep), &socket->cq->fid, FI_TRANSMIT)) != 0)
  {
    log_error("Failed to bind Completion Queue to endpoint of send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
    return -1;
  }

  //FI_RECV CQ - also necessary
  // Same attributes as the transmit CQ, but without a wait object and with unspecified format.
  cq_attr.format = FI_CQ_FORMAT_UNSPEC;
  cq_attr.wait_obj= FI_WAIT_NONE;
  if((ret = fi_cq_open(socket->domain->domain, &cq_attr, &socket->rcq, NULL)) != 0)
  {
    log_error("Failed to open receive Completion Queue for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_cq_open failed", ret);
    return -1;
  }

  if((ret = fi_ep_bind((socket->ep), &socket->rcq->fid, FI_RECV)) != 0)
  {
    log_error("Failed to bind receive Completion Queue to endpoint of send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_ep_bind failed", ret);
    return -1;
  }


  if((ret = fi_enable(socket->ep)) != 0)
  {
    log_error("Failed to enable endpoint for send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_enable failed", ret);
    return -1;
  }

  /* Connect to server */
  if((ret = fi_connect(socket->ep, socket->fi->dest_addr, NULL, 0)) != 0)
  {
    log_warn("Connection to remote failed, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_connect failed", ret);
    return -1;
  }

  // Retrieve the EQ wait object (a file descriptor, as requested with FI_WAIT_FD above).
  if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)) != 0)
  {
    log_error("Cannot retrieve the Event Queue wait object of send socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_control failed", ret);
    return -1;
  }

  socket->eq_ev_ctx.fd = socket->eqfd;
  socket->eq_ev_ctx.data = socket;
  socket->eq_ev_ctx.cb = on_send_socket_libfabric_cm_event;

  log_dbg("Adding SEND EQ polled fid %d %p \n", socket->eqfd, (void*)&socket->eq->fid);
  add_polled_fid(&socket->ctx->evloop.pfids, socket->domain->fabric, &socket->eq->fid, socket->eqfd, socket, on_send_socket_libfabric_cm_event);
  add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, USEND, socket);
  netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
  log_dbg("send_socket: EQ fd %d waiting for connection", socket->eqfd);

  return 0;
}


// API FUNCTIONS ///////////////////////////////////////////////////////////////

/* Sets the verbosity of the logging backend by forwarding `level` to log_set_level(). */
void
netio_set_debug_level(int level)
{
  const int requested_level = level;

  log_set_level(requested_level);
}

/*
 * Tells whether `hostname` selects the TCP backend, i.e. starts with "tcp:".
 * Returns 1 if it does, 0 otherwise. A NULL hostname is treated as "not TCP".
 */
int netio_tcp_mode(const char* hostname) {
  static const char tcp_prefix[] = "tcp:";

  if (hostname == NULL) {
    return 0;
  }
  return (strncmp(hostname, tcp_prefix, sizeof tcp_prefix - 1) == 0);
}

/*
 * Names the backend selected by `hostname`: "tcp" when netio_tcp_mode()
 * reports TCP mode, "libfabric" otherwise. The returned string is a literal
 * and must not be freed.
 */
const char* netio_protocol(const char* hostname) {
  if (netio_tcp_mode(hostname)) {
    return "tcp";
  }
  return "libfabric";
}

/*
 * Strips a leading "<prefix>:" from `hostname`.
 * Returns a pointer into `hostname` just past the first ':' character, or
 * `hostname` itself when it contains no ':'. A NULL input yields NULL.
 * The result aliases the input; no memory is allocated.
 */
const char* netio_hostname(const char* hostname) {
  if (hostname == NULL) {
    return NULL;
  }
  const char* split = strchr(hostname, ':');
  return split ? split + 1 : hostname;
}

/*
 * Prepares a netio context for use: selects the default log level, clears
 * the whole context structure and initialises its embedded event loop.
 */
void
netio_init(struct netio_context* ctx)
{
  log_set_level(DEFAULT_DEBUG_LEVEL);

  /* Start from an all-zero context before the event loop is set up. */
  memset(ctx, 0, sizeof *ctx);

  netio_eventloop_init(&ctx->evloop);
}


void
netio_init_send_socket(struct netio_send_socket* socket, struct netio_context* ctx)
{
  memset(socket, 0, sizeof(*socket));
  socket->ctx = ctx;
  socket->epollfd = socket->ctx->evloop.epollfd;
  socket->state = UNCONNECTED;
  socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
  socket->cq_size = NETIO_MAX_CQ_EVENTS;
  socket->unbuf_pub_socket = NULL;
  socket->cb_internal_connection_closed = NULL;
  socket->deferred_subs = NULL;
  socket->recv_socket = NULL;
}


/*
 * Resets a listen socket to its initial libfabric-mode state, attaches it to
 * `ctx` and stores the receive-buffer attributes.
 *
 * attr may be NULL, in which case buffer_size and num_buffers are 0.
 * Otherwise attr->num_buffers is clamped to NETIO_DOMAIN_MAX_MR - note that
 * the clamp is written back into the caller's `attr` - and the attributes are
 * copied into the socket.
 */
void
netio_init_listen_socket(struct netio_listen_socket* socket, struct netio_context* ctx, struct netio_unbuffered_socket_attr* attr)
{
  memset(socket, 0, sizeof *socket);
  socket->ctx = ctx;
  socket->recv_sockets = NULL;
  socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;

  if (attr == NULL) {
    socket->attr.buffer_size = 0;
    socket->attr.num_buffers = 0;
    return;
  }

  if (attr->num_buffers > NETIO_DOMAIN_MAX_MR) {
    // NOTE(review): %lu assumes both arguments are unsigned long - confirm against their declarations.
    log_error("Number of recv buffers requested %lu exceeds max value. Using %lu.", attr->num_buffers, NETIO_DOMAIN_MAX_MR);
    attr->num_buffers = NETIO_DOMAIN_MAX_MR;
  }
  socket->attr = *attr;
}


/*
 * Resets a receive socket to its initial libfabric-mode state, links it to
 * its listen socket (and that socket's context) and allocates the table of
 * NETIO_DOMAIN_MAX_MR memory-region pointers stored in socket->mr.
 *
 * Allocation failure is fatal: it is reported through ERROR_LOG, which
 * terminates the process, matching the handling of allocation failures
 * elsewhere in this file.
 */
void
netio_init_recv_socket(struct netio_recv_socket* socket, struct netio_listen_socket* lsocket)
{
  memset(socket, 0, sizeof(*socket));
  socket->ctx = lsocket->ctx;
  socket->lsocket = lsocket;
  socket->reg_mr = 0;
  socket->cq_size = NETIO_MAX_CQ_EVENTS;
  socket->sub_msg_buffers = NULL;
  socket->mr = malloc(NETIO_DOMAIN_MAX_MR*sizeof(struct fid*));
  if (socket->mr == NULL)
  {
    ERROR_LOG("cannot allocate memory for memory region table");
  }
  socket->tcp_fi_mode = NETIO_MODE_LIBFABRIC;
}

/*
 * Initialises a send socket and starts connecting it in one call.
 * The backend is chosen from the hostname prefix: TCP when netio_tcp_mode()
 * reports it, libfabric otherwise. The prefix is stripped with
 * netio_hostname() before connecting.
 */
void
netio_send_socket_init_and_connect(struct netio_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* node = netio_hostname(hostname);

  if (!use_tcp) {
    netio_init_send_socket(socket, ctx);
    netio_connect_domain(socket, node, port, NULL);
    return;
  }

  netio_init_send_tcp_socket(socket, ctx);
  netio_connect_tcp(socket, node, port);
}

/*
 * Initialises a listen socket and starts listening in one call.
 * The backend is chosen from the hostname prefix: TCP when netio_tcp_mode()
 * reports it, libfabric otherwise. The prefix is stripped with
 * netio_hostname() before listening.
 */
void
netio_listen_socket_init_and_listen(struct netio_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_unbuffered_socket_attr* attr) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* node = netio_hostname(hostname);

  if (!use_tcp) {
    netio_init_listen_socket(socket, ctx, attr);
    netio_listen(socket, node, port);
    return;
  }

  netio_init_listen_tcp_socket(socket, ctx, attr);
  netio_listen_tcp(socket, node, port);
}

/*
 * Connects an already initialised send socket to hostname:port, using the
 * backend recorded in socket->tcp_fi_mode. Any "<prefix>:" in `hostname` is
 * stripped with netio_hostname() first.
 */
void
netio_connect(struct netio_send_socket* socket, const char* hostname, unsigned port)
{
  const int tcp_socket = (socket->tcp_fi_mode == NETIO_MODE_TCP);
  const char* node = netio_hostname(hostname);

  if (tcp_socket) {
    netio_connect_tcp(socket, node, port);
    return;
  }
  netio_connect_domain(socket, node, port, NULL);
}

/*
 * Connects a libfabric send socket to hostname:port.
 * When `domain` is NULL a private domain is created for the socket;
 * otherwise the given domain is shared and its socket count is incremented.
 * The function returns silently if resolving the endpoint or creating the
 * domain fails (those helpers fire the error event themselves).
 */
void
netio_connect_domain(struct netio_send_socket* socket, const char* hostname, unsigned port, struct netio_domain* domain)
{
  log_dbg("_socket_init_info");
  if (_socket_init_info(socket, hostname, port, NULL, 0) != 0) {
    return;
  }

  if (domain != NULL) {
    /* Share the caller's domain. */
    domain->nb_sockets += 1;
    socket->domain = domain;
  } else {
    log_dbg("_socket_init_domain in netio_connect_domain");
    if (_socket_init_domain(socket) != 0) {
      return;
    }
  }

  _socket_connect(socket);
}


/*
 * Connects a libfabric send socket to a raw socket address, letting
 * netio_connect_rawaddr_domain() create a private domain (NULL domain).
 */
void netio_connect_rawaddr(struct netio_send_socket* socket, void* addr, size_t addrlen)
{
  struct netio_domain* const private_domain = NULL;

  netio_connect_rawaddr_domain(socket, addr, addrlen, private_domain);
}


/*
 * Connects a libfabric send socket to the raw socket address `addr`.
 * When `domain` is NULL a private domain is created for the socket;
 * otherwise the given domain is shared and its socket count is incremented.
 * The function returns silently if resolving the endpoint or creating the
 * domain fails (those helpers fire the error event themselves).
 */
void netio_connect_rawaddr_domain(struct netio_send_socket* socket, void* addr, size_t addrlen, struct netio_domain* domain)
{
  log_dbg("_socket_init_info");
  if (_socket_init_info(socket, NULL, 0, addr, addrlen) != 0) {
    return;
  }

  if (domain != NULL) {
    /* Share the caller's domain. */
    log_dbg("socket %p keeping domain %p with %d sockets", socket, domain, domain->nb_sockets);
    domain->nb_sockets += 1;
    socket->domain = domain;
  } else {
    log_dbg("_socket_init_domain in netio_connect_rawaddr_domain");
    if (_socket_init_domain(socket) != 0) {
      return;
    }
  }

  _socket_connect(socket);
}


void
netio_disconnect(struct netio_send_socket* socket)
{
  if(socket->tcp_fi_mode == NETIO_MODE_TCP){
    shutdown(socket->cq_ev_ctx.fd, SHUT_RDWR);
  } else {
    if(!socket->ep) {
      log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
      return;
    }
    int ret=0;
    if((ret = fi_shutdown(socket->ep, 0))){
      log_error("Failed to shutdown send_socket, error %d: %s", ret, fi_strerror(-ret));
      ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
      return;
    }
  }
}



void
netio_connection_shutdown(void* ptr)
{
  log_dbg("Handle_connection_shutdown.");
  struct signal_data* sd = (struct signal_data*)ptr;
  struct netio_send_socket* socket = (struct netio_send_socket*)sd->ptr;
  int ret=0;
  if(!socket->ep){
    log_dbg("Send socket %p EP not initialised, not calling fi_shutdown.", socket);
    return;
  }
  if((ret = fi_shutdown(socket->ep, 0)))
  {
    ON_ERROR_CONNECTION_REFUSED(socket, "fi_shutdown failed", ret);
    return;
  }
  //clean up signal
  netio_signal_close(sd->evloop, sd->signal);
  free(sd->signal);
  free(sd);
}


void
netio_listen(struct netio_listen_socket* socket, const char* hostname, unsigned port)
{
  int ret=0;
  struct fi_info* hints;
  struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};

  hints = fi_allocinfo();
  hints->addr_format = FI_FORMAT_UNSPEC;
  hints->ep_attr->type  = FI_EP_MSG;
  hints->caps = FI_MSG;
  hints->mode = FI_LOCAL_MR;
  char port_addr[32];
  snprintf(port_addr, 32, "%u", port);

  //Resource initialisation
  socket->eqfd = -1;
  socket->pep = NULL;
  socket->eq = NULL;
  socket->fi = NULL;

  if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
                       &socket->fi)))
  {
    log_error("Failed to get info on local interface, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_getinfo failed", ret);
    return;
  }
  log_dbg("addr format: %x", socket->fi->addr_format);

  if((ret = fi_fabric(socket->fi->fabric_attr, &socket->fabric, NULL)))
  {
    log_error("Failed to open fabric for listen socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_fabric failed", ret);
    return;
  }
  if((ret = fi_eq_open(socket->fabric, &eq_attr, &socket->eq, NULL)))
  {
    log_error("Failed to open Event Queue for listen socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_eq_open failed", ret);
    return;
  }

  if((ret = fi_passive_ep(socket->fabric, socket->fi, &socket->pep, NULL)))
  {
    log_error("Failed to open passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_passive_ep failed", ret);
    return;
  }

  if((ret = fi_pep_bind(socket->pep, &socket->eq->fid, 0)))
  {
    log_error("Failed to bind passive endpoint for listen socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_pep_bind failed", ret);
    return;
  }

  if((ret = fi_listen(socket->pep)))
  {
    log_error("Failed to enable listen socket, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_listen failed", ret);
    return;
  }

  if((ret = fi_control(&socket->eq->fid, FI_GETWAIT, &socket->eqfd)))
  {
    log_error("Failed to retrive listen socket Event Queue wait object, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_control failed", ret);
    return;
  }

  socket->eq_ev_ctx.fd = socket->eqfd;
  socket->eq_ev_ctx.data = socket;
  socket->eq_ev_ctx.cb = on_listen_socket_libfabric_cm_event;

  //TODO: The listen EQ should be added too, but I don't know how to remove it to avoid a trywait SEGFAULT.
  //printf("Adding LISTEN EQ polled fid %d %p \n", socket->eqfd, &socket->eq->fid);
  //add_polled_fid(&socket->ctx->evloop.pfids, socket->fabric, &socket->eq->fid, socket->eqfd, socket, on_listen_socket_cm_event);
  add_open_fd(&socket->ctx->evloop.openfds, socket->eqfd, NETIO_EQ, ULISTEN, socket);

  netio_register_read_fd(&socket->ctx->evloop, &socket->eq_ev_ctx);
  log_dbg("netio_listen_socket: registering EQ fd %d", socket->eqfd);
  fi_freeinfo(hints);
}


/*
 * Retrieves the local address the listen socket's passive endpoint is bound
 * to.
 *
 *   socket - listen socket; its passive endpoint (pep) must have been opened.
 *   sa     - output buffer receiving the address.
 *
 * Returns the length of the address written to `sa`, or 0 on failure. A
 * socket without a passive endpoint is reported with an error log only; a
 * failing fi_getname() additionally fires the bind-refused event.
 */
size_t netio_listen_socket_endpoint(struct netio_listen_socket* socket, struct sockaddr_storage *sa)
{
  // pep is NULL until a passive endpoint has been opened; fi_getname() would dereference it.
  if(socket->pep == NULL)
  {
    log_error("Failed to retrieve the local endpoint address: listen socket has no passive endpoint");
    return 0;
  }

  //memset(sa, 0, sizeof(*sa));
  size_t addrlen = sizeof(struct sockaddr_storage);

  int ret=0;
  if((ret = fi_getname(&socket->pep->fid, sa, &addrlen)))
  {
    log_error("Failed to retrieve the local endpoint address, error %d: %s", ret, fi_strerror(-ret));
    ON_ERROR_BIND_REFUSED(socket, "fi_getname failed", ret);
    return 0;
  }

  return addrlen;
}


void
netio_post_recv(struct netio_recv_socket* socket, struct netio_buffer* buf)
{
  struct iovec iov;
  void* desc;

  struct netio_tcp_recv_item *mrdn;

  if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
  {

    iov.iov_base = buf->data;
    iov.iov_len = buf->size;
    desc = fi_mr_desc(buf->mr);

    struct fi_msg msg;
    msg.msg_iov = &iov; /* scatter-gather array */
    msg.desc = &desc;
    msg.iov_count = 1;
    msg.addr = 0;
    msg.context = buf;
    msg.data = 0;

    uint64_t flags;
    flags = FI_REMOTE_CQ_DATA;//FI_MULTI_RECV;

    int ret=0;
    if((ret = fi_recvmsg(socket->ep, &msg, flags)) != 0)
    {
      log_error("Failed to post a buffer to receive inbound messages, error %d: %s", ret, fi_strerror(-ret));
    }
  }
  else
  {
    //Allocate memory for a message request descriptor
    struct netio_tcp_recv_item *mrd;
    mrd = (struct netio_tcp_recv_item *) malloc(sizeof(struct netio_tcp_recv_item));
    if(mrd == NULL)
    {
      ERROR_LOG("cannot allocate memory for descriptor");
    }
    /* log_debug("mrd is at %p", (void *)mrd); */

    mrd->element_active = 1;           //MJ do we need this variable?
    mrd->socket         = socket;      //this is a netio_recv_socket
    mrd->buffer         = buf;
    mrd->next_element   = NULL;
    mrd->bytes_received = 0;
    mrd->message_size   = 0;

    /* log_debug("receive descriptor allocated and initialized"); */

    //Append the descriptor to the list
    if(socket->message_request_header == NULL)
    {
      socket->message_request_header = (void *)mrd;
      /* log_debug("descriptor linked to head of queue"); */
    }
    else
    {
      int free_item = 1;
      struct netio_tcp_recv_item *mrdq;
      mrdq = (struct netio_tcp_recv_item *)socket->message_request_header;
      /* log_debug("Head of list points at = %p", mrdq); */
      int mrd_linked = 0;

      do
      {
        if (mrdq->next_element == NULL)
        {
          mrdq->next_element = (void *)mrd;
          mrd_linked = 1;
          //                log_error("End of list found at entry %d. Appending mrd to the desriptor at %p", free_item, mrdq);
        }
        else
        {
          mrdn = (struct netio_tcp_recv_item *)mrdq->next_element;
          //                log_error("item = %d. %p points at %p", free_item, mrdq, mrdn);
          free_item++;
          mrdq = mrdn;
        }
      }
      while(!mrd_linked);
    }
    /*
    log_debug("Calling netio_signal_fire");
    log_debug("&socket->tcp_signal = %p", &socket->tcp_signal);
    netio_signal_fire(&socket->tcp_signal);
    */
    return;
  }
}

/*
 * Removes an unbuffered receive socket from its listen socket's list of
 * receive sockets via remove_socket() and logs the outcome (a result of 0
 * is treated as success).
 */
void
netio_remove_recv_socket(struct netio_recv_socket* socket){
  struct netio_listen_socket* owner = socket->lsocket;

  if (remove_socket(&owner->recv_sockets, (void*)socket) != 0){
    log_warn("Unbuffered connection closed, could not delete recv socket.");
    return;
  }
  log_info("Unbuffered connection closed, recv socket deleted.");
}

/*
 * Removes a buffered receive socket from the receive-socket list of the
 * listen socket embedded in its buffered listen socket, via remove_socket(),
 * and logs the outcome (a result of 0 is treated as success).
 */
void
netio_buffered_remove_recv_socket(struct netio_buffered_recv_socket* socket){
  struct netio_buffered_listen_socket* owner = socket->lsocket;

  if (remove_socket(&(owner->listen_socket.recv_sockets), (void*)socket) != 0){
    log_warn("Buffered connection closed, could not delete recv socket.");
    return;
  }
  log_info("Buffered connection closed, buffered recv socket deleted.");
}

static int
generic_sendmsg(struct netio_send_socket* socket,
                struct iovec* iov,
                void** desc,
                size_t count,
                uint64_t key,
                uint64_t add_flags,
                uint64_t imm)
{
  int ret=0;
  uint64_t flags;

  struct fi_msg msg;
  msg.msg_iov = iov; /* scatter-gather array */
  msg.desc = desc;
  msg.iov_count = count;
  msg.addr = 0;
  msg.context = (void*)key;
  msg.data = imm;

  log_trc("sending iov message with immediate value 0x%lx", msg.data);

  flags = FI_INJECT_COMPLETE | add_flags;

  if(!socket->ep || !socket->ep->msg){
    log_error("Failed sending message because of null message or null endpoint.");
    return NETIO_STATUS_ERROR;
  }

  if((ret = fi_sendmsg(socket->ep, &msg, flags)) != 0)
  {
      if(ret == -FI_EAGAIN) {
          return NETIO_STATUS_AGAIN;
      }
      log_error("Failed to send message error (IOV count %lu, key %lu) %d: %s.", count, key, ret, fi_strerror(-ret));
      return NETIO_STATUS_ERROR;
  }
  return NETIO_STATUS_OK;
}


/*
 * Sends the whole contents of `buf` (buf->data, buf->size) through
 * netio_send(), using the buffer's own address as the key.
 * Returns whatever netio_send() returns.
 */
int
netio_send_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
{
  const uint64_t buffer_key = (uint64_t)buf;

  return netio_send(socket, buf, buf->data, buf->size, buffer_key);
}


/*
 * Sends the whole contents of `buf` over libfabric with the FI_INJECT flag
 * added, using the buffer's own address as the key and no immediate data.
 * Returns the status code of generic_sendmsg().
 */
int
netio_send_inline_buffer(struct netio_send_socket* socket, struct netio_buffer* buf)
{
  /* Single-element scatter-gather list covering the buffer. */
  struct iovec vec;
  vec.iov_base = buf->data;
  vec.iov_len = buf->size;

  void* mr_desc = fi_mr_desc(buf->mr);

  const uint64_t buffer_key = (uint64_t)buf;
  const uint64_t extra_flags = FI_INJECT;
  const size_t vec_count = 1;
  const uint64_t no_imm = 0;

  return generic_sendmsg(socket, &vec, &mr_desc, vec_count, buffer_key, extra_flags, no_imm);
}


/*
 * Sends `size` bytes starting at `addr`, which belong to the registered
 * buffer `buf`.
 *
 * libfabric mode: posts a single-vector message through generic_sendmsg()
 * with `key` as context and returns its status.
 * TCP mode: overwrites buf->size and buf->data with `size` and `addr`,
 * appends a send descriptor to the tail of the socket's
 * message_request_header list, fires the socket's tcp_signal and returns
 * NETIO_STATUS_OK. A socket that is not CONNECTED only produces a warning.
 * Allocation failure is fatal (ERROR_LOG).
 */
int
netio_send(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key)
{
  if (socket->tcp_fi_mode == NETIO_MODE_LIBFABRIC)
  {
    struct iovec vec;
    vec.iov_base = addr;
    vec.iov_len = size;

    void* mr_desc = fi_mr_desc(buf->mr);

    return generic_sendmsg(socket, &vec, &mr_desc, 1, key, 0, 0);
  }

  /* TCP path */
  if (socket->state!=CONNECTED)
  {
    log_warn("socket not connected (state=%d)",socket->state);
  }

  //Allocate memory for a message request descriptor
  struct netio_tcp_send_item *item;
  item = (struct netio_tcp_send_item *) malloc(sizeof(struct netio_tcp_send_item));
  if (item == NULL)
  {
    ERROR_LOG("cannot allocate memory for descriptor");
  }

  item->element_active = NETIO_TCP_NEW;
  item->socket         = socket;
  item->buffer         = buf;
  log_dbg("netio-tcp: setting buffer size to msg size and buffer data to addr");
  item->buffer->size = size;
  item->buffer->data = addr;
  item->total_bytes    = size;
  // NOTE(review): presumably an int-sized length header is sent first - confirm against the TCP send loop.
  item->bytes_left     = sizeof(int);
  item->next_element   = NULL;
  // NOTE(review): the `key` argument is not used on this path; the buffer address is stored instead - confirm this is intended.
  item->key            = (uint64_t) buf;

  //Append the descriptor to the list
  if (socket->message_request_header == NULL)
  {
    /* Empty list: the new descriptor becomes the head. */
    socket->message_request_header = (void *)item;
  }
  else
  {
    /* Walk to the last descriptor and link the new one behind it. */
    struct netio_tcp_send_item *tail;
    tail = (struct netio_tcp_send_item *)socket->message_request_header;
    while (tail->next_element != NULL)
    {
      tail = (struct netio_tcp_send_item *)tail->next_element;
    }
    tail->next_element = (void *)item;
  }

  netio_signal_fire(&socket->tcp_signal);

  return(NETIO_STATUS_OK);
}


/*
 * Sends `size` bytes starting at `addr` (inside the registered buffer `buf`)
 * over libfabric together with the immediate value `imm`; the
 * FI_REMOTE_CQ_DATA flag is added to the send.
 * Returns the status code of generic_sendmsg().
 */
int
netio_send_imm(struct netio_send_socket* socket, struct netio_buffer* buf, void* addr, size_t size, uint64_t key, uint64_t imm)
{
  /* Single-element scatter-gather list covering the payload. */
  struct iovec vec;
  vec.iov_base = addr;
  vec.iov_len = size;

  void* mr_desc = fi_mr_desc(buf->mr);

  const size_t vec_count = 1;
  const uint64_t extra_flags = FI_REMOTE_CQ_DATA;

  return generic_sendmsg(socket, &vec, &mr_desc, vec_count, key, extra_flags, imm);
}


int
netio_sendv(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key)
{
  void* descarray[NETIO_MAX_IOV_LEN];

  for(unsigned i=0; i<count; i++) {
    descarray[i] = fi_mr_desc(buf[i]->mr);
  }

  return generic_sendmsg(socket, /* struct netio_send_socket* socket */
                         iov,    /* struct iovec* iov */
                         descarray,  /* void** desc */
                         count,      /* size_t count */
                         key,    /* uint64_t key */
                         0,      /* uint64_t add_flags */
                         0       /* uint64_t imm */
         );
}


int
netio_sendv_imm(struct netio_send_socket* socket, struct netio_buffer** buf, struct iovec* iov, size_t count, uint64_t key, uint64_t imm)
{
  void* descarray[NETIO_MAX_IOV_LEN];

  for(unsigned i=0; i<count; i++) {
    descarray[i] = fi_mr_desc(buf[i]->mr);
  }

  return generic_sendmsg(socket, /* struct netio_send_socket* socket */
                         iov,    /* struct iovec* iov */
                         descarray,  /* void** desc */
                         count,      /* size_t count */
                         key,    /* uint64_t key */
                         FI_REMOTE_CQ_DATA,      /* uint64_t add_flags */
                         imm     /* uint64_t imm */
         );
}