Line data Source code
1 : #include <stdio.h>
2 : #include <unistd.h>
3 : #include <string.h>
4 : #include <sys/timerfd.h>
5 : #include <sys/eventfd.h>
6 : #include <sys/stat.h>
7 : #include <stdio.h>
8 : #include "log.h"
9 :
10 : #include "netio/netio.h"
11 :
12 : const char* resource_name[] = { "NETIO_TIMER", "NETIO_SIGNAL", "NETIO_CQ", "NETIO_EQ", "NETIO_TCP"};
13 : const char* socket_name[] = {"BSEND", "USEND", "BRECV", "URECV", "BSUB", "USUB", "BPUB", "UPUB", "BLISTEN", "ULISTEN", "NOSOCKET"};
14 :
15 : #if defined DEBUG || defined DEBUG_EV
16 : #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
17 : #define log_dbg(...) log_log(LOG_DEBUG, __FILENAME__, __LINE__, __VA_ARGS__)
18 : #define log_trc(...) log_log(LOG_TRACE, __FILENAME__, __LINE__, __VA_ARGS__)
19 : #else
20 : #define log_dbg(...)
21 : #define log_trc(...)
22 : #endif
23 :
24 :
25 : //#define TRACK_ALL_FD //This option is not thread-safe, see FLX-2022.
26 :
27 : /**
28 : * @file
29 : * Functions for the event loop.
30 : */
31 :
32 :
33 : // STATIC FUNCTIONS ////////////////////////////////////////////////////////////
34 :
35 88 : static int check_fd_is_closed(struct closed_fds* closedfds, int fd){
36 261 : for(int i = 0; i < closedfds->count; i++){
37 174 : if(closedfds->fds[i] == fd){
38 : return true;
39 : }
40 : }
41 : return false;
42 : }
43 :
44 :
45 : static void
46 39844865 : process_event(struct netio_event_context* evc, struct closed_fds* closedfds)
47 : {
48 39844865 : if(closedfds->count == 0 || !check_fd_is_closed(closedfds, evc->fd)){
49 39844864 : if(evc->cb != NULL) {
50 39844864 : evc->cb(evc->fd, evc->data);
51 : }
52 : }
53 39844828 : }
54 :
55 1384 : static void add_closed_fd(struct closed_fds* closedfds, int fd){
56 1384 : if (closedfds->count +1 >= 3*NETIO_MAX_POLLED_FIDS){log_warn("Cannot add FD %d to list of closed FDs because Array is full.", fd); return;}
57 1384 : closedfds->fds[closedfds->count++] = fd;
58 : }
59 :
60 38453863 : static void reset_closed_fds(struct closed_fds* closedfds){
61 38453863 : closedfds->count = 0;
62 38453863 : }
63 :
/* Arms the timerfd with identical initial expiration and interval
 * (s seconds + ns nanoseconds). A failure of timerfd_settime is logged. */
