LCOV - code coverage report
Current view: top level - felix-bus-fs/src - felixbus.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 165 206 80.1 %
Date: 2025-06-10 03:23:28 Functions: 13 13 100.0 %

          Line data    Source code
       1             : #include <errno.h>
       2             : #include <fcntl.h>
       3             : #include <libgen.h>
       4             : #include <limits.h>
       5             : #include <pthread.h>
       6             : #include <pwd.h>
       7             : #include <signal.h>
       8             : #include <stddef.h>
       9             : #include <stdio.h>
      10             : #include <stdlib.h>
      11             : #include <string.h>
      12             : #include <time.h>
      13             : #include <unistd.h>
      14             : 
      15             : #include <sys/file.h>
      16             : #include <sys/stat.h>
      17             : #include <sys/types.h>
      18             : 
      19             : #include "jWrite.h"
      20             : #include "felixbus/felixbus.h"
      21             : 
      22             : // // NOTE: possible different solution for locking files, not used by FELIX currently
      23             : // // from: https://stackoverflow.com/questions/29067893/flock-is-it-possible-to-merely-check-if-the-file-is-already-locked-without-a
      24             : 
      25             : // /* Open and exclusive-lock file, creating it (-rw-------)
      26             : //  * if necessary. If fdptr is not NULL, the descriptor is
      27             : //  * saved there. The descriptor is never one of the standard
      28             : //  * descriptors STDIN_FILENO, STDOUT_FILENO, or STDERR_FILENO.
      29             : //  * If successful, the function returns 0.
      30             : //  * Otherwise, the function returns nonzero errno:
      31             : //  *     EINVAL: Invalid lock file path
      32             : //  *     EMFILE: Too many open files
      33             : //  *     EALREADY: Already locked
      34             : //  * or one of the open(2)/creat(2) errors.
      35             : // */
      36             : // static int lockfile(const char *const filepath, int *const fdptr)
      37             : // {
      38             : //     struct flock lock;
      39             : //     int used = 0; /* Bits 0 to 2: stdin, stdout, stderr */
      40             : //     int fd;
      41             : 
      42             : //     /* In case the caller is interested in the descriptor,
      43             : //      * initialize it to -1 (invalid). */
      44             : //     if (fdptr)
      45             : //         *fdptr = -1;
      46             : 
      47             : //     /* Invalid path? */
      48             : //     if (filepath == NULL || *filepath == '\0')
      49             : //         return errno = EINVAL;
      50             : 
      51             : //     /* Open the file. */
      52             : //     do {
      53             : //         fd = open(filepath, O_RDWR | O_CREAT, 0600);
      54             : //     } while (fd == -1 && errno == EINTR);
      55             : //     if (fd == -1) {
      56             : //         if (errno == EALREADY)
      57             : //             errno = EIO;
      58             : //         return errno;
      59             : //     }
      60             : 
      61             : //     /* Move fd away from the standard descriptors. */
      62             : //     while (1)
      63             : //         if (fd == STDIN_FILENO) {
      64             : //             used |= 1;
      65             : //             fd = dup(fd);
      66             : //         } else
      67             : //         if (fd == STDOUT_FILENO) {
      68             : //             used |= 2;
      69             : //             fd = dup(fd);
      70             : //         } else
      71             : //         if (fd == STDERR_FILENO) {
      72             : //             used |= 4;
      73             : //             fd = dup(fd);
      74             : //         } else
      75             : //             break;
      76             : 
      77             : //     /* Close the standard descriptors we temporarily used. */
      78             : //     if (used & 1)
      79             : //         close(STDIN_FILENO);
      80             : //     if (used & 2)
      81             : //         close(STDOUT_FILENO);
      82             : //     if (used & 4)
      83             : //         close(STDERR_FILENO);
      84             : 
      85             : //     /* Did we run out of descriptors? */
      86             : //     if (fd == -1)
      87             : //         return errno = EMFILE;
      88             : 
      89             : //     /* Exclusive lock, cover the entire file (regardless of size). */
      90             : //     lock.l_type = F_WRLCK;
      91             : //     lock.l_whence = SEEK_SET;
      92             : //     lock.l_start = 0;
      93             : //     lock.l_len = 0;
      94             : //     if (fcntl(fd, F_SETLK, &lock) == -1) {
      95             : //         /* Lock failed. Close file and report locking failure. */
      96             : //         close(fd);
      97             : //         return errno = EALREADY;
      98             : //     }
      99             : 
     100             : //     /* Save descriptor, if the caller wants it. */
     101             : //     if (fdptr)
     102             : //         *fdptr = fd;
     103             : 
     104             : //     return 0;
     105             : // }
     106             : 
     107             : // void lock_example() {
     108             : //     int result;
     109             : 
     110             : //     result = lockfile("YOUR_LOCKFILE_PATH", NULL);
     111             : //     if (result == 0) {
     112             : //         /* Have an exclusive lock on YOUR_LOCKFILE_PATH */
     113             : //     } else
     114             : //     if (result == EALREADY) {
     115             : //         /* YOUR_LOCKFILE_PATH is already locked by another process */
     116             : //     } else {
     117             : //         /* Cannot lock YOUR_LOCKFILE_PATH, see strerror(result). */
     118             : //     }
     119             : // }
     120             : 
     121             : // // END NOTE
     122             : 
     123             : #define FELIX_BUS_TOUCH_INTERVAL 10 /* seconds */
     124             : 
     125             : struct felix_bus_s {
     126             :     int fd;
     127             :     char* path;
     128             :     timer_t timer;
     129             :     char host[256];
     130             :     pid_t pid;
     131             :     char* user;
     132             : };
     133             : 
     134             : static int felix_bus_verbose = 0;
     135             : static int felix_bus_cleanup = 1;
     136             : #define BUFFER_SIZE (500000)
     137             : static char buffer[BUFFER_SIZE];
     138             : 
     139         415 : int mkpath(char *dir, mode_t mode) {
     140         415 :     struct stat sb;
     141             : 
     142         415 :     if (!dir) {
     143           0 :         errno = EINVAL;
     144           0 :         return 1;
     145             :     }
     146             : 
     147         415 :     if (!stat(dir, &sb))
     148             :         return 0;
     149             : 
     150         302 :     mkpath(dirname(strdupa(dir)), mode);
     151             : 
     152         302 :     return mkdir(dir, mode);
     153             : }
     154             : 
     155          31 : void on_timer(union sigval val) {
     156          31 :     felix_bus bus = (felix_bus)val.sival_ptr;
     157          31 :     if (felix_bus_verbose)
     158           0 :         printf("on_timer\n");
     159             : 
     160          31 :     int rc = felix_bus_touch(bus);
     161          31 :     if (rc < 0) {
     162           0 :         printf("ERROR: Failed to touch bus file: %s\n", bus->path);
     163           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     164             :     }
     165          31 : }
     166             : 
     167          93 : timer_t periodic_timer(int seconds, void* ptr) {
     168          93 :     pthread_attr_t attr;
     169          93 :     pthread_attr_init( &attr );
     170             : 
     171          93 :     struct sched_param parm;
     172          93 :     parm.sched_priority = 255;
     173          93 :     pthread_attr_setschedparam(&attr, &parm);
     174             : 
     175          93 :     struct sigevent sig;
     176          93 :     sig.sigev_notify = SIGEV_THREAD;
     177          93 :     sig.sigev_notify_function = on_timer;
     178          93 :     sig.sigev_value.sival_ptr = ptr;
     179          93 :     sig.sigev_notify_attributes = &attr;
     180             : 
     181          93 :     timer_t timer_id;
     182          93 :     int rc = timer_create(CLOCK_REALTIME, &sig, &timer_id);
     183          93 :     if (rc < 0) return NULL;
     184             : 
     185          93 :     struct itimerspec in;
     186          93 :     in.it_value.tv_sec = seconds;
     187          93 :     in.it_value.tv_nsec = 0;
     188          93 :     in.it_interval.tv_sec = seconds;
     189          93 :     in.it_interval.tv_nsec = 0;
     190             : 
     191          93 :     rc = timer_settime(timer_id, 0, &in, NULL);
     192          93 :     if (rc < 0) {
     193           0 :         timer_delete(timer_id);
     194           0 :         return NULL;
     195             :     }
     196             : 
     197          93 :     return timer_id;
     198             : }
     199             : 
     200             : // void felix_bus_on_exit(int exit_code, void *arg) {
     201             : // }
     202             : 
     203       48749 : void felix_bus_set_verbose(int verbose) {
     204       48749 :     felix_bus_verbose = verbose;
     205       48749 : }
     206             : 
     207          30 : void felix_bus_set_cleanup(int cleanup) {
     208          30 :     felix_bus_cleanup = cleanup;
     209          30 : }
     210             : 
     211         113 : char* felix_bus_path(const char* bus_path_prefix, const char* groupname, uint8_t vid, uint8_t did, uint32_t cid, const char* bus_filename) {
     212         113 :     if (felix_bus_verbose)
     213          20 :         printf("felix_bus_path\n");
     214             : 
     215         113 :     char* bus_path;
     216             : 
     217             :     // to be freed by caller
     218         113 :     bus_path = (char*)malloc(PIPE_BUF*sizeof(char));
     219             : 
     220             :     // for future compatibility, DID and CID are encoded as shortest hexadecimal, VID is only used in decoding the FID
     221         113 :     int count = snprintf(bus_path, PIPE_BUF, "%s/%s/%x/%x/%s.ndjson", bus_path_prefix, groupname, did, cid, bus_filename);
     222         113 :     if ((count < 0) || (count >= PIPE_BUF)) {
     223           0 :         printf("ERROR: Cannot create bus_path\n");
     224           0 :         free(bus_path);
     225           0 :         return NULL;
     226             :     }
     227             : 
     228         113 :     char* bus_path_copy = strdup(bus_path);
     229         113 :     if (bus_path_copy == NULL) return NULL;
     230             : 
     231         113 :     char* dir = dirname(bus_path_copy);
     232             : 
     233             :     // Retry a few times for automounted directories
     234         113 :     int retry = 0;
     235         113 :     int rc = -1;
     236         113 :     while ((retry < 5) && (rc < 0)) {
     237         113 :         rc = mkpath(dir, S_IWUSR | S_IRUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH);
     238         113 :         if (rc >= 0) break;
     239           0 :         retry++;
     240           0 :         printf("Retrying after %d second\n", retry);
     241           0 :         sleep(retry);
     242             :     }
     243         113 :     free(bus_path_copy);
     244         113 :     if (rc < 0) {
     245           0 :         printf("ERROR: Cannot create bus_path: '%s'\n", bus_path);
     246           0 :         return NULL;
     247             :     }
     248             : 
     249         113 :     if (felix_bus_verbose)
     250          20 :         printf("bus_path: %s\n", bus_path);
     251             : 
     252             :     return bus_path;
     253             : }
     254             : 
     255           2 : int felix_bus_locked(const char* bus_path) {
     256             :     /* Open file */
     257             :     /* NOTE: needs to be WR for it to be lockable */
     258             :     /* Relatively slow, use stale for faster status */
     259           2 :     int fd = open(bus_path, O_RDWR, 0);
     260           2 :     if (fd < 0) return 0;
     261             : 
     262             :     /* Try Lock file */
     263           2 :     int rc = flock(fd, LOCK_EX | LOCK_NB);
     264           2 :     if ((rc < 0) && (errno==EWOULDBLOCK)) {
     265           2 :         close(fd);
     266           2 :         return 1;
     267             :     }
     268             :     return 0;
     269             : }
     270             : 
     271      114830 : int felix_bus_stale(const char* bus_path) {
     272      114830 :     struct stat sb;
     273             : 
     274      114830 :     int rc = stat(bus_path, &sb);
     275      114830 :     if (rc < 0) {
     276           0 :         printf("ERROR: Failed to stat bus file: %s\n", bus_path);
     277           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     278           0 :         return 1;
     279             :     }
     280             : 
     281      114830 :     time_t now = time(0);
     282      114830 :     if (now < 0) {
     283           0 :         printf("ERROR: Failed to stat bus file: %s\n", bus_path);
     284           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     285           0 :         return 1;
     286             :     }
     287             : 
     288      114830 :     double diff_time = difftime(now, sb.st_mtime);
     289             : 
     290             :     // if (felix_bus_verbose) {
     291             :     //     printf("Mod: %lu %s", sb.st_mtime, ctime(&sb.st_mtime));
     292             :     //     printf("Now: %lu %s", now, ctime(&now));
     293             :     //     printf("Dif: %f\n", diff_time);
     294             :     // }
     295             : 
     296      114830 :     return diff_time > 2*FELIX_BUS_TOUCH_INTERVAL;
     297             : }
     298             : 
     299          95 : felix_bus felix_bus_open(const char* bus_path) {
     300          95 :     int rc;
     301             : 
     302             :     /* Open file */
     303          95 :     int fd = open(bus_path, O_CREAT | O_WRONLY | O_SYNC, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP | S_IROTH );
     304          95 :     if (fd < 0) return NULL;
     305             : 
     306             :     /* Lock file */
     307          95 :     rc = flock(fd, LOCK_EX | LOCK_NB);
     308          95 :     if (rc < 0) return NULL;
     309             : 
     310             :     /* Truncate the file */
     311          93 :     rc = ftruncate(fd, 0);
     312          93 :     if (rc < 0) return NULL;
     313             : 
     314             :     /* create bus */
     315          93 :     felix_bus bus;
     316          93 :     bus = (felix_bus)malloc(sizeof(struct felix_bus_s));
     317             : 
     318             :     /* Process ID */
     319          93 :     bus->pid = getpid();
     320             : 
     321             :     /* Hostname */
     322          93 :     rc = gethostname(bus->host, 256);
     323          93 :     if (rc < 0) {
     324           0 :         free(bus);
     325           0 :         return NULL;
     326             :     }
     327             : 
     328             :     /* Username */
     329          93 :     long user_bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
     330          93 :     if (user_bufsize == -1) {
     331           0 :         user_bufsize = 16384;
     332             :     }
     333          93 :     char *user_buf = malloc(user_bufsize);
     334          93 :     if (user_buf == NULL) {
     335           0 :         free(bus);
     336           0 :         return NULL;
     337             :     }
     338          93 :     uid_t uid = geteuid();
     339          93 :     struct passwd pwd;
     340          93 :     struct passwd *user;
     341          93 :     rc = getpwuid_r(uid, &pwd, user_buf, user_bufsize, &user);
     342          93 :     if ((rc < 0) || (user == NULL)) {
     343           0 :         free(bus);
     344           0 :         return NULL;
     345             :     }
     346          93 :     bus->user = strdup(pwd.pw_name);
     347             : 
     348             :     /* Free username memory */
     349          93 :     free(user_buf);
     350             : 
     351             :     /* Start a timer to keep the file current */
     352          93 :     timer_t timer = periodic_timer(FELIX_BUS_TOUCH_INTERVAL, bus);
     353          93 :     if (timer == NULL) {
     354           0 :         free(bus);
     355           0 :         return NULL;
     356             :     }
     357             : 
     358             :     /* Make sure we delete file and timer at the end */
     359             :     // rc = on_exit(felix_bus_on_exit, (void *)bus);
     360             :     // if (rc < 0) {
     361             :     //     free(timer);
     362             :     //     free(bus);
     363             :     //     return NULL;
     364             :     // }
     365             : 
     366          93 :     bus->fd = fd;
     367          93 :     bus->path = strdup(bus_path);
     368          93 :     bus->timer = timer;
     369             : 
     370          93 :     if (felix_bus_verbose)
     371           2 :         printf("felix_bus_open(%d) for %s\n", fd, bus_path);
     372             : 
     373             :     return bus;
     374             : }
     375             : 
     376        1175 : int felix_bus_write(felix_bus bus, uint64_t fid, const struct felix_bus_info* info) {
     377             :     // FIXME check vid, cid and did fall within range
     378             : 
     379        1175 :     char hex_fid[19];
     380        1175 :     snprintf(hex_fid, 19, "0x%016lx", fid);
     381             : 
     382        1175 :     struct jWriteControl jwc;
     383        1175 :     jwOpen(&jwc, buffer, BUFFER_SIZE, JW_OBJECT, JW_NDJSON);
     384             : 
     385        1175 :     jwObj_string(&jwc, BUS_HFID, hex_fid);
     386        1175 :     jwObj_ulong(&jwc, BUS_FID, fid);
     387        1175 :     jwObj_string(&jwc, BUS_IP, info->ip);
     388        1175 :     jwObj_int(&jwc, BUS_PORT, info->port);
     389        1175 :     jwObj_bool(&jwc, BUS_UNBUFFERED, info->unbuffered);
     390        1175 :     jwObj_bool(&jwc, BUS_PUBSUB, info->pubsub);
     391        1175 :     jwObj_int(&jwc, BUS_NETIO_PAGES, info->netio_pages);
     392        1175 :     jwObj_int(&jwc, BUS_NETIO_PAGESIZE, info->netio_pagesize);
     393             : 
     394        1175 :     jwObj_string(&jwc, BUS_HOST, bus->host);
     395        1175 :     jwObj_int(&jwc, BUS_PID, bus->pid);
     396        1175 :     jwObj_string(&jwc, BUS_USER, bus->user);
     397             : 
     398        1175 :     int rc = jwClose(&jwc);
     399        1175 :     if (rc != 0) {
     400           0 :         printf("ERROR: Failed to write to: %s\n", bus->path);
     401           0 :         printf("jwError=%d str=%s\n", rc, jwErrorToString(rc));
     402           0 :         return rc;
     403             :     }
     404             : 
     405        1175 :     rc = write(bus->fd, buffer, strlen(buffer));
     406        1175 :     if (rc < 0) {
     407           0 :         printf("ERROR: Failed to write to: %s\n", bus->path);
     408           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     409             :     }
     410             : 
     411        1175 :     return rc < 0 ? rc : 0;
     412             : }
     413             : 
     414          31 : int felix_bus_touch(felix_bus bus) {
     415          31 :     return bus->fd >= 0 ? futimens(bus->fd, NULL) : 0;
     416             : }
     417             : 
     418          95 : int felix_bus_close(felix_bus bus) {
     419             :     // Don't really close the file, otherwise we cannot keep the lock or touch the file
     420          95 :     return 0;
     421             : }
     422             : 
     423          12 : int felix_bus_release(felix_bus bus) {
     424          12 :     int rc;
     425             : 
     426          12 :     if (bus == NULL) return 0;
     427             : 
     428             :     // delete timer
     429          10 :     timer_t timer = bus->timer;
     430          10 :     if (felix_bus_verbose)
     431           2 :         printf("Deleting timer(%d) for: %s\n", bus->fd, bus->path);
     432          10 :     if (timer != NULL) {
     433          10 :         rc = timer_delete(timer);
     434          10 :         if (rc < 0) {
     435           0 :             printf("ERROR: Failed to delete timer for: %s\n", bus->path);
     436           0 :             printf("errno=%d str=%s\n", errno, strerror(errno));
     437             :         }
     438             :     }
     439             : 
     440             :     // close and remove file
     441          10 :     if (felix_bus_verbose)
     442           2 :         printf("Closing bus file(%d): %s\n", bus->fd, bus->path);
     443          10 :     rc = close(bus->fd);
     444          10 :     if (rc < 0) {
     445           0 :         printf("ERROR: Failed to remove bus file: %s\n", bus->path);
     446           0 :         printf("errno=%d str=%s\n", errno, strerror(errno));
     447             :     }
     448          10 :     bus->fd = -1;
     449             : 
     450             :     // cleanup files
     451          10 :     if (felix_bus_cleanup) {
     452           8 :         if (felix_bus_verbose)
     453           2 :             printf("Removing bus file: %s\n", bus->path);
     454           8 :         rc = unlink(bus->path);
     455           8 :         if (rc < 0) {
     456           0 :             printf("ERROR: Failed to remove bus file: %s\n", bus->path);
     457           0 :             printf("errno=%d str=%s\n", errno, strerror(errno));
     458             :         }
     459             : 
     460           8 :         char* dir = dirname(bus->path);
     461           8 :         if (felix_bus_verbose) printf("Trying to remove cid dir if empty: %s\n", dir);
     462           8 :         rmdir(dir);
     463             : 
     464           8 :         dir = dirname(dir);
     465           8 :         if (felix_bus_verbose) printf("Trying to remove did dir if empty: %s\n", dir);
     466           8 :         rmdir(dir);
     467             : 
     468           8 :         dir = dirname(dir);
     469           8 :         if (felix_bus_verbose) printf("Trying to remove version dir if empty: %s\n", dir);
     470           8 :         rmdir(dir);
     471             : 
     472           8 :         dir = dirname(dir);
     473           8 :         if (felix_bus_verbose) printf("Trying to remove group_name dir if empty: %s\n", dir);
     474           8 :         rmdir(dir);
     475             :     }
     476             : 
     477          10 :     free(bus->path);
     478          10 :     free(bus->user);
     479          10 :     free(bus);
     480             : 
     481          10 :     return 0;
     482             : }

Generated by: LCOV version 1.0