Program Listing for File eventloop.c

Return to documentation for file (eventloop.c)

#include <errno.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/eventfd.h>
#include <sys/stat.h>
#include <sys/timerfd.h>
#include "log.h"

#include "netio/netio.h"

// Printable names used by print_openfds() and by the log messages in this
// file: resource_name is indexed with the rtype value and socket_name with the
// stype value recorded for each tracked file descriptor.
// NOTE(review): the order presumably mirrors enum resource_type and
// enum socket_type, which are declared elsewhere - confirm when either changes.
const char* resource_name[] = { "NETIO_TIMER", "NETIO_SIGNAL", "NETIO_CQ", "NETIO_EQ", "NETIO_TCP"};
const char* socket_name[] = {"BSEND", "USEND", "BRECV", "URECV", "BSUB", "USUB", "BPUB", "UPUB", "BLISTEN", "ULISTEN", "NOSOCKET"};

// Debug and trace logging is compiled in only when DEBUG or DEBUG_EV is
// defined. Otherwise log_dbg() and log_trc() expand to nothing, so their
// arguments are never evaluated.
#if defined DEBUG || defined DEBUG_EV
// Name of the current source file without its directory part (text after the
// last '/' in __FILE__, or __FILE__ itself if it contains no '/').
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
#define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
#else
#define log_dbg(...)
#define log_trc(...)
#endif


//#define TRACK_ALL_FD  //This option is not thread-safe, see FLX-2022.

// STATIC FUNCTIONS ////////////////////////////////////////////////////////////

// Linear search of the closed-FD list.
// Returns true if fd is one of the first closedfds->count entries, false
// otherwise.
static int check_fd_is_closed(struct closed_fds* closedfds, int fd){
  int found = false;
  for(int idx = 0; !found && idx < closedfds->count; idx++){
    found = (closedfds->fds[idx] == fd);
  }
  return found;
}


// Dispatches one epoll event to the callback stored in its event context.
// Nothing is called when the context's fd appears in the closed-FD list or
// when no callback is set.
static void
process_event(struct netio_event_context* evc, struct closed_fds* closedfds)
{
  // The list lookup is skipped entirely while the list is empty.
  if(closedfds->count != 0 && check_fd_is_closed(closedfds, evc->fd)){
    return;
  }
  if(evc->cb == NULL){
    return;
  }
  evc->cb(evc->fd, evc->data);
}

// Appends fd to the closed-FD list. When the list has reached its limit the
// fd is not recorded and a warning is logged instead.
static void add_closed_fd(struct closed_fds* closedfds, int fd){
  const int list_full = (closedfds->count +1 >= 3*NETIO_MAX_POLLED_FIDS);
  if (list_full){
    log_warn("Cannot add FD %d to list of closed FDs because Array is full.", fd);
    return;
  }
  closedfds->fds[closedfds->count] = fd;
  closedfds->count++;
}

// Empties the closed-FD list by resetting its entry count; the stored values
// are left in place and simply overwritten by later additions.
static void
reset_closed_fds(struct closed_fds* closedfds)
{
  closedfds->count = 0;
}

// Programs the timerfd `fd` with the same value (s seconds + ns nanoseconds)
// for both the first expiration and the repeat interval, using a relative
// time (flags = 0). netio_timer_stop() passes 0/0 through this function.
// A failure of timerfd_settime() is only logged.
static void
set_timerfd(int fd, unsigned s, unsigned ns)
{
  struct itimerspec spec;
  spec.it_value.tv_sec = s;
  spec.it_value.tv_nsec = ns;
  spec.it_interval = spec.it_value;
  if(timerfd_settime(fd, 0, &spec, NULL) != 0) {
    log_error("Could not set timerfd %d. The timer will not fire.", fd);
  }
}

// Switches ctx->fd to non-blocking mode and adds it to the epoll instance
// `epfd`, watching for the events in `flags`. The context pointer is stored as
// the epoll user data so the event loop can recover it when the fd fires.
// Failures are logged; the function has no return value.
static void
register_fd(int epfd, struct netio_event_context* ctx, int flags)
{
    struct epoll_event ev;
    ev.events = flags;
    ev.data.ptr = ctx;

    // Read the current flags first: if F_GETFL fails it returns -1, and
    // OR-ing O_NONBLOCK into -1 would hand an all-ones flag word to F_SETFL.
    int current = fcntl(ctx->fd, F_GETFL);
    int rc = (current < 0) ? -1 : fcntl(ctx->fd, F_SETFL, current | O_NONBLOCK);
    if (rc < 0) {
      log_error("Failed to change flags (incl. O_NONBLOCK) of file descriptor %d.", ctx->fd);
    }
    log_dbg("Adding %d to epoll %d", ctx->fd, epfd);
    if(epoll_ctl(epfd, EPOLL_CTL_ADD, ctx->fd, &ev))
    {
      log_error("Could not add file descriptor %d to epoll. Events from this resource will be neglected.", ctx->fd);
      return;
    }
}

// PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////

// Event-loop callback for timer file descriptors. `data` is the
// struct netio_timer that owns `fd`. The 8-byte value is read from the fd and
// discarded, then the user's timer callback (if any) is invoked with the
// timer's user data.
void
netio_timer_callback(int fd, void* data)
{
    log_trc("timer event on FD %d.", fd);
    struct netio_timer* timer = (struct netio_timer*)data;
    uint64_t expirations;
    ssize_t nread = read(fd, &expirations, 8);
    if(nread != 8) {
        log_dbg("Did not read 8 bytes.");
    }
    if(!timer->cb) {
        return;
    }
    timer->cb(timer->data);
}

// Event-loop callback for signal (eventfd) file descriptors. `data` is the
// struct netio_signal that owns `fd`. The 8-byte counter is read from the fd,
// then the user's signal callback (if any) is invoked with the signal's user
// data.
void
netio_signal_callback(int fd, void* data)
{
    log_dbg("signal event on FD %d.", fd);

    struct netio_signal* signal = (struct netio_signal*)data;
    // Initialised so the debug message below never prints an indeterminate
    // value when the read fails.
    uint64_t buf = 0;
    if(8 != read(fd, &buf, 8)) {
        log_info("Did not read 8 bytes.");
    }
    log_dbg("Count = %llu", (unsigned long long)buf);
    if(signal->cb)
        signal->cb(signal->data);
}

// Event-loop callback for the eventfd created by
// netio_error_connection_refused_fire(). The value read from the fd is the
// address of the send socket; its cb_error_connection_refused callback is
// invoked if set. `data` is the heap-allocated event context, which is freed
// here in every case.
// NOTE(review): the eventfd itself is neither closed nor removed from epoll
// in this function.
void
netio_error_connection_refused_callback(int fd, void* data)
{
    log_dbg("error event on FD %d.", fd);

    struct netio_event_context* ev_ctx = (struct netio_event_context*)data;
    // Read into a full 64-bit buffer: an eventfd always transfers 8 bytes,
    // which may be more than the size of a pointer.
    uint64_t value = 0;
    ssize_t nread = read(fd, &value, sizeof(value));
    struct netio_send_socket* socket = (struct netio_send_socket*)(uintptr_t)value;
    if(nread != (ssize_t)sizeof(value) || socket == NULL) {
        // Without a valid socket address there is nothing safe to call.
        log_error("Could not read send socket from FD %d. Connection refused callback not invoked.", fd);
        free(ev_ctx);
        return;
    }
    if (socket->cb_error_connection_refused) {
        socket->cb_error_connection_refused(socket);
    } else {
        log_error("Send socket %p has no connection refused callback set. Resources not freed.", socket);
    }
    free(ev_ctx);
}

// Event-loop callback for the eventfd created by
// netio_error_bind_refused_fire(). The value read from the fd is the address
// of the listen socket; its cb_error_bind_refused callback is invoked if set.
// `data` is the heap-allocated event context, which is freed here in every
// case.
// NOTE(review): the eventfd itself is neither closed nor removed from epoll
// in this function.
void
netio_error_bind_refused_callback(int fd, void* data)
{
    log_dbg("error event on FD %d.", fd);

    struct netio_event_context* ev_ctx = (struct netio_event_context*)data;
    // Read into a full 64-bit buffer: an eventfd always transfers 8 bytes,
    // which may be more than the size of a pointer.
    uint64_t value = 0;
    ssize_t nread = read(fd, &value, sizeof(value));
    struct netio_listen_socket* socket = (struct netio_listen_socket*)(uintptr_t)value;
    if(nread != (ssize_t)sizeof(value) || socket == NULL) {
      // Without a valid socket address there is nothing safe to call.
      log_error("Could not read listen socket from FD %d. Bind refused callback not invoked.", fd);
      free(ev_ctx);
      return;
    }
    if (socket->cb_error_bind_refused) {
      socket->cb_error_bind_refused(socket);
    } else {
      log_error("Listen socket %p has no bind refused callback set. Resources not freed.", socket);
    }
    free(ev_ctx);
}

// INTERNAL RESOURCES ///////////////////////////////////////////////////////////////

// Prepares an empty polled-fid table with room for initial_size entries:
// no fabric, zero entries, and two freshly allocated parallel arrays
// (fid pointers and per-fid data). Allocation results are not checked here.
void
init_polled_fids(struct polled_fids* pfids, int initial_size)
{
  pfids->fabric = NULL;
  pfids->count = 0;
  pfids->size = initial_size;

  pfids->fid_set = malloc(pfids->size*sizeof(struct fid*));
  pfids->data  = malloc(pfids->size*sizeof(struct polled_fids_data));
}

