Program Listing for File buffered.c

Return to documentation for file (buffered.c)

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include "log.h"
#include "connection_event.h"
#include "netio/netio.h"
#include "netio/netio_tcp.h"

/* Debug and trace logging for this file.
   When DEBUG or DEBUG_BUF is defined, log_dbg()/log_trc() forward to log_log()
   with the LOG_DEBUG/LOG_TRACE level, the file's base name and the line number.
   Otherwise both macros expand to nothing, so their arguments are not evaluated. */
#if defined DEBUG || defined DEBUG_BUF
/* Text after the last '/' in __FILE__, or all of __FILE__ if it contains no '/'. */
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif

/* Log a fatal libfabric error and terminate the process with exit status 2.
     msg  text printed in front of the error code
     c    error code; logged as-is and passed negated to fi_strerror()
   There is deliberately no semicolon after while(0): the caller's own ';'
   completes the statement, which keeps FATAL(...); usable as the body of an
   unbraced if/else. */
#define FATAL(msg, c) \
do { \
    log_fatal("%s %d: %s", (msg), (c), fi_strerror(-(c))); \
    exit(2); \
} while(0)


/* This type is used as a length-marker in buffers for encoded messages.
   Encoding of buffers is as follows: LEN0|MSG0|LEN1|MSG1|...|LEN_N|MSG_N
   Each LEN_i holds the byte count of MSG_i only (the marker itself is not
   counted) and is stored as a raw msg_size_t, i.e. in the sender's native
   byte order. */
typedef uint32_t msg_size_t;


// STATIC FUNCTIONS ////////////////////////////////////////////////////////////


/**
 * Hand the current (partially filled) buffer to the underlying send socket.
 *
 * The buffer's size is set to the number of bytes written so far before it is
 * passed to netio_send_buffer(). If that call reports NETIO_STATUS_AGAIN the
 * buffer is kept and the socket is marked busy so that a later call can retry.
 * For any other status the buffer is released from this socket, the busy flag
 * is cleared and the flush timer (if one is configured) is stopped.
 *
 * @param socket  buffered send socket to flush
 * @return NETIO_STATUS_AGAIN if the send must be retried, NETIO_STATUS_OK
 *         otherwise (including when there is no current buffer).
 */
static int
flush(struct netio_buffered_send_socket* socket)
{
  int ret = NETIO_STATUS_OK;
  if(socket->current_buffer)
  {
    socket->current_buffer->size = socket->pos;
    int send_status = netio_send_buffer(&socket->send_socket, socket->current_buffer);
    if (send_status == NETIO_STATUS_AGAIN){
      socket->busy = 1;
      // Report the status that was actually returned by the send call.
      log_dbg("netio_send_buffer returned %d, trying again", send_status);
      ret = NETIO_STATUS_AGAIN;
    } else {
      // NOTE(review): every status other than AGAIN is treated as success here.
      socket->busy = 0;
      socket->current_buffer = NULL;
      if(socket->timeout_ms != 0){
        netio_timer_stop(&socket->flush_timer);
      }
    }
  } else { //there is no current buffer. disable busy if on
    socket->busy = 0;
  }
  return ret;
}


/* Flush-timer callback: ptr is the buffered send socket registered as the
   timer's data. The status returned by flush() is not used. */
static void
flush_cb(void* ptr)
{
  struct netio_buffered_send_socket* buffered = (struct netio_buffered_send_socket*)ptr;
  (void)flush(buffered);
}


/* Send-completion callback of the underlying send socket.
   key carries the address of the netio_buffer whose transmission finished;
   the buffer is pushed back onto the owning buffered socket's buffer stack.
   A failing push is fatal (exit status 1). When exactly one buffer is
   available after the push, the buffer-available signal is fired. */
static void
on_send_completed(struct netio_send_socket* socket, uint64_t key)
{
  struct netio_buffered_send_socket* owner = (struct netio_buffered_send_socket*)(socket->usr);
  struct netio_buffer* completed = (struct netio_buffer*)key;

  if(netio_bufferstack_push(&owner->buffers, completed)) {
    log_fatal("The buffer stack exceeded its limits.");
    exit(1);
  }

  if(owner->buffers.available_buffers != 1) {
    return;
  }
  netio_signal_fire(&owner->signal_buffer_available);
}

