Line data Source code
1 : #include <errno.h>
2 : #include <fcntl.h>
3 : #include <libgen.h>
4 : #include <limits.h>
5 : #include <pthread.h>
6 : #include <pwd.h>
7 : #include <signal.h>
8 : #include <stddef.h>
9 : #include <stdio.h>
10 : #include <stdlib.h>
11 : #include <string.h>
12 : #include <time.h>
13 : #include <unistd.h>
14 :
15 : #include <sys/file.h>
16 : #include <sys/stat.h>
17 : #include <sys/types.h>
18 :
19 : #include "jWrite.h"
20 : #include "felixbus/felixbus.h"
21 :
22 : // // NOTE: possible different solution for locking files, not used by FELIX currently
23 : // // from: https://stackoverflow.com/questions/29067893/flock-is-it-possible-to-merely-check-if-the-file-is-already-locked-without-a
24 :
25 : // /* Open and exclusive-lock file, creating it (-rw-------)
26 : // * if necessary. If fdptr is not NULL, the descriptor is
27 : // * saved there. The descriptor is never one of the standard
28 : // * descriptors STDIN_FILENO, STDOUT_FILENO, or STDERR_FILENO.
29 : // * If successful, the function returns 0.
30 : // * Otherwise, the function returns nonzero errno:
31 : // * EINVAL: Invalid lock file path
32 : // * EMFILE: Too many open files
33 : // * EALREADY: Already locked
34 : // * or one of the open(2)/creat(2) errors.
35 : // */
36 : // static int lockfile(const char *const filepath, int *const fdptr)
37 : // {
38 : // struct flock lock;
39 : // int used = 0; /* Bits 0 to 2: stdin, stdout, stderr */
40 : // int fd;
41 :
42 : // /* In case the caller is interested in the descriptor,
43 : // * initialize it to -1 (invalid). */
44 : // if (fdptr)
45 : // *fdptr = -1;
46 :
47 : // /* Invalid path? */
48 : // if (filepath == NULL || *filepath == '\0')
49 : // return errno = EINVAL;
50 :
51 : // /* Open the file. */
52 : // do {
53 : // fd = open(filepath, O_RDWR | O_CREAT, 0600);
54 : // } while (fd == -1 && errno == EINTR);
55 : // if (fd == -1) {
56 : // if (errno == EALREADY)
57 : // errno = EIO;
58 : // return errno;
59 : // }
60 :
61 : // /* Move fd away from the standard descriptors. */
62 : // while (1)
63 : // if (fd == STDIN_FILENO) {
64 : // used |= 1;
65 : // fd = dup(fd);
66 : // } else
67 : // if (fd == STDOUT_FILENO) {
68 : // used |= 2;
69 : // fd = dup(fd);
70 : // } else
71 : // if (fd == STDERR_FILENO) {
72 : // used |= 4;
73 : // fd = dup(fd);
74 : // } else
75 : // break;
76 :
77 : // /* Close the standard descriptors we temporarily used. */
78 : // if (used & 1)
79 : // close(STDIN_FILENO);
80 : // if (used & 2)
81 : // close(STDOUT_FILENO);
82 : // if (used & 4)
83 : // close(STDERR_FILENO);
84 :
85 : // /* Did we run out of descriptors? */
86 : // if (fd == -1)
87 : // return errno = EMFILE;
88 :
89 : // /* Exclusive lock, cover the entire file (regardless of size). */
90 : // lock.l_type = F_WRLCK;
91 : // lock.l_whence = SEEK_SET;
92 : // lock.l_start = 0;
93 : // lock.l_len = 0;
94 : // if (fcntl(fd, F_SETLK, &lock) == -1) {
95 : // /* Lock failed. Close file and report locking failure. */
96 : // close(fd);
97 : // return errno = EALREADY;
98 : // }
99 :
100 : // /* Save descriptor, if the caller wants it. */
101 : // if (fdptr)
102 : // *fdptr = fd;
103 :
104 : // return 0;
105 : // }
106 :
107 : // void lock_example() {
108 : // int result;
109 :
110 : // result = lockfile("YOUR_LOCKFILE_PATH", NULL);
111 : // if (result == 0) {
112 : // /* Have an exclusive lock on YOUR_LOCKFILE_PATH */
113 : // } else
114 : // if (result == EALREADY) {
115 : // /* YOUR_LOCKFILE_PATH is already locked by another process */
116 : // } else {
117 : // /* Cannot lock YOUR_LOCKFILE_PATH, see strerror(result). */
118 : // }
119 : // }
120 :
121 : // // END NOTE
122 :
123 : #define FELIX_BUS_TOUCH_INTERVAL 10 /* seconds */
124 :
125 : struct felix_bus_s {
126 : int fd;
127 : char* path;
128 : timer_t timer;
129 : char host[256];
130 : pid_t pid;
131 : char* user;
132 : };
133 :
134 : static int felix_bus_verbose = 0;
135 : static int felix_bus_cleanup = 1;
136 : #define BUFFER_SIZE (500000)
137 : static char buffer[BUFFER_SIZE];
138 :
139 230 : int mkpath(char *dir, mode_t mode) {
140 230 : struct stat sb;
141 :
142 230 : if (!dir) {
143 0 : errno = EINVAL;
144 0 : return 1;
145 : }
146 :
147 230 : if (!stat(dir, &sb))
148 : return 0;
149 :
150 129 : mkpath(dirname(strdupa(dir)), mode);
151 :
152 129 : return mkdir(dir, mode);
153 : }
154 :
155 3 : void on_timer(union sigval val) {
156 3 : felix_bus bus = (felix_bus)val.sival_ptr;
157 3 : if (felix_bus_verbose)
158 0 : printf("on_timer felix-bus-fs\n");
159 :
160 3 : int rc = felix_bus_touch(bus);
161 3 : if (rc < 0) {
162 0 : printf("ERROR: Failed to touch bus file: %s\n", bus->path);
163 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
164 : }
165 3 : }
166 :
167 41 : timer_t periodic_timer(int seconds, void* ptr) {
168 41 : pthread_attr_t attr;
169 41 : pthread_attr_init( &attr );
170 :
171 41 : struct sched_param parm;
172 41 : parm.sched_priority = 255;
173 41 : pthread_attr_setschedparam(&attr, &parm);
174 :
175 41 : struct sigevent sig;
176 41 : sig.sigev_notify = SIGEV_THREAD;
177 41 : sig.sigev_notify_function = on_timer;
178 41 : sig.sigev_value.sival_ptr = ptr;
179 41 : sig.sigev_notify_attributes = &attr;
180 :
181 41 : timer_t timer_id;
182 41 : int rc = timer_create(CLOCK_REALTIME, &sig, &timer_id);
183 41 : if (rc < 0) return NULL;
184 :
185 41 : struct itimerspec in;
186 41 : in.it_value.tv_sec = seconds;
187 41 : in.it_value.tv_nsec = 0;
188 41 : in.it_interval.tv_sec = seconds;
189 41 : in.it_interval.tv_nsec = 0;
190 :
191 41 : rc = timer_settime(timer_id, 0, &in, NULL);
192 41 : if (rc < 0) {
193 0 : timer_delete(timer_id);
194 0 : return NULL;
195 : }
196 :
197 41 : return timer_id;
198 : }
199 :
200 : // void felix_bus_on_exit(int exit_code, void *arg) {
201 : // }
202 :
203 26225 : void felix_bus_set_verbose(int verbose) {
204 26225 : felix_bus_verbose = verbose;
205 26225 : }
206 :
207 101 : void felix_bus_set_cleanup(int cleanup) {
208 101 : felix_bus_cleanup = cleanup;
209 101 : }
210 :
211 101 : char* felix_bus_path(const char* bus_path_prefix, const char* groupname, uint8_t vid, uint8_t did, uint32_t cid, const char* bus_filename) {
212 101 : if (felix_bus_verbose)
213 10 : printf("felix_bus_path\n");
214 :
215 101 : char* bus_path;
216 :
217 : // to be freed by caller
218 101 : bus_path = (char*)malloc(PIPE_BUF*sizeof(char));
219 :
220 : // for future compatibility, DID and CID are encoded as shortest hexadecimal, VID is only used in decoding the FID
221 101 : int count = snprintf(bus_path, PIPE_BUF, "%s/%s/%x/%x/%s.ndjson", bus_path_prefix, groupname, did, cid, bus_filename);
222 101 : if ((count < 0) || (count >= PIPE_BUF)) {
223 0 : printf("ERROR: Cannot create bus_path\n");
224 0 : free(bus_path);
225 0 : return NULL;
226 : }
227 :
228 101 : char* bus_path_copy = strdup(bus_path);
229 101 : if (bus_path_copy == NULL) return NULL;
230 :
231 101 : char* dir = dirname(bus_path_copy);
232 :
233 : // Retry a few times for automounted directories
234 101 : int retry = 0;
235 101 : int rc = -1;
236 101 : while ((retry < 5) && (rc < 0)) {
237 101 : rc = mkpath(dir, S_IWUSR | S_IRUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH);
238 101 : if (rc >= 0) break;
239 0 : retry++;
240 0 : printf("Retrying after %d second\n", retry);
241 0 : sleep(retry);
242 : }
243 :
244 101 : free(bus_path_copy);
245 101 : if (rc < 0) {
246 0 : printf("ERROR: Cannot create bus_path: '%s'\n", bus_path);
247 0 : return NULL;
248 : }
249 :
250 101 : if (felix_bus_verbose)
251 10 : printf("bus_path: %s\n", bus_path);
252 :
253 : return bus_path;
254 : }
255 :
256 10 : int felix_bus_locked(const char* bus_path) {
257 : /* Open file */
258 : /* NOTE: needs to be WR for it to be lockable */
259 : /* Relatively slow, use stale for faster status */
260 10 : int fd = open(bus_path, O_RDWR, 0);
261 10 : if (fd < 0) return 0;
262 :
263 : /* Try Lock file */
264 10 : int rc = flock(fd, LOCK_EX | LOCK_NB);
265 10 : if ((rc < 0) && (errno==EWOULDBLOCK)) {
266 10 : close(fd);
267 10 : return 1;
268 : }
269 : return 0;
270 : }
271 :
272 58499 : int felix_bus_stale(const char* bus_path) {
273 58499 : struct stat sb;
274 :
275 58499 : int rc = stat(bus_path, &sb);
276 58499 : if (rc < 0) {
277 0 : printf("ERROR: Failed to stat bus file: %s\n", bus_path);
278 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
279 0 : return 1;
280 : }
281 :
282 58499 : time_t now = time(0);
283 58499 : if (now < 0) {
284 0 : printf("ERROR: Failed to stat bus file: %s\n", bus_path);
285 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
286 0 : return 1;
287 : }
288 :
289 58499 : double diff_time = difftime(now, sb.st_mtime);
290 :
291 : // if (felix_bus_verbose) {
292 : // printf("Mod: %lu %s", sb.st_mtime, ctime(&sb.st_mtime));
293 : // printf("Now: %lu %s", now, ctime(&now));
294 : // printf("Dif: %f\n", diff_time);
295 : // }
296 :
297 58499 : return diff_time > 2*FELIX_BUS_TOUCH_INTERVAL;
298 : }
299 :
300 42 : felix_bus felix_bus_open(const char* bus_path) {
301 42 : int rc;
302 :
303 : /* Open file */
304 42 : int fd = open(bus_path, O_CREAT | O_WRONLY | O_SYNC, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP | S_IROTH );
305 42 : if (fd < 0) return NULL;
306 :
307 : /* Lock file */
308 42 : rc = flock(fd, LOCK_EX | LOCK_NB);
309 42 : if (rc < 0) return NULL;
310 :
311 : /* Truncate the file */
312 41 : rc = ftruncate(fd, 0);
313 41 : if (rc < 0) return NULL;
314 :
315 : /* create bus */
316 41 : felix_bus bus;
317 41 : bus = (felix_bus)malloc(sizeof(struct felix_bus_s));
318 :
319 : /* Process ID */
320 41 : bus->pid = getpid();
321 :
322 : /* Hostname */
323 41 : rc = gethostname(bus->host, 256);
324 41 : if (rc < 0) {
325 0 : free(bus);
326 0 : return NULL;
327 : }
328 :
329 : /* Username */
330 41 : long user_bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
331 41 : if (user_bufsize == -1) {
332 0 : user_bufsize = 16384;
333 : }
334 41 : char *user_buf = malloc(user_bufsize);
335 41 : if (user_buf == NULL) {
336 0 : free(bus);
337 0 : return NULL;
338 : }
339 41 : uid_t uid = geteuid();
340 41 : struct passwd pwd;
341 41 : struct passwd *user;
342 41 : rc = getpwuid_r(uid, &pwd, user_buf, user_bufsize, &user);
343 41 : if ((rc < 0) || (user == NULL)) {
344 0 : free(bus);
345 0 : return NULL;
346 : }
347 41 : bus->user = strdup(pwd.pw_name);
348 :
349 : /* Free username memory */
350 41 : free(user_buf);
351 :
352 : /* Start a timer to keep the file current */
353 41 : timer_t timer = periodic_timer(FELIX_BUS_TOUCH_INTERVAL, bus);
354 41 : if (timer == NULL) {
355 0 : free(bus);
356 0 : return NULL;
357 : }
358 :
359 : /* Make sure we delete file and timer at the end */
360 : // rc = on_exit(felix_bus_on_exit, (void *)bus);
361 : // if (rc < 0) {
362 : // free(timer);
363 : // free(bus);
364 : // return NULL;
365 : // }
366 :
367 41 : bus->fd = fd;
368 41 : bus->path = strdup(bus_path);
369 41 : bus->timer = timer;
370 :
371 41 : if (felix_bus_verbose)
372 1 : printf("felix_bus_open(%d) for %s\n", fd, bus_path);
373 :
374 : return bus;
375 : }
376 :
377 100 : int felix_bus_write(felix_bus bus, uint64_t fid, const struct felix_bus_info* info) {
378 : // FIXME check vid, cid and did fall within range
379 :
380 100 : char hex_fid[19];
381 100 : snprintf(hex_fid, 19, "0x%016lx", fid);
382 :
383 100 : struct jWriteControl jwc;
384 100 : jwOpen(&jwc, buffer, BUFFER_SIZE, JW_OBJECT, JW_NDJSON);
385 :
386 100 : jwObj_string(&jwc, BUS_HFID, hex_fid);
387 100 : jwObj_ulong(&jwc, BUS_FID, fid);
388 100 : jwObj_string(&jwc, BUS_IP, info->ip);
389 100 : jwObj_int(&jwc, BUS_PORT, info->port);
390 100 : jwObj_bool(&jwc, BUS_UNBUFFERED, info->unbuffered);
391 100 : jwObj_bool(&jwc, BUS_PUBSUB, info->pubsub);
392 100 : jwObj_bool(&jwc, BUS_RAW_TCP, info->raw_tcp);
393 100 : jwObj_bool(&jwc, BUS_STREAM, info->stream);
394 100 : jwObj_int(&jwc, BUS_NETIO_PAGES, info->netio_pages);
395 100 : jwObj_int(&jwc, BUS_NETIO_PAGESIZE, info->netio_pagesize);
396 :
397 100 : jwObj_string(&jwc, BUS_HOST, bus->host);
398 100 : jwObj_int(&jwc, BUS_PID, bus->pid);
399 100 : jwObj_string(&jwc, BUS_USER, bus->user);
400 :
401 100 : int rc = jwClose(&jwc);
402 100 : if (rc != 0) {
403 0 : printf("ERROR: Failed to write to: %s\n", bus->path);
404 0 : printf("jwError=%d str=%s\n", rc, jwErrorToString(rc));
405 0 : return rc;
406 : }
407 :
408 100 : rc = write(bus->fd, buffer, strlen(buffer));
409 100 : if (rc < 0) {
410 0 : printf("ERROR: Failed to write to: %s\n", bus->path);
411 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
412 : }
413 :
414 100 : return rc < 0 ? rc : 0;
415 : }
416 :
417 3 : int felix_bus_touch(felix_bus bus) {
418 3 : return bus->fd >= 0 ? futimens(bus->fd, NULL) : 0;
419 : }
420 :
421 42 : int felix_bus_close(felix_bus bus) {
422 : // Don't really close the file, otherwise we cannot keep the lock or touch the file
423 42 : return 0;
424 : }
425 :
426 42 : int felix_bus_release(felix_bus bus) {
427 42 : int rc;
428 :
429 42 : if (bus == NULL) return 0;
430 :
431 : // delete timer
432 41 : timer_t timer = bus->timer;
433 41 : if (felix_bus_verbose)
434 1 : printf("Deleting timer(%d) for: %s\n", bus->fd, bus->path);
435 41 : if (timer != NULL) {
436 41 : rc = timer_delete(timer);
437 41 : if (rc < 0) {
438 0 : printf("ERROR: Failed to delete timer for: %s\n", bus->path);
439 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
440 : }
441 : }
442 :
443 : // close and remove file
444 41 : if (felix_bus_verbose)
445 1 : printf("Closing bus file(%d): %s\n", bus->fd, bus->path);
446 41 : rc = close(bus->fd);
447 41 : if (rc < 0) {
448 0 : printf("ERROR: Failed to remove bus file: %s\n", bus->path);
449 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
450 : }
451 41 : bus->fd = -1;
452 :
453 : // cleanup files
454 41 : if (felix_bus_cleanup) {
455 31 : if (felix_bus_verbose)
456 1 : printf("Removing bus file: %s\n", bus->path);
457 31 : rc = unlink(bus->path);
458 31 : if (rc < 0) {
459 0 : printf("ERROR: Failed to remove bus file: %s\n", bus->path);
460 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
461 : }
462 :
463 31 : char* dir = dirname(bus->path);
464 31 : if (felix_bus_verbose) printf("Trying to remove cid dir if empty: %s\n", dir);
465 31 : rmdir(dir);
466 :
467 31 : dir = dirname(dir);
468 31 : if (felix_bus_verbose) printf("Trying to remove did dir if empty: %s\n", dir);
469 31 : rmdir(dir);
470 :
471 31 : dir = dirname(dir);
472 31 : if (felix_bus_verbose) printf("Trying to remove version dir if empty: %s\n", dir);
473 31 : rmdir(dir);
474 :
475 31 : dir = dirname(dir);
476 31 : if (felix_bus_verbose) printf("Trying to remove group_name dir if empty: %s\n", dir);
477 31 : rmdir(dir);
478 : }
479 :
480 41 : free(bus->path);
481 41 : free(bus->user);
482 41 : free(bus);
483 :
484 41 : return 0;
485 : }
|