// Prepares an empty open-fd table with room for initial_size entries.
// The allocation result is not checked here.
void
init_openfds(struct open_fds* fds, int initial_size)
{
  fds->count = 0;
  fds->size = initial_size;

  fds->data  = malloc(fds->size*sizeof(struct open_fd_data));
}


// Prints the polled-fid table to stdout: the number of entries followed by
// one "fd / fid pointer" line per entry.
void
print_polled_fids(struct polled_fids* pfids)
{
  printf("Number of polled fds %u \n", pfids->count);
  fputs("FD \t FID \n", stdout);
  unsigned int row = 0;
  while(row < pfids->count){
    printf("%d \t %p \n", pfids->data[row].fd, pfids->fid_set[row]);
    ++row;
  }
  fputs("-------------------\n", stdout);
}

// Prints the open-fd table to stdout: the number of entries, a header, and
// one line per entry with its fd, resource type name, socket type name and
// associated object address.
void
print_openfds(struct open_fds* fds)
{
  printf("Number of open fds %u \n", fds->count);
  fputs("===============================================\n", stdout);
  fputs("FD \t RESOURCE \t SOCKET \t OBJ ADDR \n", stdout);
  fputs("-----------------------------------------------\n", stdout);
  for(unsigned int row = 0; row < fds->count; ++row){
    // The stored type values are used directly as indices into the name tables.
    int res_idx = fds->data[row].rtype;
    int sock_idx = fds->data[row].stype;
    printf("%d\t%s\t%s\t%p\n", fds->data[row].fd, resource_name[res_idx], socket_name[sock_idx], fds->data[row].object);
  }
  fputs("===============================================\n", stdout);
}


// Appends one entry (fid, fd, socket, callback) to the polled-fid table and
// records `fab` as the table's fabric. The table's two parallel arrays are
// doubled in capacity when full. If the arrays cannot be grown the entry is
// not added and an error is logged; the existing entries stay valid.
void
add_polled_fid(struct polled_fids* pfids, struct fid_fabric* fab, struct fid* fid, int fd, void* socket, void (*cb)(int,void*)){
  if(pfids->size <= pfids->count){
    log_dbg("Reallocing polled fids");
    // Start from 1 so that a table created with capacity 0 can still grow.
    size_t new_size = (pfids->size > 0) ? 2*(size_t)pfids->size : 1;

    // Keep the old pointers until realloc has succeeded, so a failure neither
    // leaks the old array nor leaves a NULL pointer in the table.
    void* new_set = realloc(pfids->fid_set, new_size*sizeof(struct fid*));
    if(new_set == NULL){
      log_error("Could not reallocate polled fids, fd %d not added.", fd);
      return;
    }
    pfids->fid_set = new_set;

    void* new_data = realloc(pfids->data, new_size*sizeof(struct polled_fids_data));
    if(new_data == NULL){
      // fid_set is now larger than recorded in size, which is harmless.
      log_error("Could not reallocate polled fids, fd %d not added.", fd);
      return;
    }
    pfids->data = new_data;
    pfids->size = new_size;
  }
  log_dbg("Polled_fids %p Adding polled fd %d fid %p.", pfids, fd, fid);
  pfids->fabric = fab;
  pfids->fid_set[pfids->count] = fid;
  pfids->data[pfids->count].fd = fd;
  pfids->data[pfids->count].socket = socket;
  pfids->data[pfids->count].cb = cb;
  pfids->count++;
}

// Appends one entry (fd, resource type, socket type, owning object) to the
// open-fd table, doubling the table's capacity when it is full. An fd that is
// already present is reported with an error message but still appended. If
// the table cannot be grown the entry is not added and an error is logged.
void
add_open_fd(struct open_fds* fds, int fd, enum resource_type rtype, enum socket_type stype, void* object){
  if(fds->size <= fds->count){
    log_dbg("Reallocing open fds");
    // Start from 1 so that a table created with capacity 0 can still grow.
    size_t new_size = (fds->size > 0) ? 2*(size_t)fds->size : 1;
    // Keep the old pointer until realloc has succeeded.
    void* new_data = realloc(fds->data, new_size*sizeof(struct open_fd_data));
    if(new_data == NULL){
      log_error("Could not reallocate open fds, fd %d not recorded.", fd);
      return;
    }
    fds->data = new_data;
    fds->size = new_size;
  }
  log_dbg("New open fd %d res type %s socket type %s", fd, resource_name[rtype], socket_name[stype]);
  for(unsigned int i=0; i<fds->count; ++i){
    if (fd == fds->data[i].fd){
      log_error("Adding again fd %d to open fds!", fd);
    }
  }
  fds->data[fds->count].fd = fd;
  fds->data[fds->count].object = object;
  fds->data[fds->count].rtype = rtype;
  fds->data[fds->count].stype = stype;
  fds->count++;
}


