LCOV - code coverage report
Current view: top level - felix-bus-fs/src - felixbus.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 167 208 80.3 %
Date: 2025-08-12 04:15:35 Functions: 13 13 100.0 %

          Line data    Source code
       1             : #include <errno.h>
       2             : #include <fcntl.h>
       3             : #include <libgen.h>
       4             : #include <limits.h>
       5             : #include <pthread.h>
       6             : #include <pwd.h>
       7             : #include <signal.h>
       8             : #include <stddef.h>
       9             : #include <stdio.h>
      10             : #include <stdlib.h>
      11             : #include <string.h>
      12             : #include <time.h>
      13             : #include <unistd.h>
      14             : 
      15             : #include <sys/file.h>
      16             : #include <sys/stat.h>
      17             : #include <sys/types.h>
      18             : 
      19             : #include "jWrite.h"
      20             : #include "felixbus/felixbus.h"
      21             : 
      22             : // // NOTE: possible different solution for locking files, not used by FELIX currently
      23             : // // from: https://stackoverflow.com/questions/29067893/flock-is-it-possible-to-merely-check-if-the-file-is-already-locked-without-a
      24             : 
      25             : // /* Open and exclusive-lock file, creating it (-rw-------)
      26             : //  * if necessary. If fdptr is not NULL, the descriptor is
      27             : //  * saved there. The descriptor is never one of the standard
      28             : //  * descriptors STDIN_FILENO, STDOUT_FILENO, or STDERR_FILENO.
      29             : //  * If successful, the function returns 0.
      30             : //  * Otherwise, the function returns nonzero errno:
      31             : //  *     EINVAL: Invalid lock file path
      32             : //  *     EMFILE: Too many open files
      33             : //  *     EALREADY: Already locked
      34             : //  * or one of the open(2)/creat(2) errors.
      35             : // */
      36             : // static int lockfile(const char *const filepath, int *const fdptr)
      37             : // {
      38             : //     struct flock lock;
      39             : //     int used = 0; /* Bits 0 to 2: stdin, stdout, stderr */
      40             : //     int fd;
      41             : 
      42             : //     /* In case the caller is interested in the descriptor,
      43             : //      * initialize it to -1 (invalid). */
      44             : //     if (fdptr)
      45             : //         *fdptr = -1;
      46             : 
      47             : //     /* Invalid path? */
      48             : //     if (filepath == NULL || *filepath == '\0')
      49             : //         return errno = EINVAL;
      50             : 
      51             : //     /* Open the file. */
      52             : //     do {
      53             : //         fd = open(filepath, O_RDWR | O_CREAT, 0600);
      54             : //     } while (fd == -1 && errno == EINTR);
      55             : //     if (fd == -1) {
      56             : //         if (errno == EALREADY)
      57             : //             errno = EIO;
      58             : //         return errno;
      59             : //     }
      60             : 
      61             : //     /* Move fd away from the standard descriptors. */
      62             : //     while (1)
      63             : //         if (fd == STDIN_FILENO) {
      64             : //             used |= 1;
      65             : //             fd = dup(fd);
      66             : //         } else
      67             : //         if (fd == STDOUT_FILENO) {
      68             : //             used |= 2;
      69             : //             fd = dup(fd);
      70             : //         } else
      71             : //         if (fd == STDERR_FILENO) {
      72             : //             used |= 4;
      73             : //             fd = dup(fd);
      74             : //         } else
      75             : //             break;
      76             : 
      77             : //     /* Close the standard descriptors we temporarily used. */
      78             : //     if (used & 1)
      79             : //         close(STDIN_FILENO);
      80             : //     if (used & 2)
      81             : //         close(STDOUT_FILENO);
      82             : //     if (used & 4)
      83             : //         close(STDERR_FILENO);
      84             : 
      85             : //     /* Did we run out of descriptors? */
      86             : //     if (fd == -1)
      87             : //         return errno = EMFILE;
      88             : 
      89             : //     /* Exclusive lock, cover the entire file (regardless of size). */
      90             : //     lock.l_type = F_WRLCK;
      91             : //     lock.l_whence = SEEK_SET;
      92             : //     lock.l_start = 0;
      93             : //     lock.l_len = 0;
      94             : //     if (fcntl(fd, F_SETLK, &lock) == -1) {
      95             : //         /* Lock failed. Close file and report locking failure. */
      96             : //         close(fd);
      97             : //         return errno = EALREADY;
      98             : //     }
      99             : 
     100             : //     /* Save descriptor, if the caller wants it. */
     101             : //     if (fdptr)
     102             : //         *fdptr = fd;
     103             : 
     104             : //     return 0;
     105             : // }
     106             : 
     107             : // void lock_example() {
     108             : //     int result;
     109             : 
     110             : //     result = lockfile("YOUR_LOCKFILE_PATH", NULL);
     111             : //     if (result == 0) {
     112             : //         /* Have an exclusive lock on YOUR_LOCKFILE_PATH */
     113             : //     } else
     114             : //     if (result == EALREADY) {
     115             : //         /* YOUR_LOCKFILE_PATH is already locked by another process */
     116             : //     } else {
     117             : //         /* Cannot lock YOUR_LOCKFILE_PATH, see strerror(result). */
     118             : //     }
     119             : // }
     120             : 
     121             : // // END NOTE
     122             : 
     123             : #define FELIX_BUS_TOUCH_INTERVAL 10 /* seconds */
     124             : 
     125             : struct felix_bus_s {
     126             :     int fd;
     127             :     char* path;
     128             :     timer_t timer;
     129             :     char host[256];
     130             :     pid_t pid;
     131             :     char* user;
     132             : };
     133             : 
     134             : static int felix_bus_verbose = 0;
     135             : static int felix_bus_cleanup = 1;
     136             : #define BUFFER_SIZE (500000)
     137             : static char buffer[BUFFER_SIZE];
     138             : 
     139         230 : int mkpath(char *dir, mode_t mode) {
     140         230 :     struct stat sb;
     141             : 
     142         230 :     if (!dir) {
     143           0 :         errno = EINVAL;
     144           0 :         return 1;
     145             :     }
     146             : 
     147         230 :     if (!stat(dir, &sb))
     148             :         return 0;
     149             : 
     150         129 :     mkpath(dirname(strdupa(dir)), mode);
     151             : 
     152         129 :     return mkdir(dir, mode);
     153             : }
     154             : 
     155           3 : void on_timer(union sigval val) {
     156           3 :     felix_bus bus = (felix_bus)val.sival_ptr;
     157           3 :     if (felix_bus_verbose)
     158           0 :         printf("on_timer felix-bus-fs\n");
     159             : 
     160           3 :     int rc = felix_bus_touch(bus);
     161           3 :     if (rc < 0) {
     162           0 :         printf("ERROR: Failed to touch bus file: %s\n", bus->path);
     163           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     164             :     }
     165           3 : }
     166             : 
     167          41 : timer_t periodic_timer(int seconds, void* ptr) {
     168          41 :     pthread_attr_t attr;
     169          41 :     pthread_attr_init( &attr );
     170             : 
     171          41 :     struct sched_param parm;
     172          41 :     parm.sched_priority = 255;
     173          41 :     pthread_attr_setschedparam(&attr, &parm);
     174             : 
     175          41 :     struct sigevent sig;
     176          41 :     sig.sigev_notify = SIGEV_THREAD;
     177          41 :     sig.sigev_notify_function = on_timer;
     178          41 :     sig.sigev_value.sival_ptr = ptr;
     179          41 :     sig.sigev_notify_attributes = &attr;
     180             : 
     181          41 :     timer_t timer_id;
     182          41 :     int rc = timer_create(CLOCK_REALTIME, &sig, &timer_id);
     183          41 :     if (rc < 0) return NULL;
     184             : 
     185          41 :     struct itimerspec in;
     186          41 :     in.it_value.tv_sec = seconds;
     187          41 :     in.it_value.tv_nsec = 0;
     188          41 :     in.it_interval.tv_sec = seconds;
     189          41 :     in.it_interval.tv_nsec = 0;
     190             : 
     191          41 :     rc = timer_settime(timer_id, 0, &in, NULL);
     192          41 :     if (rc < 0) {
     193           0 :         timer_delete(timer_id);
     194           0 :         return NULL;
     195             :     }
     196             : 
     197          41 :     return timer_id;
     198             : }
     199             : 
     200             : // void felix_bus_on_exit(int exit_code, void *arg) {
     201             : // }
     202             : 
     203       26225 : void felix_bus_set_verbose(int verbose) {
     204       26225 :     felix_bus_verbose = verbose;
     205       26225 : }
     206             : 
     207         101 : void felix_bus_set_cleanup(int cleanup) {
     208         101 :     felix_bus_cleanup = cleanup;
     209         101 : }
     210             : 
     211         101 : char* felix_bus_path(const char* bus_path_prefix, const char* groupname, uint8_t vid, uint8_t did, uint32_t cid, const char* bus_filename) {
     212         101 :     if (felix_bus_verbose)
     213          10 :         printf("felix_bus_path\n");
     214             : 
     215         101 :     char* bus_path;
     216             : 
     217             :     // to be freed by caller
     218         101 :     bus_path = (char*)malloc(PIPE_BUF*sizeof(char));
     219             : 
     220             :     // for future compatibility, DID and CID are encoded as shortest hexadecimal, VID is only used in decoding the FID
     221         101 :     int count = snprintf(bus_path, PIPE_BUF, "%s/%s/%x/%x/%s.ndjson", bus_path_prefix, groupname, did, cid, bus_filename);
     222         101 :     if ((count < 0) || (count >= PIPE_BUF)) {
     223           0 :         printf("ERROR: Cannot create bus_path\n");
     224           0 :         free(bus_path);
     225           0 :         return NULL;
     226             :     }
     227             : 
     228         101 :     char* bus_path_copy = strdup(bus_path);
     229         101 :     if (bus_path_copy == NULL) return NULL;
     230             : 
     231         101 :     char* dir = dirname(bus_path_copy);
     232             : 
     233             :     // Retry a few times for automounted directories
     234         101 :     int retry = 0;
     235         101 :     int rc = -1;
     236         101 :     while ((retry < 5) && (rc < 0)) {
     237         101 :         rc = mkpath(dir, S_IWUSR | S_IRUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH);
     238         101 :         if (rc >= 0) break;
     239           0 :         retry++;
     240           0 :         printf("Retrying after %d second\n", retry);
     241           0 :         sleep(retry);
     242             :     }
     243             : 
     244         101 :     free(bus_path_copy);
     245         101 :     if (rc < 0) {
     246           0 :         printf("ERROR: Cannot create bus_path: '%s'\n", bus_path);
     247           0 :         return NULL;
     248             :     }
     249             : 
     250         101 :     if (felix_bus_verbose)
     251          10 :         printf("bus_path: %s\n", bus_path);
     252             : 
     253             :     return bus_path;
     254             : }
     255             : 
     256          10 : int felix_bus_locked(const char* bus_path) {
     257             :     /* Open file */
     258             :     /* NOTE: needs to be WR for it to be lockable */
     259             :     /* Relatively slow, use stale for faster status */
     260          10 :     int fd = open(bus_path, O_RDWR, 0);
     261          10 :     if (fd < 0) return 0;
     262             : 
     263             :     /* Try Lock file */
     264          10 :     int rc = flock(fd, LOCK_EX | LOCK_NB);
     265          10 :     if ((rc < 0) && (errno==EWOULDBLOCK)) {
     266          10 :         close(fd);
     267          10 :         return 1;
     268             :     }
     269             :     return 0;
     270             : }
     271             : 
     272       58499 : int felix_bus_stale(const char* bus_path) {
     273       58499 :     struct stat sb;
     274             : 
     275       58499 :     int rc = stat(bus_path, &sb);
     276       58499 :     if (rc < 0) {
     277           0 :         printf("ERROR: Failed to stat bus file: %s\n", bus_path);
     278           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     279           0 :         return 1;
     280             :     }
     281             : 
     282       58499 :     time_t now = time(0);
     283       58499 :     if (now < 0) {
     284           0 :         printf("ERROR: Failed to stat bus file: %s\n", bus_path);
     285           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     286           0 :         return 1;
     287             :     }
     288             : 
     289       58499 :     double diff_time = difftime(now, sb.st_mtime);
     290             : 
     291             :     // if (felix_bus_verbose) {
     292             :     //     printf("Mod: %lu %s", sb.st_mtime, ctime(&sb.st_mtime));
     293             :     //     printf("Now: %lu %s", now, ctime(&now));
     294             :     //     printf("Dif: %f\n", diff_time);
     295             :     // }
     296             : 
     297       58499 :     return diff_time > 2*FELIX_BUS_TOUCH_INTERVAL;
     298             : }
     299             : 
     300          42 : felix_bus felix_bus_open(const char* bus_path) {
     301          42 :     int rc;
     302             : 
     303             :     /* Open file */
     304          42 :     int fd = open(bus_path, O_CREAT | O_WRONLY | O_SYNC, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP | S_IROTH );
     305          42 :     if (fd < 0) return NULL;
     306             : 
     307             :     /* Lock file */
     308          42 :     rc = flock(fd, LOCK_EX | LOCK_NB);
     309          42 :     if (rc < 0) return NULL;
     310             : 
     311             :     /* Truncate the file */
     312          41 :     rc = ftruncate(fd, 0);
     313          41 :     if (rc < 0) return NULL;
     314             : 
     315             :     /* create bus */
     316          41 :     felix_bus bus;
     317          41 :     bus = (felix_bus)malloc(sizeof(struct felix_bus_s));
     318             : 
     319             :     /* Process ID */
     320          41 :     bus->pid = getpid();
     321             : 
     322             :     /* Hostname */
     323          41 :     rc = gethostname(bus->host, 256);
     324          41 :     if (rc < 0) {
     325           0 :         free(bus);
     326           0 :         return NULL;
     327             :     }
     328             : 
     329             :     /* Username */
     330          41 :     long user_bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
     331          41 :     if (user_bufsize == -1) {
     332           0 :         user_bufsize = 16384;
     333             :     }
     334          41 :     char *user_buf = malloc(user_bufsize);
     335          41 :     if (user_buf == NULL) {
     336           0 :         free(bus);
     337           0 :         return NULL;
     338             :     }
     339          41 :     uid_t uid = geteuid();
     340          41 :     struct passwd pwd;
     341          41 :     struct passwd *user;
     342          41 :     rc = getpwuid_r(uid, &pwd, user_buf, user_bufsize, &user);
     343          41 :     if ((rc < 0) || (user == NULL)) {
     344           0 :         free(bus);
     345           0 :         return NULL;
     346             :     }
     347          41 :     bus->user = strdup(pwd.pw_name);
     348             : 
     349             :     /* Free username memory */
     350          41 :     free(user_buf);
     351             : 
     352             :     /* Start a timer to keep the file current */
     353          41 :     timer_t timer = periodic_timer(FELIX_BUS_TOUCH_INTERVAL, bus);
     354          41 :     if (timer == NULL) {
     355           0 :         free(bus);
     356           0 :         return NULL;
     357             :     }
     358             : 
     359             :     /* Make sure we delete file and timer at the end */
     360             :     // rc = on_exit(felix_bus_on_exit, (void *)bus);
     361             :     // if (rc < 0) {
     362             :     //     free(timer);
     363             :     //     free(bus);
     364             :     //     return NULL;
     365             :     // }
     366             : 
     367          41 :     bus->fd = fd;
     368          41 :     bus->path = strdup(bus_path);
     369          41 :     bus->timer = timer;
     370             : 
     371          41 :     if (felix_bus_verbose)
     372           1 :         printf("felix_bus_open(%d) for %s\n", fd, bus_path);
     373             : 
     374             :     return bus;
     375             : }
     376             : 
     377         100 : int felix_bus_write(felix_bus bus, uint64_t fid, const struct felix_bus_info* info) {
     378             :     // FIXME check vid, cid and did fall within range
     379             : 
     380         100 :     char hex_fid[19];
     381         100 :     snprintf(hex_fid, 19, "0x%016lx", fid);
     382             : 
     383         100 :     struct jWriteControl jwc;
     384         100 :     jwOpen(&jwc, buffer, BUFFER_SIZE, JW_OBJECT, JW_NDJSON);
     385             : 
     386         100 :     jwObj_string(&jwc, BUS_HFID, hex_fid);
     387         100 :     jwObj_ulong(&jwc, BUS_FID, fid);
     388         100 :     jwObj_string(&jwc, BUS_IP, info->ip);
     389         100 :     jwObj_int(&jwc, BUS_PORT, info->port);
     390         100 :     jwObj_bool(&jwc, BUS_UNBUFFERED, info->unbuffered);
     391         100 :     jwObj_bool(&jwc, BUS_PUBSUB, info->pubsub);
     392         100 :     jwObj_bool(&jwc, BUS_RAW_TCP, info->raw_tcp);
     393         100 :     jwObj_bool(&jwc, BUS_STREAM, info->stream);
     394         100 :     jwObj_int(&jwc, BUS_NETIO_PAGES, info->netio_pages);
     395         100 :     jwObj_int(&jwc, BUS_NETIO_PAGESIZE, info->netio_pagesize);
     396             : 
     397         100 :     jwObj_string(&jwc, BUS_HOST, bus->host);
     398         100 :     jwObj_int(&jwc, BUS_PID, bus->pid);
     399         100 :     jwObj_string(&jwc, BUS_USER, bus->user);
     400             : 
     401         100 :     int rc = jwClose(&jwc);
     402         100 :     if (rc != 0) {
     403           0 :         printf("ERROR: Failed to write to: %s\n", bus->path);
     404           0 :         printf("jwError=%d str=%s\n", rc, jwErrorToString(rc));
     405           0 :         return rc;
     406             :     }
     407             : 
     408         100 :     rc = write(bus->fd, buffer, strlen(buffer));
     409         100 :     if (rc < 0) {
     410           0 :         printf("ERROR: Failed to write to: %s\n", bus->path);
     411           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     412             :     }
     413             : 
     414         100 :     return rc < 0 ? rc : 0;
     415             : }
     416             : 
     417           3 : int felix_bus_touch(felix_bus bus) {
     418           3 :     return bus->fd >= 0 ? futimens(bus->fd, NULL) : 0;
     419             : }
     420             : 
     421          42 : int felix_bus_close(felix_bus bus) {
     422             :     // Don't really close the file, otherwise we cannot keep the lock or touch the file
     423          42 :     return 0;
     424             : }
     425             : 
     426          42 : int felix_bus_release(felix_bus bus) {
     427          42 :     int rc;
     428             : 
     429          42 :     if (bus == NULL) return 0;
     430             : 
     431             :     // delete timer
     432          41 :     timer_t timer = bus->timer;
     433          41 :     if (felix_bus_verbose)
     434           1 :         printf("Deleting timer(%d) for: %s\n", bus->fd, bus->path);
     435          41 :     if (timer != NULL) {
     436          41 :         rc = timer_delete(timer);
     437          41 :         if (rc < 0) {
     438           0 :             printf("ERROR: Failed to delete timer for: %s\n", bus->path);
     439           0 :             printf("errno=%d str=%s\n", errno, strerror(errno));
     440             :         }
     441             :     }
     442             : 
     443             :     // close and remove file
     444          41 :     if (felix_bus_verbose)
     445           1 :         printf("Closing bus file(%d): %s\n", bus->fd, bus->path);
     446          41 :     rc = close(bus->fd);
     447          41 :     if (rc < 0) {
     448           0 :         printf("ERROR: Failed to remove bus file: %s\n", bus->path);
     449           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     450             :     }
     451          41 :     bus->fd = -1;
     452             : 
     453             :     // cleanup files
     454          41 :     if (felix_bus_cleanup) {
     455          31 :         if (felix_bus_verbose)
     456           1 :             printf("Removing bus file: %s\n", bus->path);
     457          31 :         rc = unlink(bus->path);
     458          31 :         if (rc < 0) {
     459           0 :             printf("ERROR: Failed to remove bus file: %s\n", bus->path);
     460           0 :             printf("errno=%d str=%s\n", errno, strerror(errno));
     461             :         }
     462             : 
     463          31 :         char* dir = dirname(bus->path);
     464          31 :         if (felix_bus_verbose) printf("Trying to remove cid dir if empty: %s\n", dir);
     465          31 :         rmdir(dir);
     466             : 
     467          31 :         dir = dirname(dir);
     468          31 :         if (felix_bus_verbose) printf("Trying to remove did dir if empty: %s\n", dir);
     469          31 :         rmdir(dir);
     470             : 
     471          31 :         dir = dirname(dir);
     472          31 :         if (felix_bus_verbose) printf("Trying to remove version dir if empty: %s\n", dir);
     473          31 :         rmdir(dir);
     474             : 
     475          31 :         dir = dirname(dir);
     476          31 :         if (felix_bus_verbose) printf("Trying to remove group_name dir if empty: %s\n", dir);
     477          31 :         rmdir(dir);
     478             :     }
     479             : 
     480          41 :     free(bus->path);
     481          41 :     free(bus->user);
     482          41 :     free(bus);
     483             : 
     484          41 :     return 0;
     485             : }

Generated by: LCOV version 1.0