LCOV - code coverage report
Current view: top level - netio-next/src - eventloop.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 318 429 74.1 %
Date: 2025-08-12 04:15:35 Functions: 36 41 87.8 %

          Line data    Source code
       1             : #include <stdio.h>
       2             : #include <unistd.h>
       3             : #include <string.h>
       4             : #include <sys/timerfd.h>
       5             : #include <sys/eventfd.h>
       6             : #include <sys/stat.h>
       7             : #include <stdio.h>
       8             : #include "log.h"
       9             : 
      10             : #include "netio/netio.h"
      11             : 
      12             : const char* resource_name[] = { "NETIO_TIMER", "NETIO_SIGNAL", "NETIO_CQ", "NETIO_EQ", "NETIO_TCP"};
      13             : const char* socket_name[] = {"BSEND", "USEND", "BRECV", "URECV", "BSUB", "USUB", "BPUB", "UPUB", "BLISTEN", "ULISTEN", "NOSOCKET"};
      14             : 
      15             : #if defined DEBUG || defined DEBUG_EV
      16             : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
      17             : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
      18             : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
      19             : #else
      20             : #define log_dbg(...)
      21             : #define log_trc(...)
      22             : #endif
      23             : 
      24             : 
      25             : //#define TRACK_ALL_FD  //This option is not thread-safe, see FLX-2022.
      26             : 
      27             : /**
      28             :  * @file
      29             :  * Functions for the event loop.
      30             :  */
      31             : 
      32             : 
      33             : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
      34             : 
      35          88 : static int check_fd_is_closed(struct closed_fds* closedfds, int fd){
      36         261 :   for(int i = 0; i < closedfds->count; i++){
      37         174 :     if(closedfds->fds[i] == fd){
      38             :       return true;
      39             :     }
      40             :   }
      41             :   return false;
      42             : }
      43             : 
      44             : 
      45             : static void
      46    39844865 : process_event(struct netio_event_context* evc, struct closed_fds* closedfds)
      47             : {
      48    39844865 :   if(closedfds->count == 0 || !check_fd_is_closed(closedfds, evc->fd)){
      49    39844864 :     if(evc->cb != NULL) {
      50    39844864 :       evc->cb(evc->fd, evc->data);
      51             :     }
      52             :   }
      53    39844828 : }
      54             : 
      55        1384 : static void add_closed_fd(struct closed_fds* closedfds, int fd){
      56        1384 :   if (closedfds->count +1 >= 3*NETIO_MAX_POLLED_FIDS){log_warn("Cannot add FD %d to list of closed FDs because Array is full.", fd); return;}
      57        1384 :   closedfds->fds[closedfds->count++] = fd;
      58             : }
      59             : 
      60    38453863 : static void reset_closed_fds(struct closed_fds* closedfds){
      61    38453863 :   closedfds->count = 0;
      62    38453863 : }
      63             : 
      64             : static void
      65      134438 : set_timerfd(int fd, unsigned s, unsigned ns)
      66             : {
      67      134438 :   struct itimerspec it;
      68      134438 :   it.it_interval.tv_sec = s;
      69      134438 :   it.it_interval.tv_nsec = ns;
      70      134438 :   it.it_value.tv_sec = s;
      71      134438 :   it.it_value.tv_nsec = ns;
      72      134438 :   if(timerfd_settime(fd, 0, &it, NULL)) {
      73         177 :     log_error("Could not set timerfd %d. The timer will not fire.", fd);
      74         177 :     return;
      75             :   }
      76             : }
      77             : 
      78             : static void
      79        5062 : register_fd(int epfd, struct netio_event_context* ctx, int flags)
      80             : {
      81        5062 :     struct epoll_event ev;
      82        5062 :     ev.events = flags;
      83        5062 :     ev.data.ptr = ctx;
      84        5062 :     int rc = fcntl(ctx->fd, F_SETFL, fcntl(ctx->fd, F_GETFL) | O_NONBLOCK );
      85        5064 :     if (rc < 0) {
      86           0 :       log_error("Failed to change flags (incl. O_NONBLOCK) of file descriptor %d.", ctx->fd);
      87             :     }
      88        5064 :     log_dbg("Adding %d to epoll %d", ctx->fd, epfd);
      89        5064 :     if(epoll_ctl(epfd, EPOLL_CTL_ADD, ctx->fd, &ev))
      90             :     {
      91           0 :       log_error("Could not add file descriptor %d to epoll. Events from this resource will be neglected.", ctx->fd);
      92           0 :       return;
      93             :     }
      94             : }
      95             : 
      96             : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
      97             : 
      98             : void
      99     1456872 : netio_timer_callback(int fd, void* data)
     100             : {
     101     1456872 :     log_trc("timer event on FD %d.", fd);
     102     1456872 :     struct netio_timer* timer = (struct netio_timer*)data;
     103     1456872 :     uint64_t buf;
     104     1456872 :     if(8 != read(fd, &buf, 8)) {
     105     1456872 :         log_dbg("Did not read 8 bytes.");
     106             :     }
     107     1456872 :     if(timer->cb)
     108     1456872 :         timer->cb(timer->data);
     109     1456872 : }
     110             : 
     111             : void
     112     8738899 : netio_signal_callback(int fd, void* data)
     113             : {
     114     8738899 :     log_dbg("signal event on FD %d.", fd);
     115             : 
     116     8738899 :     struct netio_signal* signal = (struct netio_signal*)data;
     117     8738899 :     uint64_t buf;
     118     8738899 :     if(8 != read(fd, &buf, 8)) {
     119           0 :         log_info("Did not read 8 bytes.");
     120             :     }
     121     8738862 :     log_dbg("Count = %lu", buf);
     122     8738862 :     if(signal->cb)
     123     8738772 :         signal->cb(signal->data);
     124     8739001 : }
     125             : 
     126             : void
     127           8 : netio_error_connection_refused_callback(int fd, void* data)
     128             : {
     129           8 :     log_dbg("error event on FD %d.", fd);
     130             : 
     131           8 :     struct netio_event_context* ev_ctx = (struct netio_event_context*)data;
     132           8 :     struct netio_send_socket* socket;
     133           8 :     if(8 != read(fd, &socket, 8)) {
     134           8 :         log_dbg("Did not read 8 bytes.");
     135             :     }
     136           8 :     if (socket->cb_error_connection_refused) {
     137           8 :         socket->cb_error_connection_refused(socket);
     138             :     } else {
     139           0 :         log_error("Send socket %p has no connection refused callback set. Resources not freed.", socket);
     140             :     }
     141           8 :     free(ev_ctx);
     142           8 : }
     143             : 
     144             : void
     145           6 : netio_error_bind_refused_callback(int fd, void* data)
     146             : {
     147           6 :     log_dbg("error event on FD %d.", fd);
     148             : 
     149           6 :     struct netio_event_context* ev_ctx = (struct netio_event_context*)data;
     150           6 :     struct netio_listen_socket* socket;
     151           6 :     if(8 != read(fd, &socket, 8)) {
     152           6 :       log_dbg("Did not read 8 bytes.");
     153             :     }
     154           6 :     if (socket->cb_error_bind_refused) {
     155           6 :       socket->cb_error_bind_refused(socket);
     156             :     } else {
     157           0 :       log_error("Listen socket %p has no bind refused callback set. Resources not freed.", socket);
     158             :     }
     159           6 :     free(ev_ctx);
     160           6 : }
     161             : 
     162             : // INTERNAL RESOURCES ///////////////////////////////////////////////////////////////
     163             : 
     164             : void
     165         558 : init_polled_fids(struct polled_fids* pfids, int initial_size){
     166         558 :   pfids->size = initial_size;
     167         558 :   pfids->count = 0;
     168         558 :   pfids->fabric = NULL;
     169         558 :   pfids->fid_set = malloc(pfids->size*sizeof(struct fid*));
     170         558 :   pfids->data  = malloc(pfids->size*sizeof(struct polled_fids_data));
     171         558 : }
     172             : 
     173             : void
     174         552 : init_openfds(struct open_fds* fds, int initial_size){
     175         552 :   fds->size = initial_size;
     176         552 :   fds->count = 0;
     177         552 :   fds->data  = malloc(fds->size*sizeof(struct open_fd_data));
     178         552 : }
     179             : 
     180             : 
     181             : void
     182           0 : print_polled_fids(struct polled_fids* pfids){
     183           0 :   printf("Number of polled fds %u \n", pfids->count);
     184           0 :   printf("FD \t FID \n");
     185           0 :   for(unsigned int i=0; i < pfids->count; ++i){
     186           0 :     printf("%d \t %p \n", pfids->data[i].fd, pfids->fid_set[i]);
     187             :   }
     188           0 :   printf("-------------------\n");
     189           0 : }
     190             : 
     191             : void
     192           0 : print_openfds(struct open_fds* fds){
     193           0 :   printf("Number of open fds %u \n", fds->count);
     194           0 :   printf("===============================================\n");
     195           0 :   printf("FD \t RESOURCE \t SOCKET \t OBJ ADDR \n");
     196           0 :   printf("-----------------------------------------------\n");
     197           0 :   for(unsigned int i=0; i < fds->count; ++i){
     198           0 :     int r = fds->data[i].rtype;
     199           0 :     int s = fds->data[i].stype;
     200           0 :     printf("%d\t%s\t%s\t%p\n", fds->data[i].fd, resource_name[r], socket_name[s], fds->data[i].object);
     201             :   }
     202           0 :   printf("===============================================\n");
     203           0 : }
     204             : 
     205             : 
     206             : void
     207        1494 : add_polled_fid(struct polled_fids* pfids, struct fid_fabric* fab, struct fid* fid, int fd, void* socket, void (*cb)(int,void*)){
     208        1494 :   if(pfids->size <= pfids->count){
     209           1 :     log_dbg("Reallocing polled fids");
     210           1 :     pfids->fid_set = realloc(pfids->fid_set, 2*(pfids->size)*sizeof(struct fid*));
     211           1 :     pfids->data = realloc(pfids->data, 2*(pfids->size)*sizeof(struct polled_fids_data));
     212           1 :     pfids->size *= 2;
     213        1494 :   };
     214        1494 :   log_dbg("Polled_fids %p Adding polled fd %d fid %p.", pfids, fd, fid);
     215        1494 :   pfids->fabric = fab;
     216        1494 :   pfids->fid_set[pfids->count] = fid;
     217        1494 :   pfids->data[pfids->count].fd = fd;
     218        1494 :   pfids->data[pfids->count].socket = socket;
     219        1494 :   pfids->data[pfids->count].cb = cb;
     220        1494 :   pfids->count++;
     221             :   //print_polled_fids(pfids);
     222        1494 : };
     223             : 
     224             : void
     225        1957 : add_open_fd(struct open_fds* fds, int fd, enum resource_type rtype, enum socket_type stype, void* object){
     226        1957 :   if(fds->size <= fds->count){
     227           0 :     log_dbg("Reallocing open fds");
     228           0 :     fds->data = realloc(fds->data, 2*(fds->size)*sizeof(struct open_fd_data));
     229           0 :     fds->size *= 2;
     230             :   };
     231             :   log_dbg("New open fd %d res type %s socket type %s", fd, resource_name[rtype], socket_name[stype]);
     232        5448 :   for(unsigned int i=0; i<fds->count; ++i){
     233        3491 :     if (fd == fds->data[i].fd){
     234           0 :       log_error("Adding again fd % to open fds!", fd);
     235             :     }
     236             :   }
     237        1957 :   fds->data[fds->count].fd = fd;
     238        1957 :   fds->data[fds->count].object = object;
     239        1957 :   fds->data[fds->count].rtype = rtype;
     240        1957 :   fds->data[fds->count].stype = stype;
     241        1957 :   fds->count++;
     242        1957 : }
     243             : 
     244             : 
     245             : void
     246        1231 : remove_polled_fid(struct polled_fids* pfids, int fd){
     247        1231 :   log_dbg("Polled_fids %p removing polled fd %d.", pfids, fd);
     248        1945 :   for(unsigned int i = 0; i < pfids->count; i++){
     249        1881 :     if(fd == pfids->data[i].fd){
     250             :       log_dbg("FD %d FID %p removed.", fd, pfids->fid_set[i]);
     251        2026 :       for(unsigned int j = i; j < pfids->count-1; j++){
     252         859 :         pfids->fid_set[j] = pfids->fid_set[j+1];
     253         859 :         pfids->data[j] = pfids->data[j+1];
     254             :       }
     255        1167 :       pfids->count -= 1;
     256        1167 :       break;
     257             :     }
     258             :   }
     259             :   //print_polled_fids(pfids);
     260        1231 : }
     261             : 
     262             : 
     263             : void
     264        1384 : remove_open_fd(struct netio_eventloop* ev, int fd){
     265        1384 :   struct open_fds* fds = &ev->openfds;
     266        2703 :   for(unsigned int i = 0; i < fds->count; i++){
     267        2701 :     if(fd == fds->data[i].fd){
     268             :       log_dbg("Removing from open fd record fd %d res type %s socket type %s. Current registered fds %u", fd, resource_name[fds->data[i].rtype], socket_name[fds->data[i].stype],fds->count);
     269        2559 :       for(unsigned int j = i; j < fds->count-1; j++){
     270        1177 :         fds->data[j] = fds->data[j+1];
     271             :       }
     272        1382 :       fds->count -= 1;
     273        1382 :       break;
     274             :     }
     275             :   }
     276        1384 :   add_closed_fd(&ev->closedfds, fd);
     277        1384 : }
     278             : 
     279             : 
     280             : int
     281           0 : check_open_fd_exists(struct open_fds* fds, int fd){
     282           0 :   for(unsigned int i = 0; i < fds->count; i++){
     283           0 :     if(fd == fds->data[i].fd){
     284             :       return true;
     285             :     }
     286             :   }
     287             :   return false;
     288             : }
     289             : 
     290             : 
     291             : // API FUNCTIONS ///////////////////////////////////////////////////////////////
     292             : 
     293             : /*! \brief Initializes a NetIO event loop.
     294             :  *
     295             :  *  In the background this creates an epoll file descriptor handle.
     296             :  *
     297             :  * @param evloop The event loop to initialize
     298             :  */
     299             : void
     300         552 : netio_eventloop_init(struct netio_eventloop* evloop)
     301             : {
     302         552 :     evloop->epollfd = epoll_create1(0); // no flag passed, same behaviour as epoll_create
     303         552 :     evloop->events = malloc(sizeof(struct epoll_event)*NETIO_MAX_EPOLL_EVENTS);
     304         552 :     init_polled_fids(&evloop->pfids, NETIO_MAX_POLLED_FIDS);
     305         552 :     init_openfds(&evloop->openfds, NETIO_MAX_POLLED_FIDS);
     306         552 :     reset_closed_fds(&evloop->closedfds);
     307         552 :     log_dbg("Creating a new eventloop with fd %d", evloop->epollfd);
     308             : 
     309         552 :     if(evloop->epollfd == -1) {
     310           0 :       log_fatal("Could not create epoll fd. Exit.");
     311           0 :       exit(2);
     312             :     }
     313             : 
     314             :     //termination signal
     315         552 :     evloop->stop_signal.data = evloop;
     316         552 :     evloop->stop_signal.cb = netio_stop;
     317         552 :     netio_signal_init(evloop, &(evloop->stop_signal));
     318         552 :     log_dbg("stop signal initialised with fd %d", evloop->stop_signal.ev_ctx.fd );
     319         552 : }
     320             : 
     321             : /**
     322             :  * Initializes a timer and registers it with the event loop.
     323             :  *
     324             :  * Internally, timers are implemented using `timerfd`.
     325             :  *
     326             :  * @param evloop The event loop in which the timer will be registered
     327             :  */
     328             : void
     329        1192 : netio_timer_init(struct netio_eventloop* evloop, struct netio_timer* timer)
     330             : {
     331        1192 :   timer->ev_ctx.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
     332        1191 :   timer->ev_ctx.data = timer;
     333        1191 :   timer->ev_ctx.cb = netio_timer_callback;
     334        1191 :   if(timer->ev_ctx.fd == -1)
     335             :   {
     336           0 :     log_error("Could not create timerfd. The timer will not fire.");
     337           0 :     return;
     338             :   }
     339        1191 :   log_dbg("registering timerfd %d to %d", timer->ev_ctx.fd, evloop->epollfd);
     340        1191 :   netio_register_read_fd(evloop, &timer->ev_ctx);
     341             : #if defined TRACK_ALL_FD
     342             :   add_open_fd(&evloop->openfds, timer->ev_ctx.fd, NETIO_TIMER, NOSOCKET, timer);
     343             : #endif
     344             : }
     345             : 
     346             : /**
     347             :  * Deregisters a timer from the event loop and closes its file descriptor.
     348             :  *
     349             :  * @param evloop The event loop in which the timer is registered
     350             :  * @param timer The timer to unregister and close
     351             :  */
     352             : void
     353        1111 : netio_timer_close(struct netio_eventloop* evloop, struct netio_timer* timer)
     354             : {
     355        1111 :   netio_timer_stop(timer);
     356        1111 :   epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, timer->ev_ctx.fd, NULL);
     357        1111 :   log_dbg("netio_timer_close: deregistered timer eventfd %d", timer->ev_ctx.fd);
     358             : #if defined TRACK_ALL_FD
     359             :   int ret = close(timer->ev_ctx.fd);
     360             :   if(!ret){
     361             :     remove_open_fd(&evloop, timer->ev_ctx.fd);
     362             :   }
     363             : #else
     364        1111 :   close(timer->ev_ctx.fd);
     365             : #endif
     366        1111 : }
     367             : 
     368             : /**
     369             :  * Starts a timer with the defined period (given in seconds).
     370             :  *
     371             :  * The period is given in seconds. The timer callback is executed at the
     372             :  * defined frequency until it is explicitly stopped.
     373             :  *
     374             :  * @param timer The timer to start
     375             :  * @param seconds The timer period, given in seconds
     376             :  */
     377             : void
     378          80 : netio_timer_start_s(struct netio_timer* timer, unsigned long long seconds)
     379             : {
     380          80 :   set_timerfd(timer->ev_ctx.fd, seconds, 0);
     381          80 : }
     382             : 
     383             : /**
     384             :  * Start a timer with the defined period (given in milliseconds).
     385             :  * @see netio_timer_start_s
     386             :  */
     387             : void
     388       66616 : netio_timer_start_ms(struct netio_timer* timer, unsigned long long milliseconds)
     389             : {
     390       66616 :   set_timerfd(timer->ev_ctx.fd, milliseconds/1000, (milliseconds%1000)*1000*1000);
     391       66616 : }
     392             : 
     393             : /**
     394             :  * Start a timer with the defined period (given in microseconds).
     395             :  * @see netio_timer_start_s
     396             :  */
     397             : void
     398          27 : netio_timer_start_us(struct netio_timer* timer, unsigned long long microseconds)
     399             : {
     400          27 :   set_timerfd(timer->ev_ctx.fd, microseconds/(1000*1000), (microseconds%(1000*1000)*1000));
     401          27 : }
     402             : 
     403             : /**
     404             :  * Start a timer with the defined period (given in nanoseconds).
     405             :  * @see netio_timer_start_s
     406             :  */
     407             : void
     408           0 : netio_timer_start_ns(struct netio_timer* timer, unsigned long long nanoseconds)
     409             : {
     410           0 :   set_timerfd(timer->ev_ctx.fd, nanoseconds/(1000*1000*1000), nanoseconds%(1000*1000*1000));
     411           0 : }
     412             : 
     413             : /**
     414             :  * Stops a timer.
     415             :  *
     416             :  * The timer will not execute callbacks anymore until it is started again.
     417             :  *
     418             :  * @param timer The timer to stop
     419             :  */
     420             : void
     421       67715 : netio_timer_stop(struct netio_timer* timer) {
     422       67715 :     set_timerfd(timer->ev_ctx.fd, 0, 0);
     423       67715 : }
     424             : 
     425             : 
     426             : /**
     427             :  * Initializes a signal and registers it in the event loop.
     428             :  *
     429             :  * Internally, signals are implemented using `eventfd`.
     430             :  *
     431             :  * @param evloop The event loop in which the signal will be registered
     432             :  * @param signal The signal to initialize
     433             :  */
     434             : void
     435        2132 : netio_signal_init(struct netio_eventloop* evloop, struct netio_signal* signal)
     436             : {
     437        2132 :     signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
     438        2132 :     signal->ev_ctx.data = signal;
     439        2132 :     if(signal->ev_ctx.fd == -1)
     440             :     {
     441           0 :       log_fatal("Could not open eventfd");
     442           0 :       exit(2);
     443             :     }
     444             : 
     445        2132 :     signal->ev_ctx.cb = netio_signal_callback;
     446        2132 :     signal->epollfd = evloop->epollfd;
     447        2132 :     netio_register_read_fd(evloop, &signal->ev_ctx);
     448             : #if defined TRACK_ALL_FD
     449             :     add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal);
     450             : #endif
     451        2132 :     log_dbg("Registering eventfd %d", signal->ev_ctx.fd);
     452        2132 : }
     453             : 
     454             : 
     455             : void
     456           0 : netio_signal_no_semaphore_init(struct netio_eventloop* evloop, struct netio_signal* signal)
     457             : {
     458           0 :     signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK);
     459           0 :     signal->ev_ctx.data = signal;
     460           0 :     if(signal->ev_ctx.fd == -1)
     461             :     {
     462           0 :       log_fatal("Could not open eventfd");
     463           0 :       exit(2);
     464             :     }
     465             : 
     466           0 :     signal->ev_ctx.cb = netio_signal_callback;
     467           0 :     signal->epollfd = evloop->epollfd;
     468           0 :     netio_register_read_fd(evloop, &signal->ev_ctx);
     469             : #if defined TRACK_ALL_FD
     470             :     add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal);
     471             : #endif
     472           0 :     log_dbg("Registering eventfd %d", signal->ev_ctx.fd);
     473           0 : }
     474             : 
     475             : 
     476             : 
     477             : 
     478             : /**
     479             :  * Deregisters a signal from the event loop and closes its file descriptor.
     480             :  *
     481             :  * @param evloop The event loop in which the signal will be registered
     482             :  * @param signal The signal to initialize
     483             :  */
     484             : void
     485        1797 : netio_signal_close(struct netio_eventloop* evloop, struct netio_signal* signal)
     486             : {
     487        1797 :     int rc = epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, signal->ev_ctx.fd, NULL);
     488        1797 :     if (rc) {
     489         160 :       log_warn("Cannot deregister signal fd %d from evloop %d, %s", signal->ev_ctx.fd, evloop->epollfd, strerror(errno));
     490             :     }
     491        1797 :     log_dbg("netio_signal_close: deregistered signal eventfd %d, ret %d from evloop %d", signal->ev_ctx.fd, rc, signal->epollfd);
     492        1797 :     int ret = close(signal->ev_ctx.fd);
     493        1797 :     if(ret) {log_warn("Cannot close %d: %s", signal->ev_ctx.fd, strerror(errno));}
     494             : #if defined TRACK_ALL_FD
     495             :     remove_open_fd(&evloop, signal->ev_ctx.fd);
     496             : #endif
     497        1797 : }
     498             : 
     499             : 
     500             : /**
     501             :  * Fires a signal.
     502             :  *
     503             :  * Firing the signal triggers the execution of the signal's callback. Firing
     504             :  * a signal is thread-safe.
     505             :  *
     506             :  * @param signal The signal to fire
     507             :  */
     508             : void
     509     8739042 : netio_signal_fire(struct netio_signal* signal)
     510             : {
     511     8739042 :     uint64_t buf = 1;
     512     8739042 :     int ret = write(signal->ev_ctx.fd, &buf, 8);
     513     8739004 :     if( ret !=8 ){
     514           0 :       log_error("Firing signal writing on fd %d, only %d / 8 bytes written. Errno %s", signal->ev_ctx.fd, ret, strerror(errno));
     515             :     }
     516     8739004 : }
     517             : 
     518             : /**
     519             :  * Fires a callback for error_connection_refused.
     520             :  *
     521             :  * Firing triggers the execution of the error_connection_refused callback.
     522             :  *
     523             :  * @param socket The socket to use as parameter for the callback
     524             :  */
     525           8 : void netio_error_connection_refused_fire(struct netio_send_socket* socket)
     526             : {
     527           8 :     struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context));
     528           8 :     ev_ctx->fd = eventfd(0, EFD_NONBLOCK);
     529             :     // event context is freed by the callback
     530           8 :     ev_ctx->data = ev_ctx;
     531           8 :     if (ev_ctx->fd == -1)
     532             :     {
     533           0 :       log_fatal("Could not open eventfd for send socket error_connection_refused");
     534           0 :       exit(2);
     535             :     }
     536             : 
     537           8 :     ev_ctx->cb = netio_error_connection_refused_callback;
     538             : 
     539           8 :     netio_register_read_fd(&socket->ctx->evloop, ev_ctx);
     540           8 :     add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, USEND, socket);
     541           8 :     log_dbg("listen_socket netio_error_connection_refused_fire: registering eventfd %d", ev_ctx->fd);
     542             : 
     543             :     // Socket is transmitted as data to the file descriptor
     544             :     // struct netio_send_socket* buf = socket;
     545           8 :     write(ev_ctx->fd, &socket, 8);
     546           8 : }
     547             : 
     548             : /**
     549             :  * Fires a callback for error_bind_refused.
     550             :  *
     551             :  * Firing triggers the execution of the error_bind_refused callback.
     552             :  *
     553             :  * @param socket The socket to use as parameter for the callback
     554             :  */
     555           6 : void netio_error_bind_refused_fire(struct netio_listen_socket* socket)
     556             : {
     557           6 :     struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context));
     558           6 :     ev_ctx->fd = eventfd(0, EFD_NONBLOCK);
     559             :     // event context is freed by the callback
     560           6 :     ev_ctx->data = ev_ctx;
     561           6 :     if (ev_ctx->fd == -1)
     562             :     {
     563           0 :       log_fatal("Could not open eventfd for listen socket error_bind_refused");
     564           0 :       exit(2);
     565             :     }
     566             : 
     567           6 :     ev_ctx->cb = netio_error_bind_refused_callback;
     568             : 
     569           6 :     netio_register_read_fd(&socket->ctx->evloop, ev_ctx);
     570           6 :     add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, ULISTEN, socket);
     571           6 :     log_dbg("listen_socket netio_error_bind_refused_fire: registering eventfd %d", ev_ctx->fd);
     572             : 
     573             :     // Socket is transmitted as data to the file descriptor
     574             :     // struct netio_listen_socket* buf = socket;
     575           6 :     write(ev_ctx->fd, &socket, 8);
     576           6 : }
     577             : 
     578             : 
     579             : /**
     580             :  * Executes the event loop.
     581             :  *
     582             :  * The event loop is executed in an endless loop until it is explicitly
     583             :  * terminated by `netio_terminate`. Before any processing any other event,
     584             :  * `netio_run` will execute this initialization callback, if one was specified.
     585             :  * The core of the event loop is epoll_wait.
     586             :  * Note that epoll_wait returns only one event per fd, so MAX_EPOLL_EVENTS actually
     587             :  * translates into the maximum number of fds that are processed in one iteration
     588             :  * (the remaining fds are processed in a round-robin fashion in the next iteration).
     589             :  *
     590             :  * @param evloop The event loop to execute.
     591             :  */
     592             : void
     593         552 : netio_run(struct netio_eventloop* evloop)
     594             : {
     595         552 :   evloop->is_running = 1;
     596         552 :   int nevents;
     597             : 
     598         552 :   if(evloop->cb_init != NULL) {
     599         523 :       evloop->cb_init(evloop->data);
     600             :   }
     601             :   int running=1;
     602    38453675 :   while(running) {
     603             :     // don't want to block or wait too long if we're shutting down
     604    38453224 :     uint64_t timeout = evloop->is_running ? NETIO_EPOLL_TIMEOUT : 10;
     605    38453224 :     nevents = epoll_wait(evloop->epollfd, evloop->events, NETIO_MAX_EPOLL_EVENTS,
     606             :           timeout);
     607    38453224 :     log_trc("epoll wait: %d events to process", nevents);
     608             : 
     609    78297935 :     for(int i = 0; i < nevents; ++i)
     610             :     {
     611    39844969 :       log_trc("event type: %x from fd %d", evloop->events[i].events, ((struct netio_event_context*)evloop->events[i].data.ptr)->fd);
     612    39844969 :       process_event((struct netio_event_context*)(evloop->events[i].data.ptr), &evloop->closedfds);
     613    39844711 :       if(evloop->events[i].events & EPOLLRDHUP)
     614             :       {
     615          21 :         struct netio_event_context* c = (struct netio_event_context*)(evloop->events[i].data.ptr);
     616          21 :         log_dbg("EPOLLRDHUP on fd %d, removing it from epoll_wait", c->fd);
     617          21 :         epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, c->fd, NULL);
     618             :       }
     619             :     }
     620    38453307 :     if (evloop->is_running==0 && nevents==0) {
     621         451 :        running=0;
     622             :     }
     623    38453307 :     reset_closed_fds(&evloop->closedfds);
     624    38453123 :     if(unlikely(nevents == -1))
     625             :     {
     626           4 :       int errsv = errno;
     627           4 :       if(errsv==EINTR) {
     628           4 :         log_dbg("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv));
     629           4 :         continue;
     630             :       }
     631             :       else {
     632           0 :         log_fatal("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv));
     633           0 :         free(evloop->events);
     634           0 :         exit(1);
     635             :       }
     636             :     }
     637             :   }//end of while running
     638             : 
     639         451 :   log_dbg("Cleaning up eventloop");
     640         451 :   close(evloop->epollfd);
     641         825 :   for(unsigned int i=0; i < evloop->openfds.count; ++i){
     642         374 :     struct open_fd_data* data = &evloop->openfds.data[i];
     643         374 :     log_warn("Evloop terminated, closing fd %d type %s socket type %s", data->fd, resource_name[data->rtype], socket_name[data->stype]);
     644         374 :     close(data->fd);
     645             :   }
     646         451 :   free(evloop->openfds.data);
     647         451 :   free(evloop->pfids.data);
     648         451 :   free(evloop->pfids.fid_set);
     649         451 :   free(evloop->events);
     650         451 : }
     651             : 
     652             : 
     653             : /**
     654             :  * Terminates the event loop.
     655             :  *
     656             :  * `netio_terminate` will stop the execution of the event loop. This will *not*
     657             :  * interrupt any event processing code or user callback that is currently being
     658             :  * executed. Event loop execution will halt after the processing of the  current
     659             :  * event has completed.
     660             :  *
     661             :  * @param evloop The event loop to terminate
     662             :  */
     663             : void
     664         614 : netio_terminate(struct netio_eventloop* evloop)
     665             : {
     666         614 :   if (evloop->is_running == 1){
     667         104 :     netio_stop((void*)evloop);
     668             :   }
     669         614 : }
     670             : 
     671             : 
     672             : /**
     673             :  * Terminates the event loop.
     674             :  *
     675             :  * `netio_terminate_signal` will add a signal to the eventloop
     676             :  *  such that queued events, including those related to resources deallocation,
     677             :  *  can be executed.
     678             :  *
     679             :  * @param evloop The event loop to terminate
     680             :  */
     681             : void
     682         452 : netio_terminate_signal(struct netio_eventloop* evloop)
     683             : {
     684         452 :   if (evloop->is_running == 1){
     685         376 :     log_info("Firing termination signal");
     686         376 :     netio_signal_fire(&evloop->stop_signal);
     687             :   } else {
     688          76 :     log_warn("netio_terminate_signal called but evloop not running");
     689             :   }
     690         452 : }
     691             : 
     692             : void
     693         451 : netio_stop(void* ptr)
     694             : {
     695         451 :   struct netio_eventloop* evloop = (struct netio_eventloop*)ptr;
     696         451 :   netio_signal_close(evloop, &evloop->stop_signal);
     697         451 :   evloop->pfids.count = 0;
     698         451 :   evloop->is_running = 0;
     699         451 : }
     700             : 
     701             : void
     702        5064 : netio_register_read_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
     703             : {
     704        5064 :   register_fd(evloop->epollfd, ctx, EPOLLIN | EPOLLRDHUP);
     705        5063 : }
     706             : 
     707             : 
     708             : /**
     709             :  * Closes a socket.
     710             :  *
     711             :  * `netio_close_socket` will deallocate the resources associated to a socket
     712             :  * and close the corresponding file descriptor. The type of socket is specified
     713             :  * by the third argument. The socket is not freed as the function ignores whether
     714             :  * it had been allocated statically or dynamically.
     715             :  *
     716             :  * @param evloop The event loop to which the socket belongs
     717             :  * @param socket Pointer to the socket
     718             :  * @param type Type of socket as defined by enum socket_type.
     719             :  */
     720             : void
     721         206 : netio_close_socket(struct netio_eventloop* evloop, void* socket, enum socket_type type)
     722             : {
     723             :   //Sockets that contain more than one other socket
     724         206 :   switch (type){
     725           0 :     case BPUB: {
     726           0 :       struct netio_publish_socket* pub = (struct netio_publish_socket*)socket;
     727           0 :       struct netio_socket_list* it = pub->subscription_table.socket_list;
     728           0 :       while(it != NULL){
     729           0 :         if(it->socket){
     730           0 :           struct netio_buffered_send_socket* bs_socket = (struct netio_buffered_send_socket*)it->socket;
     731           0 :           netio_disconnect(&(bs_socket->send_socket)); //Do I need to disconnect for the sake of the other side?
     732           0 :           free(it->socket);
     733           0 :           if(it->addrlen > 0){ free(it->addr); }
     734             :         }
     735           0 :         struct netio_socket_list* tmp = it;
     736           0 :         it = it->next;
     737           0 :         free(tmp);
     738             :       }
     739           0 :       netio_close_socket(evloop, (void*)(&pub->lsocket), ULISTEN);
     740           0 :       return;
     741             :     }
     742             : 
     743           0 :     case UPUB: {
     744           0 :       struct netio_unbuffered_publish_socket* upub = (struct netio_unbuffered_publish_socket*)socket;
     745           0 :       struct netio_socket_list* uit = upub->subscription_table.socket_list;
     746           0 :       while(uit != NULL){
     747           0 :         if(uit->socket){
     748           0 :           struct netio_send_socket* s_socket = (struct netio_send_socket*)uit->socket;
     749           0 :           netio_disconnect(s_socket); //Do I need to disconnec for the sake of the other side?
     750           0 :           free(uit->socket);
     751           0 :           if(uit->addrlen > 0){ free (uit->addr); }
     752             :         }
     753           0 :         struct netio_socket_list* tmp = uit;
     754           0 :         uit = uit->next;
     755           0 :         free(tmp);
     756             :       }
     757           0 :       netio_close_socket(evloop, (void*)(&upub->lsocket), ULISTEN);
     758           0 :       struct netio_completion_stack* cs = &upub->completion_stack;
     759           0 :       free(cs->stack);
     760           0 :       free(cs->objects);
     761           0 :       free(cs->key_array);
     762           0 :       return;
     763             :     }
     764             : 
     765          61 :     case BSUB: {
     766          61 :       struct netio_subscribe_socket* sub_socket = (struct netio_subscribe_socket*)socket;
     767          61 :       netio_disconnect(&sub_socket->socket);
     768          61 :       netio_close_socket(evloop, &sub_socket->recv_socket, BLISTEN);
     769          61 :       if (sub_socket->remote_hostname) {
     770          61 :         free((void*)sub_socket->remote_hostname);
     771          61 :         sub_socket->remote_hostname=NULL;
     772             :       }
     773             :       return;
     774             :     }
     775             : 
     776          41 :     case USUB: {
     777          41 :       struct netio_unbuffered_subscribe_socket* usub_socket = (struct netio_unbuffered_subscribe_socket*)socket;
     778          41 :       netio_disconnect(&usub_socket->socket);
     779          41 :       netio_close_socket(evloop, &usub_socket->recv_socket, ULISTEN);
     780          41 :       if (usub_socket->remote_hostname) {
     781          41 :         free((void*)usub_socket->remote_hostname);
     782          41 :         usub_socket->remote_hostname=NULL;
     783             :       }
     784             :       return;
     785             :     }
     786             : 
     787             :     case NOSOCKET:
     788             :       return;
     789             : 
     790         104 :     default:
     791         104 :       ;
     792             :       //go on with the function
     793             :   }
     794             : 
     795         104 :   struct netio_signal* signal_close_socket = malloc(sizeof(struct netio_signal));
     796         104 :   struct signal_data* sd = malloc(sizeof(struct signal_data));
     797         104 :   sd->signal = signal_close_socket;
     798         104 :   sd->ptr = socket;
     799         104 :   sd->evloop = evloop;
     800             : 
     801         104 :   switch (type){
     802           0 :     case USEND:
     803           0 :       signal_close_socket->cb = close_send_socket;
     804           0 :       break;
     805           0 :     case BSEND:
     806           0 :       signal_close_socket->cb = close_buffered_send_socket;
     807           0 :       break;
     808           2 :     case URECV:
     809           2 :       signal_close_socket->cb = close_recv_socket;
     810           2 :       break;
     811           0 :     case BRECV:
     812           0 :       signal_close_socket->cb = close_buffered_recv_socket;
     813           0 :       break;
     814          41 :     case ULISTEN:
     815          41 :       signal_close_socket->cb = close_listen_socket;
     816          41 :       break;
     817          61 :     case BLISTEN: {
     818          61 :       struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)socket;
     819          61 :       sd->ptr = (void*)(&lsocket->listen_socket);
     820          61 :       signal_close_socket->cb = close_buffered_listen_socket;
     821          61 :       break;
     822             :     }
     823             :     case NOSOCKET:
     824             :       return;
     825             : 
     826           0 :     default:
     827           0 :       log_error("Could not delete socket: type unknown.");
     828           0 :       return;
     829             :   }
     830         104 :   signal_close_socket->data = sd;
     831         104 :   netio_signal_init(evloop, signal_close_socket);
     832         104 :   netio_signal_fire(signal_close_socket);
     833             : }
     834             : 
     835             : 
     836         358 : void netio_register_read_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
     837             : {
     838         358 :   struct epoll_event ev;
     839             : 
     840         358 :   log_dbg("netio_register_read_tcp_fd.  Will register ctx->fd = %d with epoll", ctx->fd);
     841         358 :   ev.events = EPOLLIN | EPOLLRDHUP;
     842         358 :   ev.data.ptr = ctx;
     843             : 
     844         358 :   if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev))
     845             :   {
     846         158 :     if (errno != EEXIST) {
     847           0 :       log_error("Could not add FD %d to epoll. %s", ctx->fd, strerror(errno));
     848           0 :       return;
     849             :     }
     850         158 :     if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev))
     851             :     {
     852           0 :       log_error("Could not modify FD %d in epoll %s", ctx->fd, strerror(errno));
     853           0 :       exit(-2);
     854             :     }
     855             :   }
     856         358 :   log_dbg("Done");
     857             : }
     858             : 
     859             : 
     860         234 : void netio_register_write_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
     861             : {
     862         234 :   struct epoll_event ev;
     863             : 
     864         234 :   log_dbg("netio_register_write_tcp_fd: registering ctx->fd = %d with epoll", ctx->fd);
     865         234 :   ev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET;
     866         234 :   ev.data.ptr = ctx;
     867             : 
     868         234 :   if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev))
     869             :   {
     870         117 :     if (errno != EEXIST) {
     871           0 :       log_error("Could not add FD %d to epoll. errno=%d", ctx->fd, errno);
     872           0 :       return;
     873             :     }
     874         117 :     if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev))
     875             :     {
     876           0 :       log_error("Could not modify FD %d in epoll, errno=%d", ctx->fd, errno);
     877           0 :       exit(-2);
     878             :     }
     879             :   }
     880         234 :   log_dbg("Done");
     881             : }

Generated by: LCOV version 1.0