// Removes the first entry whose fd matches from the polled-fid table and
// closes the gap by shifting the following entries of both parallel arrays
// down by one. Does nothing if the fd is not in the table.
void
remove_polled_fid(struct polled_fids* pfids, int fd)
{
  log_dbg("Polled_fids %p removing polled fd %d.", pfids, fd);

  unsigned int pos = 0;
  while(pos < pfids->count && pfids->data[pos].fd != fd){
    pos++;
  }
  if(pos >= pfids->count){
    return;
  }

  log_dbg("FD %d FID %p removed.", fd, pfids->fid_set[pos]);
  // Number of entries located after the removed one.
  unsigned int tail = pfids->count - 1 - pos;
  memmove(&pfids->fid_set[pos], &pfids->fid_set[pos+1], tail*sizeof(pfids->fid_set[0]));
  memmove(&pfids->data[pos], &pfids->data[pos+1], tail*sizeof(pfids->data[0]));
  pfids->count -= 1;
}


// Removes the first entry whose fd matches from the event loop's open-fd
// table (shifting later entries down by one) and then records the fd in the
// event loop's closed-FD list. The fd is added to the closed-FD list whether
// or not it was found in the open-fd table.
void
remove_open_fd(struct netio_eventloop* ev, int fd)
{
  struct open_fds* fds = &ev->openfds;

  unsigned int pos = 0;
  while(pos < fds->count && fds->data[pos].fd != fd){
    pos++;
  }
  if(pos < fds->count){
    log_dbg("Removing from open fd record fd %d res type %s socket type %s. Current registered fds %u", fd, resource_name[fds->data[pos].rtype], socket_name[fds->data[pos].stype],fds->count);
    // Number of entries located after the removed one.
    unsigned int tail = fds->count - 1 - pos;
    memmove(&fds->data[pos], &fds->data[pos+1], tail*sizeof(fds->data[0]));
    fds->count -= 1;
  }

  add_closed_fd(&ev->closedfds, fd);
}


// Returns true if fd is recorded in the open-fd table, false otherwise.
int
check_open_fd_exists(struct open_fds* fds, int fd)
{
  unsigned int pos = 0;
  while(pos < fds->count){
    if(fds->data[pos].fd == fd){
      return true;
    }
    pos++;
  }
  return false;
}


// API FUNCTIONS ///////////////////////////////////////////////////////////////

void
netio_eventloop_init(struct netio_eventloop* evloop)
{
    evloop->epollfd = epoll_create1(0); // no flag passed, same behaviour as epoll_create
    evloop->events = malloc(sizeof(struct epoll_event)*NETIO_MAX_EPOLL_EVENTS);
    init_polled_fids(&evloop->pfids, NETIO_MAX_POLLED_FIDS);
    init_openfds(&evloop->openfds, NETIO_MAX_POLLED_FIDS);
    reset_closed_fds(&evloop->closedfds);
    log_dbg("Creating a new eventloop with fd %d", evloop->epollfd);

    if(evloop->epollfd == -1) {
      log_fatal("Could not create epoll fd. Exit.");
      exit(2);
    }

    //termination signal
    evloop->stop_signal.data = evloop;
    evloop->stop_signal.cb = netio_stop;
    netio_signal_init(evloop, &(evloop->stop_signal));
    log_dbg("stop signal initialised with fd %d", evloop->stop_signal.ev_ctx.fd );
}

void
netio_timer_init(struct netio_eventloop* evloop, struct netio_timer* timer)
{
  timer->ev_ctx.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  timer->ev_ctx.data = timer;
  timer->ev_ctx.cb = netio_timer_callback;
  if(timer->ev_ctx.fd == -1)
  {
    log_error("Could not create timerfd. The timer will not fire.");
    return;
  }
  log_dbg("registering timerfd %d to %d", timer->ev_ctx.fd, evloop->epollfd);
  netio_register_read_fd(evloop, &timer->ev_ctx);
#if defined TRACK_ALL_FD
  add_open_fd(&evloop->openfds, timer->ev_ctx.fd, NETIO_TIMER, NOSOCKET, timer);
#endif
}

// Stops the timer, removes its timerfd from the event loop's epoll instance
// and closes the fd. With TRACK_ALL_FD defined, the fd is also removed from
// the open-fd record when close() succeeds.
void
netio_timer_close(struct netio_eventloop* evloop, struct netio_timer* timer)
{
  netio_timer_stop(timer);
  epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, timer->ev_ctx.fd, NULL);
  log_dbg("netio_timer_close: deregistered timer eventfd %d", timer->ev_ctx.fd);
#if defined TRACK_ALL_FD
  int ret = close(timer->ev_ctx.fd);
  if(!ret){
    // remove_open_fd() takes the event loop itself, not the address of the
    // local pointer variable.
    remove_open_fd(evloop, timer->ev_ctx.fd);
  }
#else
  close(timer->ev_ctx.fd);
#endif
}

// Arms the timer to fire every `seconds` seconds.
void
netio_timer_start_s(struct netio_timer* timer, unsigned long long seconds)
{
  const int timer_fd = timer->ev_ctx.fd;
  set_timerfd(timer_fd, seconds, 0);
}