/* Connection-established callback of the underlying send socket.
   Initialises the send buffer stack with the configured number and size of
   buffers, then forwards the event to the user's callback if one is set. */
static void
on_connect(struct netio_send_socket* socket)
{
  struct netio_buffered_send_socket* owner = (struct netio_buffered_send_socket*)(socket->usr);

  netio_bufferstack_send_init(&owner->buffers, socket, owner->num_pages, owner->buffersize, 0);

  if(!owner->cb_connection_established) {
    return;
  }
  owner->cb_connection_established(owner);
}

/* Connection-closed callback of the underlying send socket.
   Tears down, in this order: the buffer-available signal, the flush timer
   (only if a timeout is configured), the current buffer pointer, the
   transport-specific send socket state and the buffer stack. Afterwards the
   user's connection-closed callback is invoked and, if this socket belongs to
   a publish socket, it is removed from that socket's subscription list. */
static void
on_buf_send_socket_connection_closed(struct netio_send_socket* ssocket)
{
    log_dbg("on_buf_send_socket_connection_closed callback");
    struct netio_buffered_send_socket* owner = (struct netio_buffered_send_socket*)(ssocket->usr);

    log_dbg("removing buffer available signal fd %d from evloop %d", owner->signal_buffer_available.ev_ctx.fd, owner->signal_buffer_available.epollfd);
    netio_signal_close(&ssocket->ctx->evloop, &owner->signal_buffer_available);

    const int has_flush_timer = (owner->timeout_ms != 0);
    if(has_flush_timer){
      log_dbg("removing flush timer fd %d from evloop %d",  owner->flush_timer.ev_ctx.fd, owner->signal_buffer_available.epollfd);
      netio_timer_close(&ssocket->ctx->evloop, &owner->flush_timer);
    }

    owner->current_buffer = NULL;

    // Transport-specific shutdown; any other mode value is left untouched.
    if(ssocket->tcp_fi_mode == NETIO_MODE_LIBFABRIC){
      handle_send_socket_shutdown(ssocket);
    } else if (ssocket->tcp_fi_mode == NETIO_MODE_TCP){
      handle_tcp_send_socket_shutdown(ssocket);
    }

    netio_bufferstack_close(&owner->buffers, owner->num_pages);

    // This is pubsocket_on_connection_closed in pubsub.c
    // pubsocket_on_connection_closed will call table_remove_subscription.
    // table_remove_subscription can call netio_disconnect that sends a shutdown
    // For RDMA shutdown goes via CM, for TCP/IP it requires the FD.
    if(owner->cb_connection_closed) {
      owner->cb_connection_closed(owner);
    }

    // Only a send socket that is part of a publish socket is listed there.
    if(owner->pub_socket) {
      struct netio_publish_socket* publisher = (struct netio_publish_socket*)owner->pub_socket;
      remove_socket(&(publisher->subscription_table.socket_list), (void*)owner);
    }
}

/* Connection-refused callback of the underlying send socket.
   Closes the buffer-available signal and, if a timeout is configured, the
   flush timer; then forwards the error to the user's callback if one is set. */
static void
on_error_connection_refused(struct netio_send_socket* socket)
{
  struct netio_buffered_send_socket* owner = (struct netio_buffered_send_socket*)(socket->usr);

  netio_signal_close(&(socket->ctx->evloop), &owner->signal_buffer_available);

  if(owner->timeout_ms != 0){
    netio_timer_close(&(socket->ctx->evloop), &owner->flush_timer);
  }

  if(!owner->cb_error_connection_refused) {
    return;
  }
  owner->cb_error_connection_refused(owner);
}