static void
set_timerfd(int fd, unsigned s, unsigned ns)
{
    struct itimerspec spec;
    spec.it_value.tv_sec = s;
    spec.it_value.tv_nsec = ns;
    spec.it_interval = spec.it_value;

    if (timerfd_settime(fd, 0, &spec, NULL) != 0) {
        log_error("Could not set timerfd %d. The timer will not fire.", fd);
    }
}
77 :
78 : static void
79 5062 : register_fd(int epfd, struct netio_event_context* ctx, int flags)
80 : {
81 5062 : struct epoll_event ev;
82 5062 : ev.events = flags;
83 5062 : ev.data.ptr = ctx;
84 5062 : int rc = fcntl(ctx->fd, F_SETFL, fcntl(ctx->fd, F_GETFL) | O_NONBLOCK );
85 5064 : if (rc < 0) {
86 0 : log_error("Failed to change flags (incl. O_NONBLOCK) of file descriptor %d.", ctx->fd);
87 : }
88 5064 : log_dbg("Adding %d to epoll %d", ctx->fd, epfd);
89 5064 : if(epoll_ctl(epfd, EPOLL_CTL_ADD, ctx->fd, &ev))
90 : {
91 0 : log_error("Could not add file descriptor %d to epoll. Events from this resource will be neglected.", ctx->fd);
92 0 : return;
93 : }
94 : }
95 :
96 : // PRIVATE FUNCTIONS ///////////////////////////////////////////////////////////
97 :
98 : void
99 1456872 : netio_timer_callback(int fd, void* data)
100 : {
101 1456872 : log_trc("timer event on FD %d.", fd);
102 1456872 : struct netio_timer* timer = (struct netio_timer*)data;
103 1456872 : uint64_t buf;
104 1456872 : if(8 != read(fd, &buf, 8)) {
105 1456872 : log_dbg("Did not read 8 bytes.");
106 : }
107 1456872 : if(timer->cb)
108 1456872 : timer->cb(timer->data);
109 1456872 : }
110 :
111 : void
112 8738899 : netio_signal_callback(int fd, void* data)
113 : {
114 8738899 : log_dbg("signal event on FD %d.", fd);
115 :
116 8738899 : struct netio_signal* signal = (struct netio_signal*)data;
117 8738899 : uint64_t buf;
118 8738899 : if(8 != read(fd, &buf, 8)) {
119 0 : log_info("Did not read 8 bytes.");
120 : }
121 8738862 : log_dbg("Count = %lu", buf);
122 8738862 : if(signal->cb)
123 8738772 : signal->cb(signal->data);
124 8739001 : }
125 :
126 : void
127 8 : netio_error_connection_refused_callback(int fd, void* data)
128 : {
129 8 : log_dbg("error event on FD %d.", fd);
130 :
131 8 : struct netio_event_context* ev_ctx = (struct netio_event_context*)data;
132 8 : struct netio_send_socket* socket;
133 8 : if(8 != read(fd, &socket, 8)) {
134 8 : log_dbg("Did not read 8 bytes.");
135 : }
136 8 : if (socket->cb_error_connection_refused) {
137 8 : socket->cb_error_connection_refused(socket);
138 : } else {
139 0 : log_error("Send socket %p has no connection refused callback set. Resources not freed.", socket);
140 : }
141 8 : free(ev_ctx);
142 8 : }
143 :
144 : void
145 6 : netio_error_bind_refused_callback(int fd, void* data)
146 : {
147 6 : log_dbg("error event on FD %d.", fd);
148 :
149 6 : struct netio_event_context* ev_ctx = (struct netio_event_context*)data;
150 6 : struct netio_listen_socket* socket;
151 6 : if(8 != read(fd, &socket, 8)) {
152 6 : log_dbg("Did not read 8 bytes.");
153 : }
154 6 : if (socket->cb_error_bind_refused) {
155 6 : socket->cb_error_bind_refused(socket);
156 : } else {
157 0 : log_error("Listen socket %p has no bind refused callback set. Resources not freed.", socket);
158 : }
159 6 : free(ev_ctx);
160 6 : }
161 :
162 : // INTERNAL RESOURCES ///////////////////////////////////////////////////////////////
163 :
164 : void
165 558 : init_polled_fids(struct polled_fids* pfids, int initial_size){
166 558 : pfids->size = initial_size;
167 558 : pfids->count = 0;
168 558 : pfids->fabric = NULL;
169 558 : pfids->fid_set = malloc(pfids->size*sizeof(struct fid*));
170 558 : pfids->data = malloc(pfids->size*sizeof(struct polled_fids_data));
171 558 : }
172 :
173 : void
174 552 : init_openfds(struct open_fds* fds, int initial_size){
175 552 : fds->size = initial_size;
176 552 : fds->count = 0;
177 552 : fds->data = malloc(fds->size*sizeof(struct open_fd_data));
178 552 : }
179 :
180 :
181 : void
182 0 : print_polled_fids(struct polled_fids* pfids){
183 0 : printf("Number of polled fds %u \n", pfids->count);
184 0 : printf("FD \t FID \n");
185 0 : for(unsigned int i=0; i < pfids->count; ++i){
186 0 : printf("%d \t %p \n", pfids->data[i].fd, pfids->fid_set[i]);
187 : }
188 0 : printf("-------------------\n");
189 0 : }
190 :
191 : void
192 0 : print_openfds(struct open_fds* fds){
193 0 : printf("Number of open fds %u \n", fds->count);
194 0 : printf("===============================================\n");
195 0 : printf("FD \t RESOURCE \t SOCKET \t OBJ ADDR \n");
196 0 : printf("-----------------------------------------------\n");
197 0 : for(unsigned int i=0; i < fds->count; ++i){
198 0 : int r = fds->data[i].rtype;
199 0 : int s = fds->data[i].stype;
200 0 : printf("%d\t%s\t%s\t%p\n", fds->data[i].fd, resource_name[r], socket_name[s], fds->data[i].object);
201 : }
202 0 : printf("===============================================\n");
203 0 : }
204 :
205 :
206 : void
207 1494 : add_polled_fid(struct polled_fids* pfids, struct fid_fabric* fab, struct fid* fid, int fd, void* socket, void (*cb)(int,void*)){
208 1494 : if(pfids->size <= pfids->count){
209 1 : log_dbg("Reallocing polled fids");
210 1 : pfids->fid_set = realloc(pfids->fid_set, 2*(pfids->size)*sizeof(struct fid*));
211 1 : pfids->data = realloc(pfids->data, 2*(pfids->size)*sizeof(struct polled_fids_data));
212 1 : pfids->size *= 2;
213 1494 : };
214 1494 : log_dbg("Polled_fids %p Adding polled fd %d fid %p.", pfids, fd, fid);
215 1494 : pfids->fabric = fab;
216 1494 : pfids->fid_set[pfids->count] = fid;
217 1494 : pfids->data[pfids->count].fd = fd;
218 1494 : pfids->data[pfids->count].socket = socket;
219 1494 : pfids->data[pfids->count].cb = cb;
220 1494 : pfids->count++;
221 : //print_polled_fids(pfids);
222 1494 : };
223 :
224 : void
225 1957 : add_open_fd(struct open_fds* fds, int fd, enum resource_type rtype, enum socket_type stype, void* object){
226 1957 : if(fds->size <= fds->count){
227 0 : log_dbg("Reallocing open fds");
228 0 : fds->data = realloc(fds->data, 2*(fds->size)*sizeof(struct open_fd_data));
229 0 : fds->size *= 2;
230 : };
231 : log_dbg("New open fd %d res type %s socket type %s", fd, resource_name[rtype], socket_name[stype]);
232 5448 : for(unsigned int i=0; i<fds->count; ++i){
233 3491 : if (fd == fds->data[i].fd){
234 0 : log_error("Adding again fd % to open fds!", fd);
235 : }
236 : }
237 1957 : fds->data[fds->count].fd = fd;
238 1957 : fds->data[fds->count].object = object;
239 1957 : fds->data[fds->count].rtype = rtype;
240 1957 : fds->data[fds->count].stype = stype;
241 1957 : fds->count++;
242 1957 : }
243 :
244 :
245 : void
246 1231 : remove_polled_fid(struct polled_fids* pfids, int fd){
247 1231 : log_dbg("Polled_fids %p removing polled fd %d.", pfids, fd);
248 1945 : for(unsigned int i = 0; i < pfids->count; i++){
249 1881 : if(fd == pfids->data[i].fd){
250 : log_dbg("FD %d FID %p removed.", fd, pfids->fid_set[i]);
251 2026 : for(unsigned int j = i; j < pfids->count-1; j++){
252 859 : pfids->fid_set[j] = pfids->fid_set[j+1];
253 859 : pfids->data[j] = pfids->data[j+1];
254 : }
255 1167 : pfids->count -= 1;
256 1167 : break;
257 : }
258 : }
259 : //print_polled_fids(pfids);
260 1231 : }
261 :
262 :
263 : void
264 1384 : remove_open_fd(struct netio_eventloop* ev, int fd){
265 1384 : struct open_fds* fds = &ev->openfds;
266 2703 : for(unsigned int i = 0; i < fds->count; i++){
267 2701 : if(fd == fds->data[i].fd){
268 : log_dbg("Removing from open fd record fd %d res type %s socket type %s. Current registered fds %u", fd, resource_name[fds->data[i].rtype], socket_name[fds->data[i].stype],fds->count);
269 2559 : for(unsigned int j = i; j < fds->count-1; j++){
270 1177 : fds->data[j] = fds->data[j+1];
271 : }
272 1382 : fds->count -= 1;
273 1382 : break;
274 : }
275 : }
276 1384 : add_closed_fd(&ev->closedfds, fd);
277 1384 : }
278 :
279 :
280 : int
281 0 : check_open_fd_exists(struct open_fds* fds, int fd){
282 0 : for(unsigned int i = 0; i < fds->count; i++){
283 0 : if(fd == fds->data[i].fd){
284 : return true;
285 : }
286 : }
287 : return false;
288 : }
289 :
290 :
291 : // API FUNCTIONS ///////////////////////////////////////////////////////////////
292 :
293 : /*! \brief Initializes a NetIO event loop.
294 : *
295 : * In the background this creates an epoll file descriptor handle.
296 : *
297 : * @param evloop The event loop to initialize
298 : */
299 : void
300 552 : netio_eventloop_init(struct netio_eventloop* evloop)
301 : {
302 552 : evloop->epollfd = epoll_create1(0); // no flag passed, same behaviour as epoll_create
303 552 : evloop->events = malloc(sizeof(struct epoll_event)*NETIO_MAX_EPOLL_EVENTS);
304 552 : init_polled_fids(&evloop->pfids, NETIO_MAX_POLLED_FIDS);
305 552 : init_openfds(&evloop->openfds, NETIO_MAX_POLLED_FIDS);
306 552 : reset_closed_fds(&evloop->closedfds);
307 552 : log_dbg("Creating a new eventloop with fd %d", evloop->epollfd);
308 :
309 552 : if(evloop->epollfd == -1) {
310 0 : log_fatal("Could not create epoll fd. Exit.");
311 0 : exit(2);
312 : }
313 :
314 : //termination signal
315 552 : evloop->stop_signal.data = evloop;
316 552 : evloop->stop_signal.cb = netio_stop;
317 552 : netio_signal_init(evloop, &(evloop->stop_signal));
318 552 : log_dbg("stop signal initialised with fd %d", evloop->stop_signal.ev_ctx.fd );
319 552 : }
320 :
321 : /**
322 : * Initializes a timer and registers it with the event loop.
323 : *
324 : * Internally, timers are implemented using `timerfd`.
325 : *
326 : * @param evloop The event loop in which the timer will be registered
327 : */
328 : void
329 1192 : netio_timer_init(struct netio_eventloop* evloop, struct netio_timer* timer)
330 : {
331 1192 : timer->ev_ctx.fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
332 1191 : timer->ev_ctx.data = timer;
333 1191 : timer->ev_ctx.cb = netio_timer_callback;
334 1191 : if(timer->ev_ctx.fd == -1)
335 : {
336 0 : log_error("Could not create timerfd. The timer will not fire.");
337 0 : return;
338 : }
339 1191 : log_dbg("registering timerfd %d to %d", timer->ev_ctx.fd, evloop->epollfd);
340 1191 : netio_register_read_fd(evloop, &timer->ev_ctx);
341 : #if defined TRACK_ALL_FD
342 : add_open_fd(&evloop->openfds, timer->ev_ctx.fd, NETIO_TIMER, NOSOCKET, timer);
343 : #endif
344 : }
345 :
346 : /**
347 : * Deregisters a timer from the event loop and closes its file descriptor.
348 : *
349 : * @param evloop The event loop in which the timer is registered
350 : * @param timer The timer to unregister and close
351 : */
352 : void
353 1111 : netio_timer_close(struct netio_eventloop* evloop, struct netio_timer* timer)
354 : {
355 1111 : netio_timer_stop(timer);
356 1111 : epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, timer->ev_ctx.fd, NULL);
357 1111 : log_dbg("netio_timer_close: deregistered timer eventfd %d", timer->ev_ctx.fd);
358 : #if defined TRACK_ALL_FD
359 : int ret = close(timer->ev_ctx.fd);
360 : if(!ret){
361 : remove_open_fd(&evloop, timer->ev_ctx.fd);
362 : }
363 : #else
364 1111 : close(timer->ev_ctx.fd);
365 : #endif
366 1111 : }
367 :
368 : /**
369 : * Starts a timer with the defined period (given in seconds).
370 : *
371 : * The period is given in seconds. The timer callback is executed at the
372 : * defined frequency until it is explicitly stopped.
373 : *
374 : * @param timer The timer to start
375 : * @param seconds The timer period, given in seconds
376 : */
377 : void
378 80 : netio_timer_start_s(struct netio_timer* timer, unsigned long long seconds)
379 : {
380 80 : set_timerfd(timer->ev_ctx.fd, seconds, 0);
381 80 : }
382 :
383 : /**
384 : * Start a timer with the defined period (given in milliseconds).
385 : * @see netio_timer_start_s
386 : */
387 : void
388 66616 : netio_timer_start_ms(struct netio_timer* timer, unsigned long long milliseconds)
389 : {
390 66616 : set_timerfd(timer->ev_ctx.fd, milliseconds/1000, (milliseconds%1000)*1000*1000);
391 66616 : }
392 :
393 : /**
394 : * Start a timer with the defined period (given in microseconds).
395 : * @see netio_timer_start_s
396 : */
397 : void
398 27 : netio_timer_start_us(struct netio_timer* timer, unsigned long long microseconds)
399 : {
400 27 : set_timerfd(timer->ev_ctx.fd, microseconds/(1000*1000), (microseconds%(1000*1000)*1000));
401 27 : }
402 :
403 : /**
404 : * Start a timer with the defined period (given in nanoseconds).
405 : * @see netio_timer_start_s
406 : */
407 : void
408 0 : netio_timer_start_ns(struct netio_timer* timer, unsigned long long nanoseconds)
409 : {
410 0 : set_timerfd(timer->ev_ctx.fd, nanoseconds/(1000*1000*1000), nanoseconds%(1000*1000*1000));
411 0 : }
412 :
413 : /**
414 : * Stops a timer.
415 : *
416 : * The timer will not execute callbacks anymore until it is started again.
417 : *
418 : * @param timer The timer to stop
419 : */
420 : void
421 67715 : netio_timer_stop(struct netio_timer* timer) {
422 67715 : set_timerfd(timer->ev_ctx.fd, 0, 0);
423 67715 : }
424 :
425 :
426 : /**
427 : * Initializes a signal and registers it in the event loop.
428 : *
429 : * Internally, signals are implemented using `eventfd`.
430 : *
431 : * @param evloop The event loop in which the signal will be registered
432 : * @param signal The signal to initialize
433 : */
434 : void
435 2132 : netio_signal_init(struct netio_eventloop* evloop, struct netio_signal* signal)
436 : {
437 2132 : signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
438 2132 : signal->ev_ctx.data = signal;
439 2132 : if(signal->ev_ctx.fd == -1)
440 : {
441 0 : log_fatal("Could not open eventfd");
442 0 : exit(2);
443 : }
444 :
445 2132 : signal->ev_ctx.cb = netio_signal_callback;
446 2132 : signal->epollfd = evloop->epollfd;
447 2132 : netio_register_read_fd(evloop, &signal->ev_ctx);
448 : #if defined TRACK_ALL_FD
449 : add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal);
450 : #endif
451 2132 : log_dbg("Registering eventfd %d", signal->ev_ctx.fd);
452 2132 : }
453 :
454 :
455 : void
456 0 : netio_signal_no_semaphore_init(struct netio_eventloop* evloop, struct netio_signal* signal)
457 : {
458 0 : signal->ev_ctx.fd = eventfd(0, EFD_NONBLOCK);
459 0 : signal->ev_ctx.data = signal;
460 0 : if(signal->ev_ctx.fd == -1)
461 : {
462 0 : log_fatal("Could not open eventfd");
463 0 : exit(2);
464 : }
465 :
466 0 : signal->ev_ctx.cb = netio_signal_callback;
467 0 : signal->epollfd = evloop->epollfd;
468 0 : netio_register_read_fd(evloop, &signal->ev_ctx);
469 : #if defined TRACK_ALL_FD
470 : add_open_fd(&evloop->openfds, signal->ev_ctx.fd, NETIO_SIGNAL, NOSOCKET, signal);
471 : #endif
472 0 : log_dbg("Registering eventfd %d", signal->ev_ctx.fd);
473 0 : }
474 :
475 :
476 :
477 :
478 : /**
479 : * Deregisters a signal from the event loop and closes its file descriptor.
480 : *
481 : * @param evloop The event loop in which the signal will be registered
482 : * @param signal The signal to initialize
483 : */
484 : void
485 1797 : netio_signal_close(struct netio_eventloop* evloop, struct netio_signal* signal)
486 : {
487 1797 : int rc = epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, signal->ev_ctx.fd, NULL);
488 1797 : if (rc) {
489 160 : log_warn("Cannot deregister signal fd %d from evloop %d, %s", signal->ev_ctx.fd, evloop->epollfd, strerror(errno));
490 : }
491 1797 : log_dbg("netio_signal_close: deregistered signal eventfd %d, ret %d from evloop %d", signal->ev_ctx.fd, rc, signal->epollfd);
492 1797 : int ret = close(signal->ev_ctx.fd);
493 1797 : if(ret) {log_warn("Cannot close %d: %s", signal->ev_ctx.fd, strerror(errno));}
494 : #if defined TRACK_ALL_FD
495 : remove_open_fd(&evloop, signal->ev_ctx.fd);
496 : #endif
497 1797 : }
498 :
499 :
500 : /**
501 : * Fires a signal.
502 : *
503 : * Firing the signal triggers the execution of the signal's callback. Firing
504 : * a signal is thread-safe.
505 : *
506 : * @param signal The signal to fire
507 : */
508 : void
509 8739042 : netio_signal_fire(struct netio_signal* signal)
510 : {
511 8739042 : uint64_t buf = 1;
512 8739042 : int ret = write(signal->ev_ctx.fd, &buf, 8);
513 8739004 : if( ret !=8 ){
514 0 : log_error("Firing signal writing on fd %d, only %d / 8 bytes written. Errno %s", signal->ev_ctx.fd, ret, strerror(errno));
515 : }
516 8739004 : }
517 :
518 : /**
519 : * Fires a callback for error_connection_refused.
520 : *
521 : * Firing triggers the execution of the error_connection_refused callback.
522 : *
523 : * @param socket The socket to use as parameter for the callback
524 : */
525 8 : void netio_error_connection_refused_fire(struct netio_send_socket* socket)
526 : {
527 8 : struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context));
528 8 : ev_ctx->fd = eventfd(0, EFD_NONBLOCK);
529 : // event context is freed by the callback
530 8 : ev_ctx->data = ev_ctx;
531 8 : if (ev_ctx->fd == -1)
532 : {
533 0 : log_fatal("Could not open eventfd for send socket error_connection_refused");
534 0 : exit(2);
535 : }
536 :
537 8 : ev_ctx->cb = netio_error_connection_refused_callback;
538 :
539 8 : netio_register_read_fd(&socket->ctx->evloop, ev_ctx);
540 8 : add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, USEND, socket);
541 8 : log_dbg("listen_socket netio_error_connection_refused_fire: registering eventfd %d", ev_ctx->fd);
542 :
543 : // Socket is transmitted as data to the file descriptor
544 : // struct netio_send_socket* buf = socket;
545 8 : write(ev_ctx->fd, &socket, 8);
546 8 : }
547 :
548 : /**
549 : * Fires a callback for error_bind_refused.
550 : *
551 : * Firing triggers the execution of the error_bind_refused callback.
552 : *
553 : * @param socket The socket to use as parameter for the callback
554 : */
555 6 : void netio_error_bind_refused_fire(struct netio_listen_socket* socket)
556 : {
557 6 : struct netio_event_context* ev_ctx = malloc(sizeof(struct netio_event_context));
558 6 : ev_ctx->fd = eventfd(0, EFD_NONBLOCK);
559 : // event context is freed by the callback
560 6 : ev_ctx->data = ev_ctx;
561 6 : if (ev_ctx->fd == -1)
562 : {
563 0 : log_fatal("Could not open eventfd for listen socket error_bind_refused");
564 0 : exit(2);
565 : }
566 :
567 6 : ev_ctx->cb = netio_error_bind_refused_callback;
568 :
569 6 : netio_register_read_fd(&socket->ctx->evloop, ev_ctx);
570 6 : add_open_fd(&socket->ctx->evloop.openfds, ev_ctx->fd, NETIO_SIGNAL, ULISTEN, socket);
571 6 : log_dbg("listen_socket netio_error_bind_refused_fire: registering eventfd %d", ev_ctx->fd);
572 :
573 : // Socket is transmitted as data to the file descriptor
574 : // struct netio_listen_socket* buf = socket;
575 6 : write(ev_ctx->fd, &socket, 8);
576 6 : }
577 :
578 :
579 : /**
580 : * Executes the event loop.
581 : *
582 : * The event loop is executed in an endless loop until it is explicitly
583 : * terminated by `netio_terminate`. Before any processing any other event,
584 : * `netio_run` will execute this initialization callback, if one was specified.
585 : * The core of the event loop is epoll_wait.
586 : * Note that epoll_wait returns only one event per fd, so MAX_EPOLL_EVENTS actually
587 : * translates into the maximum number of fds that are processed in one iteration
588 : * (the remaining fds are processed in a round-robin fashion in the next iteration).
589 : *
590 : * @param evloop The event loop to execute.
591 : */
592 : void
593 552 : netio_run(struct netio_eventloop* evloop)
594 : {
595 552 : evloop->is_running = 1;
596 552 : int nevents;
597 :
598 552 : if(evloop->cb_init != NULL) {
599 523 : evloop->cb_init(evloop->data);
600 : }
601 : int running=1;
602 38453675 : while(running) {
603 : // don't want to block or wait too long if we're shutting down
604 38453224 : uint64_t timeout = evloop->is_running ? NETIO_EPOLL_TIMEOUT : 10;
605 38453224 : nevents = epoll_wait(evloop->epollfd, evloop->events, NETIO_MAX_EPOLL_EVENTS,
606 : timeout);
607 38453224 : log_trc("epoll wait: %d events to process", nevents);
608 :
609 78297935 : for(int i = 0; i < nevents; ++i)
610 : {
611 39844969 : log_trc("event type: %x from fd %d", evloop->events[i].events, ((struct netio_event_context*)evloop->events[i].data.ptr)->fd);
612 39844969 : process_event((struct netio_event_context*)(evloop->events[i].data.ptr), &evloop->closedfds);
613 39844711 : if(evloop->events[i].events & EPOLLRDHUP)
614 : {
615 21 : struct netio_event_context* c = (struct netio_event_context*)(evloop->events[i].data.ptr);
616 21 : log_dbg("EPOLLRDHUP on fd %d, removing it from epoll_wait", c->fd);
617 21 : epoll_ctl(evloop->epollfd, EPOLL_CTL_DEL, c->fd, NULL);
618 : }
619 : }
620 38453307 : if (evloop->is_running==0 && nevents==0) {
621 451 : running=0;
622 : }
623 38453307 : reset_closed_fds(&evloop->closedfds);
624 38453123 : if(unlikely(nevents == -1))
625 : {
626 4 : int errsv = errno;
627 4 : if(errsv==EINTR) {
628 4 : log_dbg("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv));
629 4 : continue;
630 : }
631 : else {
632 0 : log_fatal("Eventloop: non-blocking epoll_wait returned -1: %s", strerror(errsv));
633 0 : free(evloop->events);
634 0 : exit(1);
635 : }
636 : }
637 : }//end of while running
638 :
639 451 : log_dbg("Cleaning up eventloop");
640 451 : close(evloop->epollfd);
641 825 : for(unsigned int i=0; i < evloop->openfds.count; ++i){
642 374 : struct open_fd_data* data = &evloop->openfds.data[i];
643 374 : log_warn("Evloop terminated, closing fd %d type %s socket type %s", data->fd, resource_name[data->rtype], socket_name[data->stype]);
644 374 : close(data->fd);
645 : }
646 451 : free(evloop->openfds.data);
647 451 : free(evloop->pfids.data);
648 451 : free(evloop->pfids.fid_set);
649 451 : free(evloop->events);
650 451 : }
651 :
652 :
653 : /**
654 : * Terminates the event loop.
655 : *
656 : * `netio_terminate` will stop the execution of the event loop. This will *not*
657 : * interrupt any event processing code or user callback that is currently being
658 : * executed. Event loop execution will halt after the processing of the current
659 : * event has completed.
660 : *
661 : * @param evloop The event loop to terminate
662 : */
663 : void
664 614 : netio_terminate(struct netio_eventloop* evloop)
665 : {
666 614 : if (evloop->is_running == 1){
667 104 : netio_stop((void*)evloop);
668 : }
669 614 : }
670 :
671 :
672 : /**
673 : * Terminates the event loop.
674 : *
675 : * `netio_terminate_signal` will add a signal to the eventloop
676 : * such that queued events, including those related to resources deallocation,
677 : * can be executed.
678 : *
679 : * @param evloop The event loop to terminate
680 : */
681 : void
682 452 : netio_terminate_signal(struct netio_eventloop* evloop)
683 : {
684 452 : if (evloop->is_running == 1){
685 376 : log_info("Firing termination signal");
686 376 : netio_signal_fire(&evloop->stop_signal);
687 : } else {
688 76 : log_warn("netio_terminate_signal called but evloop not running");
689 : }
690 452 : }
691 :
692 : void
693 451 : netio_stop(void* ptr)
694 : {
695 451 : struct netio_eventloop* evloop = (struct netio_eventloop*)ptr;
696 451 : netio_signal_close(evloop, &evloop->stop_signal);
697 451 : evloop->pfids.count = 0;
698 451 : evloop->is_running = 0;
699 451 : }
700 :
701 : void
702 5064 : netio_register_read_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
703 : {
704 5064 : register_fd(evloop->epollfd, ctx, EPOLLIN | EPOLLRDHUP);
705 5063 : }
706 :
707 :
708 : /**
709 : * Closes a socket.
710 : *
711 : * `netio_close_socket` will deallocate the resources associated to a socket
712 : * and close the corresponding file descriptor. The type of socket is specified
713 : * by the third argument. The socket is not freed as the function ignores whether
714 : * it had been allocated statically or dynamically.
715 : *
716 : * @param evloop The event loop to which the socket belongs
717 : * @param socket Pointer to the socket
718 : * @param type Type of socket as defined by enum socket_type.
719 : */
720 : void
721 206 : netio_close_socket(struct netio_eventloop* evloop, void* socket, enum socket_type type)
722 : {
723 : //Sockets that contain more than one other socket
724 206 : switch (type){
725 0 : case BPUB: {
726 0 : struct netio_publish_socket* pub = (struct netio_publish_socket*)socket;
727 0 : struct netio_socket_list* it = pub->subscription_table.socket_list;
728 0 : while(it != NULL){
729 0 : if(it->socket){
730 0 : struct netio_buffered_send_socket* bs_socket = (struct netio_buffered_send_socket*)it->socket;
731 0 : netio_disconnect(&(bs_socket->send_socket)); //Do I need to disconnect for the sake of the other side?
732 0 : free(it->socket);
733 0 : if(it->addrlen > 0){ free(it->addr); }
734 : }
735 0 : struct netio_socket_list* tmp = it;
736 0 : it = it->next;
737 0 : free(tmp);
738 : }
739 0 : netio_close_socket(evloop, (void*)(&pub->lsocket), ULISTEN);
740 0 : return;
741 : }
742 :
743 0 : case UPUB: {
744 0 : struct netio_unbuffered_publish_socket* upub = (struct netio_unbuffered_publish_socket*)socket;
745 0 : struct netio_socket_list* uit = upub->subscription_table.socket_list;
746 0 : while(uit != NULL){
747 0 : if(uit->socket){
748 0 : struct netio_send_socket* s_socket = (struct netio_send_socket*)uit->socket;
749 0 : netio_disconnect(s_socket); //Do I need to disconnec for the sake of the other side?
750 0 : free(uit->socket);
751 0 : if(uit->addrlen > 0){ free (uit->addr); }
752 : }
753 0 : struct netio_socket_list* tmp = uit;
754 0 : uit = uit->next;
755 0 : free(tmp);
756 : }
757 0 : netio_close_socket(evloop, (void*)(&upub->lsocket), ULISTEN);
758 0 : struct netio_completion_stack* cs = &upub->completion_stack;
759 0 : free(cs->stack);
760 0 : free(cs->objects);
761 0 : free(cs->key_array);
762 0 : return;
763 : }
764 :
765 61 : case BSUB: {
766 61 : struct netio_subscribe_socket* sub_socket = (struct netio_subscribe_socket*)socket;
767 61 : netio_disconnect(&sub_socket->socket);
768 61 : netio_close_socket(evloop, &sub_socket->recv_socket, BLISTEN);
769 61 : if (sub_socket->remote_hostname) {
770 61 : free((void*)sub_socket->remote_hostname);
771 61 : sub_socket->remote_hostname=NULL;
772 : }
773 : return;
774 : }
775 :
776 41 : case USUB: {
777 41 : struct netio_unbuffered_subscribe_socket* usub_socket = (struct netio_unbuffered_subscribe_socket*)socket;
778 41 : netio_disconnect(&usub_socket->socket);
779 41 : netio_close_socket(evloop, &usub_socket->recv_socket, ULISTEN);
780 41 : if (usub_socket->remote_hostname) {
781 41 : free((void*)usub_socket->remote_hostname);
782 41 : usub_socket->remote_hostname=NULL;
783 : }
784 : return;
785 : }
786 :
787 : case NOSOCKET:
788 : return;
789 :
790 104 : default:
791 104 : ;
792 : //go on with the function
793 : }
794 :
795 104 : struct netio_signal* signal_close_socket = malloc(sizeof(struct netio_signal));
796 104 : struct signal_data* sd = malloc(sizeof(struct signal_data));
797 104 : sd->signal = signal_close_socket;
798 104 : sd->ptr = socket;
799 104 : sd->evloop = evloop;
800 :
801 104 : switch (type){
802 0 : case USEND:
803 0 : signal_close_socket->cb = close_send_socket;
804 0 : break;
805 0 : case BSEND:
806 0 : signal_close_socket->cb = close_buffered_send_socket;
807 0 : break;
808 2 : case URECV:
809 2 : signal_close_socket->cb = close_recv_socket;
810 2 : break;
811 0 : case BRECV:
812 0 : signal_close_socket->cb = close_buffered_recv_socket;
813 0 : break;
814 41 : case ULISTEN:
815 41 : signal_close_socket->cb = close_listen_socket;
816 41 : break;
817 61 : case BLISTEN: {
818 61 : struct netio_buffered_listen_socket* lsocket = (struct netio_buffered_listen_socket*)socket;
819 61 : sd->ptr = (void*)(&lsocket->listen_socket);
820 61 : signal_close_socket->cb = close_buffered_listen_socket;
821 61 : break;
822 : }
823 : case NOSOCKET:
824 : return;
825 :
826 0 : default:
827 0 : log_error("Could not delete socket: type unknown.");
828 0 : return;
829 : }
830 104 : signal_close_socket->data = sd;
831 104 : netio_signal_init(evloop, signal_close_socket);
832 104 : netio_signal_fire(signal_close_socket);
833 : }
834 :
835 :
836 358 : void netio_register_read_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
837 : {
838 358 : struct epoll_event ev;
839 :
840 358 : log_dbg("netio_register_read_tcp_fd. Will register ctx->fd = %d with epoll", ctx->fd);
841 358 : ev.events = EPOLLIN | EPOLLRDHUP;
842 358 : ev.data.ptr = ctx;
843 :
844 358 : if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev))
845 : {
846 158 : if (errno != EEXIST) {
847 0 : log_error("Could not add FD %d to epoll. %s", ctx->fd, strerror(errno));
848 0 : return;
849 : }
850 158 : if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev))
851 : {
852 0 : log_error("Could not modify FD %d in epoll %s", ctx->fd, strerror(errno));
853 0 : exit(-2);
854 : }
855 : }
856 358 : log_dbg("Done");
857 : }
858 :
859 :
860 234 : void netio_register_write_tcp_fd(struct netio_eventloop* evloop, struct netio_event_context* ctx)
861 : {
862 234 : struct epoll_event ev;
863 :
864 234 : log_dbg("netio_register_write_tcp_fd: registering ctx->fd = %d with epoll", ctx->fd);
865 234 : ev.events = EPOLLOUT | EPOLLRDHUP | EPOLLET;
866 234 : ev.data.ptr = ctx;
867 :
868 234 : if(epoll_ctl(evloop->epollfd, EPOLL_CTL_ADD, ctx->fd, &ev))
869 : {
870 117 : if (errno != EEXIST) {
871 0 : log_error("Could not add FD %d to epoll. errno=%d", ctx->fd, errno);
872 0 : return;
873 : }
874 117 : if(epoll_ctl(evloop->epollfd, EPOLL_CTL_MOD, ctx->fd, &ev))
875 : {
876 0 : log_error("Could not modify FD %d in epoll, errno=%d", ctx->fd, errno);
877 0 : exit(-2);
878 : }
879 : }
880 234 : log_dbg("Done");
881 : }
|