// Arms the timer to fire every `milliseconds` milliseconds. The period is
// split into whole seconds and a nanosecond remainder.
void
netio_timer_start_ms(struct netio_timer* timer, unsigned long long milliseconds)
{
  const unsigned long long whole_seconds = milliseconds/1000;
  const unsigned long long remainder_ns = (milliseconds%1000)*1000*1000;
  set_timerfd(timer->ev_ctx.fd, whole_seconds, remainder_ns);
}

// Arms the timer to fire every `microseconds` microseconds. The period is
// split into whole seconds and a nanosecond remainder.
void
netio_timer_start_us(struct netio_timer* timer, unsigned long long microseconds)
{
  const unsigned long long whole_seconds = microseconds/(1000*1000);
  const unsigned long long remainder_ns = (microseconds%(1000*1000))*1000;
  set_timerfd(timer->ev_ctx.fd, whole_seconds, remainder_ns);
}

// Arms the timer to fire every `nanoseconds` nanoseconds. The period is
// split into whole seconds and a nanosecond remainder.
void
netio_timer_start_ns(struct netio_timer* timer, unsigned long long nanoseconds)
{
  const unsigned long long whole_seconds = nanoseconds/(1000*1000*1000);
  const unsigned long long remainder_ns = nanoseconds%(1000*1000*1000);
  set_timerfd(timer->ev_ctx.fd, whole_seconds, remainder_ns);
}

// Stops the timer by programming its timerfd with a zero initial value and a
// zero interval.
void
netio_timer_stop(struct netio_timer* timer)
{
    const int timer_fd = timer->ev_ctx.fd;
    set_timerfd(timer_fd, 0, 0);
}


void
netio_signal_init(struct netio_eventloop* evloop, struct netio_signal* signal)
{
    signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
    signal->ev_ctx.data = signal;
    if(signal->ev_ctx.fd == -1)
    {
      log_fatal("Could not open eventfd");
      exit(2);
    }

    signal->ev_ctx.cb = netio_signal_callback;
    signal->epollfd = evloop->epollfd;
    netio_register_read_fd(evloop, &signal->ev_ctx);
#if defined TRACK_ALL_FD
    add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal);
#endif
    log_dbg("Registering eventfd %d", signal->ev_ctx.fd);
}


void
netio_signal_no_semaphore_init(struct netio_eventloop* evloop, struct netio_signal* signal)
{
    signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK);
    signal->ev_ctx.data = signal;
    if(signal->ev_ctx.fd == -1)
    {
      log_fatal("Could not open eventfd");
      exit(2);
    }

    signal->ev_ctx.cb = netio_signal_callback;
    signal->epollfd = evloop->epollfd;
    netio_register_read_fd(evloop, &signal->ev_ctx);
#if defined TRACK_ALL_FD
    add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal);
#endif
    log_dbg("Registering eventfd %d", signal->ev_ctx.fd);
}




// Removes the signal's eventfd from the event loop's epoll instance and
// closes it. Failures of either step are logged as warnings. With
// TRACK_ALL_FD defined, the fd is also removed from the open-fd record.
void
netio_signal_close(struct netio_eventloop* evloop, struct netio_signal* signal)
{
    int rc = epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, signal->ev_ctx.fd, NULL);
    if (rc) {
      log_warn("Cannot deregister signal fd %d from evloop %d, %s", signal->ev_ctx.fd, evloop->epollfd, strerror(errno));
    }
    log_dbg("netio_signal_close: deregistered signal eventfd %d, ret %d from evloop %d", signal->ev_ctx.fd, rc, signal->epollfd);
    int ret = close(signal->ev_ctx.fd);
    if(ret) {log_warn("Cannot close %d: %s", signal->ev_ctx.fd, strerror(errno));}
#if defined TRACK_ALL_FD
    // remove_open_fd() takes the event loop itself, not the address of the
    // local pointer variable.
    remove_open_fd(evloop, signal->ev_ctx.fd);
#endif
}


// Fires the signal by writing the 8-byte value 1 to its eventfd. An error is
// logged if the write does not transfer all 8 bytes.
void
netio_signal_fire(struct netio_signal* signal)
{
    const uint64_t increment = 1;
    int written = write(signal->ev_ctx.fd, &increment, 8);
    if(written == 8){
      return;
    }
    log_error("Firing signal writing on fd %d, only %d / 8 bytes written. Errno %s", signal->ev_ctx.fd, written, strerror(errno));
}