/**
 * Receive callback of the underlying recv socket: split a received buffer
 * into its individual messages and deliver each one to the user.
 *
 * The first len bytes of buf->data are expected to be encoded as
 * LEN0|MSG0|LEN1|MSG1|... where each LEN is a msg_size_t. Length markers are
 * copied out with memcpy because they follow payloads of arbitrary size and
 * are therefore not guaranteed to be aligned. Decoding stops (with an error
 * log) as soon as a marker or a payload would extend past len, so a malformed
 * buffer can never cause a read outside the received data.
 *
 * After decoding, the whole buffer is optionally handed to the subscribe
 * socket's cb_buf_received hook and then re-posted for receiving.
 *
 * @param socket  recv socket the buffer arrived on
 * @param buf     buffer holding the received bytes
 * @param data    unused; the payload is read through buf->data
 * @param len     number of valid bytes in buf->data
 */
static void
on_buffered_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len)
{
    (void)data;
    struct netio_buffered_recv_socket* bs = (struct netio_buffered_recv_socket*)socket->usr;
    if(bs->lsocket->cb_msg_received) {
      size_t pos = 0;
      while(pos < len) {
        msg_size_t msg_len;
        if(len - pos < sizeof(msg_len)) {
          log_error("Malformed buffer: %zu trailing bytes cannot hold a length marker", len - pos);
          break;
        }
        memcpy(&msg_len, (char *)buf->data + pos, sizeof(msg_len));
        pos += sizeof(msg_len);
        if(msg_len > len - pos) {
          log_error("Malformed buffer: message of %u bytes exceeds the %zu bytes remaining", (unsigned)msg_len, len - pos);
          break;
        }
        bs->lsocket->cb_msg_received(bs, (char *)buf->data + pos, msg_len);
        pos += msg_len;
      }
    }

    //to study the L1ID pileup
    struct netio_subscribe_socket* ssocket = bs->lsocket->listen_socket.usr;
    if(ssocket && ssocket->cb_buf_received) {
      ssocket->cb_buf_received(ssocket, buf, len);
    }

    netio_post_recv(socket, buf);
}



// API FUNCTIONS ///////////////////////////////////////////////////////////////

/**
 * Initialise a buffered listen socket for the libfabric transport.
 *
 * The socket structure is zeroed, the page size and page count are taken
 * from attr, and the embedded listen socket is initialised on ctx.
 * A page count above NETIO_DOMAIN_MAX_MR is reduced to that limit; the
 * reduced value is also written back into attr.
 *
 * @param socket  socket to initialise
 * @param ctx     netio context the socket belongs to
 * @param attr    page size / page count configuration (num_pages may be modified)
 */
void
netio_buffered_listen_socket_init(struct netio_buffered_listen_socket* socket,
                                  struct netio_context* ctx,
                                  struct netio_buffered_socket_attr* attr)
{
  memset(socket, 0, sizeof(*socket));

  const int too_many_pages = (attr->num_pages > NETIO_DOMAIN_MAX_MR);
  if (too_many_pages){
    log_error("Number of recv pages requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
    attr->num_pages = NETIO_DOMAIN_MAX_MR;
  }

  socket->num_pages = attr->num_pages;
  socket->pagesize = attr->pagesize;

  netio_init_listen_socket(&socket->listen_socket, ctx, NULL);
}


/**
 * Initialise a buffered listen socket for the TCP transport.
 *
 * Counterpart of netio_buffered_listen_socket_init that uses
 * netio_init_listen_tcp_socket; the page count from attr is used as given
 * (no upper limit is applied here).
 *
 * @param socket  socket to initialise (zeroed first)
 * @param ctx     netio context the socket belongs to
 * @param attr    page size / page count configuration
 */
void
netio_buffered_listen_tcp_socket_init(struct netio_buffered_listen_socket* socket,
                                  struct netio_context* ctx,
                                  struct netio_buffered_socket_attr* attr)
{
  memset(socket, 0, sizeof(*socket));

  socket->num_pages = attr->num_pages;
  socket->pagesize = attr->pagesize;

