.. _program_listing_file_eventloop.c: Program Listing for File eventloop.c ==================================== |exhale_lsh| :ref:`Return to documentation for file ` (``eventloop.c``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include #include #include #include #include #include #include #include "log.h" #include "netio/netio.h" const char* resource_name[] = { "NETIO_TIMER", "NETIO_SIGNAL", "NETIO_CQ", "NETIO_EQ", "NETIO_TCP"}; const char* socket_name[] = {"BSEND", "USEND", "BRECV", "URECV", "BSUB", "USUB", "BPUB", "UPUB", "BLISTEN", "ULISTEN", "NOSOCKET"}; #if defined DEBUG || defined DEBUG_EV #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__) #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__) #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__) #else #define log_dbg(...) #define log_trc(...) #endif //#define TRACK_ALL_FD //This option is not thread-safe, see FLX-2022. // STATIC FUNCTIONS //////////////////////////////////////////////////////////// static int check_fd_is_closed(struct closed_fds* closedfds, int fd){ for(int i = 0; i < closedfds->count; i++){ if(closedfds->fds[i] == fd){ return true; } } return false; } static void process_event(struct netio_event_context* evc, struct closed_fds* closedfds) { if(closedfds->count == 0 || !check_fd_is_closed(closedfds, evc->fd)){ if(evc->cb != NULL) { evc->cb(evc->fd, evc->data); } } } static void add_closed_fd(struct closed_fds* closedfds, int fd){ if (closedfds->count +1 >= 3*NETIO_MAX_POLLED_FIDS){log_warn("Cannot add FD %d to list of closed FDs because Array is full.", fd); return;} closedfds->fds[closedfds->count++] = fd; } static void reset_closed_fds(struct closed_fds* closedfds){ closedfds->count = 0; } static void set_timerfd(int fd, unsigned s, unsigned ns) { struct itimerspec it; it.it_interval.tv_sec = s; it.it_interval.tv_nsec = ns; it.it_value.tv_sec = s; it.it_value.tv_nsec = ns; if(timerfd_settime(fd, 0, &it, NULL)) { log_error("Could not set timerfd %d. The timer will not fire.", fd); return; } } static void register_fd(int epfd, struct netio_event_context* ctx, int flags) { struct epoll_event ev; ev.events = flags; ev.data.ptr = ctx; int rc = fcntl(ctx->fd, F_SETFL, fcntl(ctx->fd, F_GETFL) | O_NONBLOCK ); if (rc < 0) { log_error("Failed to change flags (incl. O_NONBLOCK) of file descriptor %d.", ctx->fd); } log_dbg("Adding %d to epoll %d", ctx->fd, epfd); if(epoll_ctl(epfd, EPOLL_CTL_ADD, ctx->fd, &ev)) { log_error("Could not add file descriptor %d to epoll. Events from this resource will be neglected.", ctx->fd); return; } } // PRIVATE FUNCTIONS /////////////////////////////////////////////////////////// void netio_timer_callback(int fd, void* data) { log_trc("timer event on FD %d.", fd); struct netio_timer* timer = (struct netio_timer*)data; uint64_t buf; if(8 != read(fd, &buf, 8)) { log_dbg("Did not read 8 bytes."); } if(timer->cb) timer->cb(timer->data); } void netio_signal_callback(int fd, void* data) { log_dbg("signal event on FD %d.", fd); struct netio_signal* signal = (struct netio_signal*)data; uint64_t buf; if(8 != read(fd, &buf, 8)) { log_info("Did not read 8 bytes."); } log_dbg("Count = %lu", buf); if(signal->cb) signal->cb(signal->data); } void netio_error_connection_refused_callback(int fd, void* data) { log_dbg("error event on FD %d.", fd); struct netio_event_context* ev_ctx = (struct netio_event_context*)data; struct netio_send_socket* socket; if(8 != read(fd, &socket, 8)) { log_dbg("Did not read 8 bytes."); } if (socket->cb_error_connection_refused) { socket->cb_error_connection_refused(socket); } else { log_error("Send socket %p has no connection refused callback set. Resources not freed.", socket); } free(ev_ctx); } void netio_error_bind_refused_callback(int fd, void* data) { log_dbg("error event on FD %d.", fd); struct netio_event_context* ev_ctx = (struct netio_event_context*)data; struct netio_listen_socket* socket; if(8 != read(fd, &socket, 8)) { log_dbg("Did not read 8 bytes."); } if (socket->cb_error_bind_refused) { socket->cb_error_bind_refused(socket); } else { log_error("Listen socket %p has no bind refused callback set. Resources not freed.", socket); } free(ev_ctx); } // INTERNAL RESOURCES /////////////////////////////////////////////////////////////// void init_polled_fids(struct polled_fids* pfids, int initial_size){ pfids->size = initial_size; pfids->count = 0; pfids->fabric = NULL; pfids->fid_set = malloc(pfids->size*sizeof(struct fid*)); pfids->data = malloc(pfids->size*sizeof(struct polled_fids_data)); } void init_openfds(struct open_fds* fds, int initial_size){ fds->size = initial_size; fds->count = 0; fds->data = malloc(fds->size*sizeof(struct open_fd_data)); } void print_polled_fids(struct polled_fids* pfids){ printf("Number of polled fds %u \n", pfids->count); printf("FD \t FID \n"); for(unsigned int i=0; i < pfids->count; ++i){ printf("%d \t %p \n", pfids->data[i].fd, pfids->fid_set[i]); } printf("-------------------\n"); } void print_openfds(struct open_fds* fds){ printf("Number of open fds %u \n", fds->count); printf("===============================================\n"); printf("FD \t RESOURCE \t SOCKET \t OBJ ADDR \n"); printf("-----------------------------------------------\n"); for(unsigned int i=0; i < fds->count; ++i){ int r = fds->data[i].rtype; int s = fds->data[i].stype; printf("%d\t%s\t%s\t%p\n", fds->data[i].fd, resource_name[r], socket_name[s], fds->data[i].object); } printf("===============================================\n"); } void add_polled_fid(struct polled_fids* pfids, struct fid_fabric* fab, struct fid* fid, int fd, void* socket, void (*cb)(int,void*)){ if(pfids->size <= pfids->count){ log_dbg("Reallocing polled fids"); pfids->fid_set = realloc(pfids->fid_set, 2*(pfids->size)*sizeof(struct fid*)); pfids->data = realloc(pfids->data, 2*(pfids->size)*sizeof(struct polled_fids_data)); pfids->size *= 2; }; log_dbg("Polled_fids %p Adding polled fd %d fid %p.", pfids, fd, fid); pfids->fabric = fab; pfids->fid_set[pfids->count] = fid; pfids->data[pfids->count].fd = fd; pfids->data[pfids->count].socket = socket; pfids->data[pfids->count].cb = cb; pfids->count++; //print_polled_fids(pfids); }; void add_open_fd(struct open_fds* fds, int fd, enum resource_type rtype, enum socket_type stype, void* object){ if(fds->size <= fds->count){ log_dbg("Reallocing open fds"); fds->data = realloc(fds->data, 2*(fds->size)*sizeof(struct open_fd_data)); fds->size *= 2; }; log_dbg("New open fd %d res type %s socket type %s", fd, resource_name[rtype], socket_name[stype]); for(unsigned int i=0; icount; ++i){ if (fd == fds->data[i].fd){ log_error("Adding again fd % to open fds!", fd); } } fds->data[fds->count].fd = fd; fds->data[fds->count].object = object; fds->data[fds->count].rtype = rtype; fds->data[fds->count].stype = stype; fds->count++; } void remove_polled_fid(struct polled_fids* pfids, int fd){ log_dbg("Polled_fids %p removing polled fd %d.", pfids, fd); for(unsigned int i = 0; i < pfids->count; i++){ if(fd == pfids->data[i].fd){ log_dbg("FD %d FID %p removed.", fd, pfids->fid_set[i]); for(unsigned int j = i; j < pfids->count-1; j++){ pfids->fid_set[j] = pfids->fid_set[j+1]; pfids->data[j] = pfids->data[j+1]; } pfids->count -= 1; break; } } //print_polled_fids(pfids); } void remove_open_fd(struct netio_eventloop* ev, int fd){ struct open_fds* fds = &ev->openfds; for(unsigned int i = 0; i < fds->count; i++){ if(fd == fds->data[i].fd){ log_dbg("Removing from open fd record fd %d res type %s socket type %s. Current registered fds %u", fd, resource_name[fds->data[i].rtype], socket_name[fds->data[i].stype],fds->count); for(unsigned int j = i; j < fds->count-1; j++){ fds->data[j] = fds->data[j+1]; } fds->count -= 1; break; } } add_closed_fd(&ev->closedfds, fd); } int check_open_fd_exists(struct open_fds* fds, int fd){ for(unsigned int i = 0; i < fds->count; i++){ if(fd == fds->data[i].fd){ return true; } } return false; } // API FUNCTIONS /////////////////////////////////////////////////////////////// void netio_eventloop_init(struct netio_eventloop* evloop) { evloop->epollfd = epoll_create1(0); // no flag passed, same behaviour as epoll_create evloop->events = malloc(sizeof(struct epoll_event)*NETIO_MAX_EPOLL_EVENTS); init_polled_fids(&evloop->pfids, NETIO_MAX_POLLED_FIDS); init_openfds(&evloop->openfds, NETIO_MAX_POLLED_FIDS); reset_closed_fds(&evloop->closedfds); log_dbg("Creating a new eventloop with fd %d", evloop->epollfd); if(evloop->epollfd == -1) { log_fatal("Could not create epoll fd. Exit."); exit(2); } //termination signal evloop->stop_signal.data = evloop; evloop->stop_signal.cb = netio_stop; netio_signal_init(evloop, &(evloop->stop_signal)); log_dbg("stop signal initialised with fd %d", evloop->stop_signal.ev_ctx.fd ); } void netio_timer_init(struct netio_eventloop* evloop, struct netio_timer* timer) { timer->ev_ctx.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); timer->ev_ctx.data = timer; timer->ev_ctx.cb = netio_timer_callback; if(timer->ev_ctx.fd == -1) { log_error("Could not create timerfd. The timer will not fire."); return; } log_dbg("registering timerfd %d to %d", timer->ev_ctx.fd, evloop->epollfd); netio_register_read_fd(evloop, &timer->ev_ctx); #if defined TRACK_ALL_FD add_open_fd(&evloop->openfds, timer->ev_ctx.fd, NETIO_TIMER, NOSOCKET, timer); #endif } void netio_timer_close(struct netio_eventloop* evloop, struct netio_timer* timer) { netio_timer_stop(timer); epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, timer->ev_ctx.fd, NULL); log_dbg("netio_timer_close: deregistered timer eventfd %d", timer->ev_ctx.fd); #if defined TRACK_ALL_FD int ret = close(timer->ev_ctx.fd); if(!ret){ remove_open_fd(&evloop, timer->ev_ctx.fd); } #else close(timer->ev_ctx.fd); #endif } void netio_timer_start_s(struct netio_timer* timer, unsigned long long seconds) { set_timerfd(timer->ev_ctx.fd, seconds, 0); } void netio_timer_start_ms(struct netio_timer* timer, unsigned long long milliseconds) { set_timerfd(timer->ev_ctx.fd, milliseconds/1000, (milliseconds%1000)*1000*1000); } void netio_timer_start_us(struct netio_timer* timer, unsigned long long microseconds) { set_timerfd(timer->ev_ctx.fd, microseconds/(1000*1000), (microseconds%(1000*1000)*1000)); } void netio_timer_start_ns(struct netio_timer* timer, unsigned long long nanoseconds) { set_timerfd(timer->ev_ctx.fd, nanoseconds/(1000*1000*1000), nanoseconds%(1000*1000*1000)); } void netio_timer_stop(struct netio_timer* timer) { set_timerfd(timer->ev_ctx.fd, 0, 0); } void netio_signal_init(struct netio_eventloop* evloop, struct netio_signal* signal) { signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE); signal->ev_ctx.data = signal; if(signal->ev_ctx.fd == -1) { log_fatal("Could not open eventfd"); exit(2); } signal->ev_ctx.cb = netio_signal_callback; signal->epollfd = evloop->epollfd; netio_register_read_fd(evloop, &signal->ev_ctx); #if defined TRACK_ALL_FD add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal); #endif log_dbg("Registering eventfd %d", signal->ev_ctx.fd); } void netio_signal_no_semaphore_init(struct netio_eventloop* evloop, struct netio_signal* signal) { signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK); signal->ev_ctx.data = signal; if(signal->ev_ctx.fd == -1) { log_fatal("Could not open eventfd"); exit(2); } signal->ev_ctx.cb = netio_signal_callback; signal->epollfd = evloop->epollfd; netio_register_read_fd(evloop, &signal->ev_ctx); #if defined TRACK_ALL_FD add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal); #endif log_dbg("Registering eventfd %d", signal->ev_ctx.fd); } void netio_signal_close(struct netio_eventloop* evloop, struct netio_signal* signal) { int rc = epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, signal->ev_ctx.fd, NULL); if (rc) { log_warn("Cannot deregister signal fd %d from evloop %d, %s", signal->ev_ctx.fd, evloop->epollfd, strerror(errno)); } log_dbg("netio_signal_close: deregistered signal eventfd %d, ret %d from evloop %d", signal->ev_ctx.fd, rc, signal->epollfd); int ret = close(signal->ev_ctx.fd); if(ret) {log_warn("Cannot close %d: %s", signal->ev_ctx.fd, strerror(errno));} #if defined TRACK_ALL_FD remove_open_fd(&evloop, signal->ev_ctx.fd); #endif } void netio_signal_fire(struct netio_signal* signal) { uint64_t buf = 1; int ret = write(signal->ev_ctx.fd, &buf, 8); if( ret !=8 ){ log_error("Firing signal writing on fd %d, only %d / 8 bytes written. Errno %s", signal->ev_ctx.fd, ret, strerror(errno)); } } void netio_error_connection_refused_fire(struct netio_send_socket* socket) { struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context)); ev_ctx->fd = eventfd(0, EFD_NONBLOCK); // event context is freed by the callback ev_ctx->data = ev_ctx; if (ev_ctx->fd == -1) { log_fatal("Could not open eventfd for send socket error_connection_refused"); exit(2); } ev_ctx->cb = netio_error_connection_refused_callback; netio_register_read_fd(&socket->ctx->evloop, ev_ctx); add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, USEND, socket); log_dbg("listen_socket netio_error_connection_refused_fire: registering eventfd %d", ev_ctx->fd); // Socket is transmitted as data to the file descriptor // struct netio_send_socket* buf = socket; write(ev_ctx->fd, &socket, 8); } void netio_error_bind_refused_fire(struct netio_listen_socket* socket) { struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context)); ev_ctx->fd = eventfd(0, EFD_NONBLOCK); // event context is freed by the callback ev_ctx->data = ev_ctx; if (ev_ctx->fd == -1) { log_fatal("Could not open eventfd for listen socket error_bind_refused"); exit(2); } ev_ctx->cb = netio_error_bind_refused_callback; netio_register_read_fd(&socket->ctx->evloop, ev_ctx); add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, ULISTEN, socket); log_dbg("listen_socket netio_error_bind_refused_fire: registering eventfd %d", ev_ctx->fd); // Socket is transmitted as data to the file descriptor // struct netio_listen_socket* buf = socket; write(ev_ctx->fd, &socket, 8); } void netio_run(struct netio_eventloop* evloop) { evloop->is_running = 1; int nevents; if(evloop->cb_init != NULL) { evloop->cb_init(evloop->data); } int running=1; while(running) { // don't want to block or wait too long if we're shutting down uint64_t timeout = evloop->is_running ? NETIO_EPOLL_TIMEOUT : 10; nevents = epoll_wait(evloop->epollfd, evloop->events, NETIO_MAX_EPOLL_EVENTS, timeout); log_trc("epoll wait: %d events to process", nevents); for(int i = 0; i < nevents; ++i) { log_trc("event type: %x from fd %d", evloop->events[i].events, ((struct netio_event_context*)evloop->events[i].data.ptr)->fd); process_event((struct netio_event_context*)(evloop->events[i].data.ptr), &evloop->closedfds); if(evloop->events[i].events & EPOLLRDHUP) { struct netio_event_context* c = (struct netio_event_context*)(evloop->events[i].data.ptr); log_dbg("EPOLLRDHUP on fd %d, removing it from epoll_wait", c->fd); epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, c->fd, NULL); } } if (evloop->is_running==0 && nevents==0) { running=0; } reset_closed_fds(&evloop->closedfds); if(unlikely(nevents == -1)) { int errsv = errno; if(errsv==EINTR) { log_dbg("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv)); continue; } else { log_fatal("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv)); free(evloop->events); exit(1); } } }//end of while running log_dbg("Cleaning up eventloop"); close(evloop->epollfd); for(unsigned int i=0; i < evloop->openfds.count; ++i){ struct open_fd_data* data = &evloop->openfds.data[i]; log_warn("Evloop terminated, closing fd %d type %s socket type %s", data->fd, resource_name[data->rtype], socket_name[data->stype]); close(data->fd); } free(evloop->openfds.data); free(evloop->pfids.data); free(evloop->pfids.fid_set); free(evloop->events); } void netio_terminate(struct netio_eventloop* evloop) { if (evloop->is_running == 1){ netio_stop((void*)evloop); } } void netio_terminate_signal(struct netio_eventloop* evloop) { if (evloop->is_running == 1){ log_info("Firing termination signal"); netio_signal_fire(&evloop->stop_signal); } else { log_warn("netio_terminate_signal called but evloop not running"); } } void netio_stop(void* ptr) { struct netio_eventloop* evloop = (struct netio_eventloop*)ptr; netio_signal_close(evloop, &evloop->stop_signal); evloop->pfids.count = 0; evloop->is_running = 0; } void netio_register_read_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx) { register_fd(evloop->epollfd, ctx, EPOLLIN | EPOLLRDHUP); } void netio_close_socket(struct netio_eventloop* evloop, void* socket, enum socket_type type) { //Sockets that contain more than one other socket switch (type){ case BPUB: { struct netio_publish_socket* pub = (struct netio_publish_socket*)socket; struct netio_socket_list* it = pub->subscription_table.socket_list; while(it != NULL){ if(it->socket){ struct netio_buffered_send_socket* bs_socket = (struct netio_buffered_send_socket*)it->socket; netio_disconnect(&(bs_socket->send_socket)); //Do I need to disconnect for the sake of the other side? free(it->socket); if(it->addrlen > 0){ free(it->addr); } } struct netio_socket_list* tmp = it; it = it->next; free(tmp); } netio_close_socket(evloop, (void*)(&pub->lsocket), ULISTEN); return; } case UPUB: { struct netio_unbuffered_publish_socket* upub = (struct netio_unbuffered_publish_socket*)socket; struct netio_socket_list* uit = upub->subscription_table.socket_list; while(uit != NULL){ if(uit->socket){ struct netio_send_socket* s_socket = (struct netio_send_socket*)uit->socket; netio_disconnect(s_socket); //Do I need to disconnec for the sake of the other side? free(uit->socket); if(uit->addrlen > 0){ free (uit->addr); } } struct netio_socket_list* tmp = uit; uit = uit->next; free(tmp); } netio_close_socket(evloop, (void*)(&upub->lsocket), ULISTEN); struct netio_completion_stack* cs = &upub->completion_stack; free(cs->stack); free(cs->objects); free(cs->key_array); return; } case BSUB: { struct netio_subscribe_socket* sub_socket = (struct netio_subscribe_socket*)socket; netio_disconnect(&sub_socket->socket); netio_close_socket(evloop, &sub_socket->recv_socket, BLISTEN); if (sub_socket->remote_hostname) { free((void*)sub_socket->remote_hostname); sub_socket->remote_hostname=NULL; } return; } case USUB: { struct netio_unbuffered_subscribe_socket* usub_socket = (struct netio_unbuffered_subscribe_socket*)socket; netio_disconnect(&usub_socket->socket); netio_close_socket(evloop, &usub_socket->recv_socket, ULISTEN); if (usub_socket->remote_hostname) { free((void*)usub_socket->remote_hostname); usub_socket->remote_hostname=NULL; } return; } case NOSOCKET: return; default: ; //go on with the function } struct netio_signal* signal_close_socket = malloc(sizeof(struct netio_signal)); struct signal_data* sd = malloc(sizeof(struct signal_data)); sd->signal = signal_close_socket; sd->ptr = socket; sd->evloop = evloop; switch (type){ case USEND: signal_close_socket->cb = close_send_socket; break; case BSEND: signal_close_socket->cb = close_buffered_send_socket; break; case URECV: signal_close_socket->cb = close_recv_socket; break; case BRECV: signal_close_socket->cb = close_buffered_recv_socket; break; case ULISTEN: signal_close_socket->cb = close_listen_socket; break; case BLISTEN: { struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)socket; sd->ptr = (void*)(&lsocket->listen_socket); signal_close_socket->cb = close_buffered_listen_socket; break; } case NOSOCKET: return; default: log_error("Could not delete socket: type unknown."); return; } signal_close_socket->data = sd; netio_signal_init(evloop, signal_close_socket); netio_signal_fire(signal_close_socket); } void netio_register_read_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx) { struct epoll_event ev; log_dbg("netio_register_read_tcp_fd. Will register ctx->fd = %d with epoll", ctx->fd); ev.events = EPOLLIN | EPOLLRDHUP; ev.data.ptr = ctx; if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev)) { if (errno != EEXIST) { log_error("Could not add FD %d to epoll. %s", ctx->fd, strerror(errno)); return; } if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev)) { log_error("Could not modify FD %d in epoll %s", ctx->fd, strerror(errno)); exit(-2); } } log_dbg("Done"); } void netio_register_write_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx) { struct epoll_event ev; log_dbg("netio_register_write_tcp_fd: registering ctx->fd = %d with epoll", ctx->fd); ev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET; ev.data.ptr = ctx; if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev)) { if (errno != EEXIST) { log_error("Could not add FD %d to epoll. errno=%d", ctx->fd, errno); return; } if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev)) { log_error("Could not modify FD %d in epoll, errno=%d", ctx->fd, errno); exit(-2); } } log_dbg("Done"); }