// Schedules the "connection refused" error callback of a send socket on the
// socket's event loop. A fresh eventfd and a heap-allocated event context are
// created, registered with the event loop and recorded in its open-fd table;
// the socket's address is then written to the eventfd so that
// netio_error_connection_refused_callback() can pick it up. Terminates the
// process with exit code 2 if the context or the eventfd cannot be created.
void netio_error_connection_refused_fire(struct netio_send_socket* socket)
{
    struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context));
    if (ev_ctx == NULL)
    {
      log_fatal("Could not allocate event context for send socket error_connection_refused");
      exit(2);
    }
    ev_ctx->fd = eventfd(0, EFD_NONBLOCK);
    // event context is freed by the callback
    ev_ctx->data = ev_ctx;
    if (ev_ctx->fd == -1)
    {
      log_fatal("Could not open eventfd for send socket error_connection_refused");
      exit(2);
    }

    ev_ctx->cb = netio_error_connection_refused_callback;

    netio_register_read_fd(&socket->ctx->evloop, ev_ctx);
    add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, USEND, socket);
    log_dbg("send_socket netio_error_connection_refused_fire: registering eventfd %d", ev_ctx->fd);

    // Socket is transmitted as data to the file descriptor. An eventfd always
    // transfers exactly 8 bytes, so the address is widened to 64 bits first
    // instead of writing 8 bytes straight from the pointer variable.
    uint64_t value = (uint64_t)(uintptr_t)socket;
    ssize_t written = write(ev_ctx->fd, &value, sizeof(value));
    if (written != (ssize_t)sizeof(value)) {
      log_error("Could not write send socket %p to eventfd %d: %s. Connection refused callback will not run.", (void*)socket, ev_ctx->fd, strerror(errno));
    }
}

// Schedules the "bind refused" error callback of a listen socket on the
// socket's event loop. A fresh eventfd and a heap-allocated event context are
// created, registered with the event loop and recorded in its open-fd table;
// the socket's address is then written to the eventfd so that
// netio_error_bind_refused_callback() can pick it up. Terminates the process
// with exit code 2 if the context or the eventfd cannot be created.
void netio_error_bind_refused_fire(struct netio_listen_socket* socket)
{
    struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context));
    if (ev_ctx == NULL)
    {
      log_fatal("Could not allocate event context for listen socket error_bind_refused");
      exit(2);
    }
    ev_ctx->fd = eventfd(0, EFD_NONBLOCK);
    // event context is freed by the callback
    ev_ctx->data = ev_ctx;
    if (ev_ctx->fd == -1)
    {
      log_fatal("Could not open eventfd for listen socket error_bind_refused");
      exit(2);
    }

    ev_ctx->cb = netio_error_bind_refused_callback;

    netio_register_read_fd(&socket->ctx->evloop, ev_ctx);
    add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, ULISTEN, socket);
    log_dbg("listen_socket netio_error_bind_refused_fire: registering eventfd %d", ev_ctx->fd);

    // Socket is transmitted as data to the file descriptor. An eventfd always
    // transfers exactly 8 bytes, so the address is widened to 64 bits first
    // instead of writing 8 bytes straight from the pointer variable.
    uint64_t value = (uint64_t)(uintptr_t)socket;
    ssize_t written = write(ev_ctx->fd, &value, sizeof(value));
    if (written != (ssize_t)sizeof(value)) {
      log_error("Could not write listen socket %p to eventfd %d: %s. Bind refused callback will not run.", (void*)socket, ev_ctx->fd, strerror(errno));
    }
}