  netio_init_listen_tcp_socket(&socket->listen_socket, ctx, NULL);
}


/**
 * Start listening for libfabric connections on hostname:port.
 *
 * Resolves the local interface with fi_getinfo(), opens the fabric, an event
 * queue and a passive endpoint, binds them, starts listening and finally
 * registers the event queue's wait file descriptor with the context's event
 * loop so that connection-management events are dispatched to
 * on_buffered_listen_socket_libfabric_cm_event.
 *
 * Any failure is fatal: it is logged and the process exits with status 2.
 *
 * @param socket    initialised buffered listen socket
 * @param hostname  local address to listen on
 * @param port      port to listen on
 */
void
netio_buffered_listen(struct netio_buffered_listen_socket* socket, const char* hostname, unsigned port)
{
  log_info("netio_buffered_listen %s", hostname);
  int ret;
  struct fi_info* hints;
  // Positional initialiser; per fi_eq(3) the members are
  // size, flags, wait_obj, signaling_vector, wait_set.
  struct fi_eq_attr eq_attr = {10000, 0, FI_WAIT_FD, 0, 0};

  hints = fi_allocinfo();
  if(hints == NULL)
  {
      // fi_allocinfo() returns NULL when it cannot allocate the structure.
      log_fatal("Buf-listen socket, cannot allocate fi_info hints");
      exit(2);
  }
  hints->ep_attr->type  = FI_EP_MSG;
  hints->caps = FI_MSG;
  hints->mode = FI_LOCAL_MR;
  char port_addr[32];
  snprintf(port_addr, sizeof(port_addr), "%u", port);

  log_dbg("listening (libfabric) on %s:%s", hostname, port_addr);

  if((ret = fi_getinfo(FI_VERSION(1, 1), hostname, port_addr, FI_SOURCE, hints,
                       &socket->listen_socket.fi)))
  {
      FATAL("Buf-listen socket, fail to get interface info, error ", ret);
  }
  // printf("%s\n", fi_tostr(socket->listen_socket.fi, FI_TYPE_INFO));

  if((ret = fi_fabric(socket->listen_socket.fi->fabric_attr, &socket->listen_socket.fabric, NULL)))
  {
      FATAL("Buf-listen socket, cannot open fabric, error ", ret);
  }

  if((ret = fi_eq_open(socket->listen_socket.fabric, &eq_attr, &socket->listen_socket.eq, NULL)))
  {
      FATAL("Buf-listen socket, cannot open Event Queue, error ", ret);
  }

  if((ret = fi_passive_ep(socket->listen_socket.fabric, socket->listen_socket.fi, &socket->listen_socket.pep, NULL)))
  {
      FATAL("Buf-listen socket, cannot open passive endpoint, error ", ret);
  }

  if((ret = fi_pep_bind(socket->listen_socket.pep, &socket->listen_socket.eq->fid, 0)))
  {
      FATAL("Buf-listen socket, cannot bind passive endpoint, error ", ret);
  }

  if((ret = fi_listen(socket->listen_socket.pep)))
  {
      FATAL("Buf-listen socket, cannot enable, error ", ret);
  }

  // Fetch the file descriptor that becomes readable when the EQ has events.
  if((ret = fi_control(&socket->listen_socket.eq->fid, FI_GETWAIT, &socket->listen_socket.eqfd)))
  {
      FATAL("Buf-listen socket failed to obtain Event Queue wait object", ret);
  }

  socket->listen_socket.eq_ev_ctx.fd = socket->listen_socket.eqfd;
  socket->listen_socket.eq_ev_ctx.data = socket;
  socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_libfabric_cm_event;
  struct netio_eventloop* eloop = &socket->listen_socket.ctx->evloop;
  netio_register_read_fd(eloop, &socket->listen_socket.eq_ev_ctx);
  add_open_fd(&eloop->openfds, socket->listen_socket.eqfd, NETIO_EQ, BLISTEN, socket);
  log_dbg("netio_buffered_listen_socket: registering EQ fd %d", socket->listen_socket.eqfd);
  fi_freeinfo(hints);
}


/**
 * Start listening for TCP connections on hostname:port.
 *
 * TCP counterpart of netio_buffered_listen: the actual listening is delegated
 * to netio_listen_tcp(); afterwards the listen socket's event context is
 * pointed at on_buffered_listen_socket_tcp_cm_event and registered with the
 * context's event loop.
 *
 * @param socket    initialised buffered listen socket
 * @param hostname  local address to listen on
 * @param port      port to listen on
 */
void
netio_buffered_listen_tcp(struct netio_buffered_listen_socket* socket,
                          const char* hostname, unsigned port)
{
  // port is unsigned, so it must be formatted with %u rather than %d.
  log_info("Buffered TCP/IP listening on %s:%u", hostname, port);

  netio_listen_tcp(&socket->listen_socket, hostname, port);

  socket->listen_socket.eq_ev_ctx.cb = on_buffered_listen_socket_tcp_cm_event;
  netio_register_read_tcp_fd(&socket->listen_socket.ctx->evloop, &socket->listen_socket.eq_ev_ctx);
}



void
netio_buffered_send_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
{
    memset(socket, 0, sizeof(*socket));
  netio_init_send_socket(&socket->send_socket, ctx);
  socket->send_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
    socket->send_socket.usr = socket;
  socket->send_socket.cb_send_completed = on_send_completed;
  socket->send_socket.cb_connection_established = on_connect;
  socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
  socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
  socket->current_buffer = NULL;
  socket->pub_socket = NULL;
  socket->pos = 0;
  socket->busy = 0;
  socket->watermark = attr->watermark;
  if (attr->num_pages > NETIO_DOMAIN_MAX_MR){
    log_error("Number of send buffers requested %lu exceeds max value. Using %lu.", attr->num_pages, NETIO_DOMAIN_MAX_MR);
    attr->num_pages = NETIO_DOMAIN_MAX_MR;
  }
  socket->num_pages = attr->num_pages;
  socket->buffersize = attr->pagesize;
  netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
  socket->signal_buffer_available.cb = NULL; //deactivated by default
  socket->timeout_ms = attr->timeout_ms;
  if(attr->timeout_ms != 0){
    netio_timer_init(&ctx->evloop, &socket->flush_timer);
    socket->flush_timer.cb = flush_cb;
    socket->flush_timer.data = socket;
  } else {
    socket->flush_timer.cb = NULL;
  }
}

/* Same as above except for the _tcp in netio_init_send_tcp_socket  */
/* If this works, consider factoring out common code                */
void
netio_buffered_send_tcp_socket_init(struct netio_buffered_send_socket* socket, struct netio_context* ctx, struct netio_buffered_socket_attr* attr)
{
    memset(socket, 0, sizeof(*socket));
  netio_init_send_tcp_socket(&socket->send_socket, ctx);
    socket->send_socket.usr = socket;
  socket->send_socket.cb_send_completed = on_send_completed;
  socket->send_socket.cb_connection_established = on_connect;
  socket->send_socket.cb_connection_closed = on_buf_send_socket_connection_closed;
  socket->send_socket.cb_error_connection_refused = on_error_connection_refused;
  socket->current_buffer = NULL;
  socket->pub_socket = NULL;
  socket->pos = 0;
  socket->busy = 0;
  socket->watermark = attr->watermark;
  socket->num_pages = attr->num_pages;
  socket->buffersize = attr->pagesize;
  netio_signal_init(&ctx->evloop, &socket->signal_buffer_available);
  socket->signal_buffer_available.cb = NULL; //deactivated by default
  socket->timeout_ms = attr->timeout_ms;
  if(attr->timeout_ms != 0){
    netio_timer_init(&ctx->evloop, &socket->flush_timer);
    socket->flush_timer.cb = flush_cb;
    socket->flush_timer.data = socket;
  } else {
    socket->flush_timer.cb = NULL;
  }
}

/**
 * Initialise a buffered send socket and connect it in one call.
 *
 * netio_tcp_mode(hostname) selects between the TCP and the libfabric
 * initialiser; the connection is then made to the host name returned by
 * netio_hostname(hostname).
 *
 * @param socket    socket to initialise and connect
 * @param ctx       netio context the socket belongs to
 * @param hostname  remote host specification
 * @param port      remote port
 * @param attr      buffering configuration
 */
void netio_buffered_send_socket_init_and_connect(struct netio_buffered_send_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* target = netio_hostname(hostname);

  if (!use_tcp) {
    netio_buffered_send_socket_init(socket, ctx, attr);
  } else {
    netio_buffered_send_tcp_socket_init(socket, ctx, attr);
  }

  netio_buffered_connect(socket, target, port);
}

/**
 * Initialise a buffered listen socket and start listening in one call.
 *
 * netio_tcp_mode(hostname) selects between the TCP and the libfabric pair of
 * init/listen functions; listening happens on the host name returned by
 * netio_hostname(hostname).
 *
 * @param socket    socket to initialise
 * @param ctx       netio context the socket belongs to
 * @param hostname  local host specification
 * @param port      port to listen on
 * @param attr      page size / page count configuration
 */
void netio_buffered_listen_socket_init_and_listen(struct netio_buffered_listen_socket* socket, struct netio_context* ctx, const char* hostname, unsigned port, struct netio_buffered_socket_attr* attr) {
  const int use_tcp = netio_tcp_mode(hostname);
  const char* local = netio_hostname(hostname);

  if (!use_tcp) {
    netio_buffered_listen_socket_init(socket, ctx, attr);
    netio_buffered_listen(socket, local, port);
    return;
  }

  netio_buffered_listen_tcp_socket_init(socket, ctx, attr);
  netio_buffered_listen_tcp(socket, local, port);
}

/**
 * Connect the underlying send socket to the given host and port.
 * The host name is first passed through netio_hostname().
 *
 * @param socket    initialised buffered send socket
 * @param hostname  remote host specification
 * @param port      remote port
 */
void
netio_buffered_connect(struct netio_buffered_send_socket* socket, const char* hostname, unsigned port)
{
  const char* target = netio_hostname(hostname);
  netio_connect(&socket->send_socket, target, port);
}


/**
 * Connect the underlying send socket using a raw address blob.
 *
 * @param socket   initialised buffered send socket
 * @param addr     address bytes, forwarded unchanged to netio_connect_rawaddr
 * @param addrlen  length of addr in bytes
 */
void
netio_buffered_connect_rawaddr(struct netio_buffered_send_socket* socket, void* addr, size_t addrlen)
{
  struct netio_send_socket* raw = &socket->send_socket;
  netio_connect_rawaddr(raw, addr, addrlen);
}


/**
 * Queue a single contiguous message on a buffered send socket.
 * Convenience wrapper that wraps data/size in a one-element iovec and
 * forwards to netio_buffered_sendv.
 *
 * @param socket  buffered send socket
 * @param data    message bytes
 * @param size    number of bytes in data
 * @return the status returned by netio_buffered_sendv
 */
int
netio_buffered_send(struct netio_buffered_send_socket* socket, void* data, size_t size)
{
  struct iovec single = { .iov_base = data, .iov_len = size };
  return netio_buffered_sendv(socket, &single, 1);
}


int
netio_buffered_sendv(struct netio_buffered_send_socket* socket, struct iovec* iov, size_t num)
{
  if(socket->busy){
    int ret = flush(socket);
    if (ret == NETIO_STATUS_AGAIN){
      return NETIO_STATUS_AGAIN;
    }
  }

    size_t total_size = 0;
    for(unsigned int i=0; i<num; i++) {
        total_size += iov[i].iov_len;
    }

  //if current message is larger than the whole buffer
    if(total_size+sizeof(msg_size_t) > socket->buffersize) {
        return NETIO_STATUS_TOO_BIG;
    }

    if(socket->current_buffer == NULL) {
        if(netio_bufferstack_pop(&socket->buffers, &socket->current_buffer)) {
            return NETIO_STATUS_AGAIN;
        }
        socket->pos = 0;
    //Enable flush timer
    if(socket->timeout_ms != 0 ){
      netio_timer_start_ms(&socket->flush_timer, socket->timeout_ms);
    }
    } else {
    //if current message is larger than remaining space
    //flush buffer and retry with a new one
    if(socket->pos+total_size+sizeof(msg_size_t) >= socket->buffersize){
      flush(socket);
      return NETIO_STATUS_AGAIN;
    }
  }

    *(msg_size_t*)((char*)socket->current_buffer->data + socket->pos) = total_size;
    socket->pos += sizeof(msg_size_t);
    for(unsigned int i=0; i<num; i++) {
        memcpy((char *)socket->current_buffer->data + socket->pos, iov[i].iov_base, iov[i].iov_len);
        socket->pos += iov[i].iov_len;
    }

    if(socket->pos > socket->watermark) {
        flush(socket);
    }
    return NETIO_STATUS_OK;
}

/**
 * Initialise a buffered receive socket (libfabric transport) that belongs to
 * the given buffered listen socket.
 *
 * Zeroes the structure, initialises the embedded recv socket, and allocates
 * num_pages receive pages of pagesize bytes each, both taken from lsocket.
 * The page array is allocated with calloc, which checks the element-count
 * multiplication for overflow and leaves any page fields not set here zeroed.
 * An allocation failure is fatal (logged, exit status 1) instead of leaving a
 * NULL pointer to be dereferenced later. Finally the listen socket's
 * message-received hook is pointed at on_buffered_msg_received.
 *
 * @param socket   socket to initialise
 * @param lsocket  buffered listen socket that accepted the connection
 */
void
netio_buffered_recv_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
{
    memset(socket, 0, sizeof(*socket));
    socket->lsocket = lsocket;
    netio_init_recv_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
    socket->recv_socket.cq_size = NETIO_MAX_CQ_EVENTS_BUFFERED;
    socket->recv_socket.usr = socket;

    socket->num_pages = socket->lsocket->num_pages;
    socket->pages = calloc(socket->num_pages, sizeof(*socket->pages));
    // A NULL result is only an error when something was actually requested.
    if(socket->pages == NULL && socket->num_pages != 0) {
      log_fatal("Cannot allocate receive page descriptors.");
      exit(1);
    }
    for(size_t i=0; i<socket->num_pages; i++) {
      socket->pages[i].data = malloc(socket->lsocket->pagesize);
      if(socket->pages[i].data == NULL && socket->lsocket->pagesize != 0) {
        log_fatal("Cannot allocate receive page.");
        exit(1);
      }
      socket->pages[i].size = socket->lsocket->pagesize;
    }
    socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
}

/**
 * Initialise a buffered receive socket (TCP transport) that belongs to the
 * given buffered listen socket.
 *
 * Counterpart of netio_buffered_recv_socket_init that uses
 * netio_init_recv_tcp_socket and does not set a completion-queue size.
 * Allocates num_pages receive pages of pagesize bytes each, both taken from
 * lsocket. The page array is allocated with calloc, which checks the
 * element-count multiplication for overflow and leaves any page fields not
 * set here zeroed. An allocation failure is fatal (logged, exit status 1)
 * instead of leaving a NULL pointer to be dereferenced later.
 *
 * @param socket   socket to initialise
 * @param lsocket  buffered listen socket that accepted the connection
 */
void
netio_buffered_recv_tcp_socket_init(struct netio_buffered_recv_socket* socket, struct netio_buffered_listen_socket* lsocket)
{
  memset(socket, 0, sizeof(*socket));
  socket->lsocket = lsocket;
  netio_init_recv_tcp_socket(&socket->recv_socket, &socket->lsocket->listen_socket);
  socket->recv_socket.usr = socket;

  socket->num_pages = socket->lsocket->num_pages;
  socket->pages = calloc(socket->num_pages, sizeof(*socket->pages));
  // A NULL result is only an error when something was actually requested.
  if(socket->pages == NULL && socket->num_pages != 0) {
    log_fatal("Cannot allocate receive page descriptors.");
    exit(1);
  }
  for(size_t i=0; i<socket->num_pages; i++) {
    socket->pages[i].data = malloc(socket->lsocket->pagesize);
    if(socket->pages[i].data == NULL && socket->lsocket->pagesize != 0) {
      log_fatal("Cannot allocate receive page.");
      exit(1);
    }
    socket->pages[i].size = socket->lsocket->pagesize;
  }
  socket->recv_socket.lsocket->cb_msg_received = on_buffered_msg_received;
}


/**
 * Flush the current buffer of a buffered send socket immediately.
 * The flush status is not reported to the caller.
 *
 * @param socket  buffered send socket to flush
 */
void
netio_buffered_flush(struct netio_buffered_send_socket* socket)
{
  (void)flush(socket);
}