// Runs the event loop until it has been stopped and epoll reports no more
// pending events, then releases the loop's resources.
//
// Sequence:
//  1. Marks the loop as running and calls the user's cb_init (if set).
//  2. Repeatedly waits on the epoll instance and dispatches every returned
//     event through process_event(), which skips file descriptors recorded in
//     the closed-FD list. The closed-FD list is cleared after each batch.
//  3. On exit closes the epoll fd, closes every fd still recorded in the
//     open-fd table (with a warning each) and frees the loop's arrays.
//
// A failing epoll_wait() is retried for EINTR; any other error terminates the
// process with exit code 1.
void
netio_run(struct netio_eventloop* evloop)
{
  evloop->is_running = 1;
  int nevents;

  if(evloop->cb_init != NULL) {
      evloop->cb_init(evloop->data);
  }
  int running=1;
  while(running) {
    // don't want to block or wait too long if we're shutting down
    // NOTE(review): timeout is a uint64_t but epoll_wait() takes an int; if
    // NETIO_EPOLL_TIMEOUT is negative this relies on the narrowing conversion
    // - confirm the macro's value.
    uint64_t timeout = evloop->is_running ? NETIO_EPOLL_TIMEOUT : 10;
    nevents = epoll_wait(evloop->epollfd, evloop->events, NETIO_MAX_EPOLL_EVENTS,
          timeout);
    log_trc("epoll wait: %d events to process", nevents);

    // When epoll_wait() failed nevents is -1 and this loop body never runs.
    for(int i = 0; i < nevents; ++i)
    {
      log_trc("event type: %x from fd %d", evloop->events[i].events, ((struct netio_event_context*)evloop->events[i].data.ptr)->fd);
      process_event((struct netio_event_context*)(evloop->events[i].data.ptr), &evloop->closedfds);
      // NOTE(review): the context is dereferenced again after its callback
      // ran; some callbacks in this file free their context, so this would
      // read freed memory if such an event also carried EPOLLRDHUP - confirm.
      if(evloop->events[i].events & EPOLLRDHUP)
      {
        struct netio_event_context* c = (struct netio_event_context*)(evloop->events[i].data.ptr);
        log_dbg("EPOLLRDHUP on fd %d, removing it from epoll_wait", c->fd);
        epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, c->fd, NULL);
      }
    }
    // Leave the loop only once a stop was requested AND a wait returned no
    // events, so events still pending at stop time are processed first.
    if (evloop->is_running==0 && nevents==0) {
       running=0;
    }
    reset_closed_fds(&evloop->closedfds);
    if(unlikely(nevents == -1))
    {
      int errsv = errno;
      if(errsv==EINTR) {
        log_dbg("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv));
        continue;
      }
      else {
        log_fatal("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv));
        free(evloop->events);
        exit(1);
      }
    }
  }//end of while running

  log_dbg("Cleaning up eventloop");
  close(evloop->epollfd);
  // Anything still listed as open at this point is closed on the owner's behalf.
  for(unsigned int i=0; i < evloop->openfds.count; ++i){
    struct open_fd_data* data = &evloop->openfds.data[i];
    log_warn("Evloop terminated, closing fd %d type %s socket type %s", data->fd, resource_name[data->rtype], socket_name[data->stype]);
    close(data->fd);
  }
  free(evloop->openfds.data);
  free(evloop->pfids.data);
  free(evloop->pfids.fid_set);
  free(evloop->events);
}


// Stops the event loop directly (by calling netio_stop) if it is currently
// marked as running; otherwise does nothing.
void
netio_terminate(struct netio_eventloop* evloop)
{
  if (evloop->is_running != 1){
    return;
  }
  netio_stop((void*)evloop);
}


// Requests termination of the event loop by firing its stop signal. If the
// loop is not marked as running, only a warning is logged.
void
netio_terminate_signal(struct netio_eventloop* evloop)
{
  if (evloop->is_running != 1){
    log_warn("netio_terminate_signal called but evloop not running");
    return;
  }
  log_info("Firing termination signal");
  netio_signal_fire(&evloop->stop_signal);
}

// Stop handler of the event loop; `ptr` is the struct netio_eventloop to stop.
// Closes the loop's stop signal, empties the polled-fid table and clears the
// running flag that netio_run() checks.
void
netio_stop(void* ptr)
{
  struct netio_eventloop* loop = ptr;

  netio_signal_close(loop, &loop->stop_signal);
  loop->pfids.count = 0;
  loop->is_running = 0;
}

// Registers the context's fd with the event loop's epoll instance for
// readability (EPOLLIN) and peer-hangup (EPOLLRDHUP) notifications.
void
netio_register_read_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
{
  const int read_events = EPOLLIN | EPOLLRDHUP;
  register_fd(evloop->epollfd, ctx, read_events);
}


// Closes a socket of the given type.
//
// Composite sockets (publish and subscribe sockets) are taken apart here:
// their member sockets are disconnected or closed through recursive calls and
// the memory they own is freed. For all other socket types the actual close
// is deferred to the event loop: a signal carrying the matching close_*
// callback and a signal_data record (signal, socket, event loop) is created
// and fired immediately.
// NOTE(review): the signal and its signal_data are presumably released by the
// close_* callbacks, which are defined elsewhere - confirm.
void
netio_close_socket(struct netio_eventloop* evloop, void* socket, enum socket_type type)
{
  //Sockets that contain more than one other socket
  switch (type){
    case BPUB: {
      struct netio_publish_socket* pub = (struct netio_publish_socket*)socket;
      struct netio_socket_list* it = pub->subscription_table.socket_list;
      while(it != NULL){
        if(it->socket){
          struct netio_buffered_send_socket* bs_socket = (struct netio_buffered_send_socket*)it->socket;
          netio_disconnect(&(bs_socket->send_socket)); //Do I need to disconnect for the sake of the other side?
          free(it->socket);
          if(it->addrlen > 0){ free(it->addr); }
        }
        struct netio_socket_list* tmp = it;
        it = it->next;
        free(tmp);
      }
      netio_close_socket(evloop, (void*)(&pub->lsocket), ULISTEN);
      return;
    }

    case UPUB: {
      struct netio_unbuffered_publish_socket* upub = (struct netio_unbuffered_publish_socket*)socket;
      struct netio_socket_list* uit = upub->subscription_table.socket_list;
      while(uit != NULL){
        if(uit->socket){
          struct netio_send_socket* s_socket = (struct netio_send_socket*)uit->socket;
          netio_disconnect(s_socket); //Do I need to disconnect for the sake of the other side?
          free(uit->socket);
          if(uit->addrlen > 0){ free (uit->addr); }
        }
        struct netio_socket_list* tmp = uit;
        uit = uit->next;
        free(tmp);
      }
      netio_close_socket(evloop, (void*)(&upub->lsocket), ULISTEN);
      struct netio_completion_stack* cs = &upub->completion_stack;
      free(cs->stack);
      free(cs->objects);
      free(cs->key_array);
      return;
    }

    case BSUB: {
      struct netio_subscribe_socket* sub_socket = (struct netio_subscribe_socket*)socket;
      netio_disconnect(&sub_socket->socket);
      netio_close_socket(evloop, &sub_socket->recv_socket, BLISTEN);
      if (sub_socket->remote_hostname) {
        free((void*)sub_socket->remote_hostname);
        sub_socket->remote_hostname=NULL;
      }
      return;
    }

    case USUB: {
      struct netio_unbuffered_subscribe_socket* usub_socket = (struct netio_unbuffered_subscribe_socket*)socket;
      netio_disconnect(&usub_socket->socket);
      netio_close_socket(evloop, &usub_socket->recv_socket, ULISTEN);
      if (usub_socket->remote_hostname) {
        free((void*)usub_socket->remote_hostname);
        usub_socket->remote_hostname=NULL;
      }
      return;
    }

    case NOSOCKET:
      return;

    default:
      ;
      //go on with the function
  }

  struct netio_signal* signal_close_socket = malloc(sizeof(struct netio_signal));
  struct signal_data* sd = malloc(sizeof(struct signal_data));
  if (signal_close_socket == NULL || sd == NULL) {
    log_error("Could not delete socket: out of memory.");
    free(signal_close_socket);
    free(sd);
    return;
  }
  sd->signal = signal_close_socket;
  sd->ptr = socket;
  sd->evloop = evloop;

  switch (type){
    case USEND:
      signal_close_socket->cb = close_send_socket;
      break;
    case BSEND:
      signal_close_socket->cb = close_buffered_send_socket;
      break;
    case URECV:
      signal_close_socket->cb = close_recv_socket;
      break;
    case BRECV:
      signal_close_socket->cb = close_buffered_recv_socket;
      break;
    case ULISTEN:
      signal_close_socket->cb = close_listen_socket;
      break;
    case BLISTEN: {
      // The close callback receives the plain listen socket embedded in the
      // buffered listen socket.
      struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)socket;
      sd->ptr = (void*)(&lsocket->listen_socket);
      signal_close_socket->cb = close_buffered_listen_socket;
      break;
    }
    case NOSOCKET:
      // No signal will be fired, so nothing else would release these.
      free(sd);
      free(signal_close_socket);
      return;

    default:
      log_error("Could not delete socket: type unknown.");
      // No signal will be fired, so nothing else would release these.
      free(sd);
      free(signal_close_socket);
      return;
  }
  signal_close_socket->data = sd;
  netio_signal_init(evloop, signal_close_socket);
  netio_signal_fire(signal_close_socket);
}


// Registers a TCP fd with the event loop's epoll instance for readability
// (EPOLLIN | EPOLLRDHUP). If the fd is already registered (EEXIST) its
// registration is modified instead. Any other failure to add is logged;
// a failure to modify terminates the process with exit(-2).
void netio_register_read_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
{
  struct epoll_event ev;

  log_dbg("netio_register_read_tcp_fd.  Will register ctx->fd = %d with epoll", ctx->fd);
  ev.events = EPOLLIN | EPOLLRDHUP;
  ev.data.ptr = ctx;

  if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev) == 0)
  {
    log_dbg("Done");
    return;
  }

  if (errno != EEXIST) {
    log_error("Could not add FD %d to epoll. %s", ctx->fd, strerror(errno));
    return;
  }

  // Already registered: replace the existing registration.
  if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev) != 0)
  {
    log_error("Could not modify FD %d in epoll %s", ctx->fd, strerror(errno));
    exit(-2);
  }
  log_dbg("Done");
}


// Registers a TCP fd with the event loop's epoll instance for edge-triggered
// writability (EPOLLOUT | EPOLLRDHUP | EPOLLET). If the fd is already
// registered (EEXIST) its registration is modified instead. Any other failure
// to add is logged; a failure to modify terminates the process with exit(-2).
void netio_register_write_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
{
  struct epoll_event ev;

  log_dbg("netio_register_write_tcp_fd: registering ctx->fd = %d with epoll", ctx->fd);
  ev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET;
  ev.data.ptr = ctx;

  if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev) == 0)
  {
    log_dbg("Done");
    return;
  }

  if (errno != EEXIST) {
    log_error("Could not add FD %d to epoll. errno=%d", ctx->fd, errno);
    return;
  }

  // Already registered: replace the existing registration.
  if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev) != 0)
  {
    log_error("Could not modify FD %d in epoll, errno=%d", ctx->fd, errno);
    exit(-2);
  }
  log_dbg("Done");
}