Line data Source code
1 : #include <errno.h>
2 : #include <fcntl.h>
3 : #include <libgen.h>
4 : #include <limits.h>
5 : #include <pthread.h>
6 : #include <pwd.h>
7 : #include <signal.h>
8 : #include <stddef.h>
9 : #include <stdio.h>
10 : #include <stdlib.h>
11 : #include <string.h>
12 : #include <time.h>
13 : #include <unistd.h>
14 :
15 : #include <sys/file.h>
16 : #include <sys/stat.h>
17 : #include <sys/types.h>
18 :
19 : #include "jWrite.h"
20 : #include "felixbus/felixbus.h"
21 :
22 : // // NOTE: possible different solution for locking files, not used by FELIX currently
23 : // // from: https://stackoverflow.com/questions/29067893/flock-is-it-possible-to-merely-check-if-the-file-is-already-locked-without-a
24 :
25 : // /* Open and exclusive-lock file, creating it (-rw-------)
26 : // * if necessary. If fdptr is not NULL, the descriptor is
27 : // * saved there. The descriptor is never one of the standard
28 : // * descriptors STDIN_FILENO, STDOUT_FILENO, or STDERR_FILENO.
29 : // * If successful, the function returns 0.
30 : // * Otherwise, the function returns nonzero errno:
31 : // * EINVAL: Invalid lock file path
32 : // * EMFILE: Too many open files
33 : // * EALREADY: Already locked
34 : // * or one of the open(2)/creat(2) errors.
35 : // */
36 : // static int lockfile(const char *const filepath, int *const fdptr)
37 : // {
38 : // struct flock lock;
39 : // int used = 0; /* Bits 0 to 2: stdin, stdout, stderr */
40 : // int fd;
41 :
42 : // /* In case the caller is interested in the descriptor,
43 : // * initialize it to -1 (invalid). */
44 : // if (fdptr)
45 : // *fdptr = -1;
46 :
47 : // /* Invalid path? */
48 : // if (filepath == NULL || *filepath == '\0')
49 : // return errno = EINVAL;
50 :
51 : // /* Open the file. */
52 : // do {
53 : // fd = open(filepath, O_RDWR | O_CREAT, 0600);
54 : // } while (fd == -1 && errno == EINTR);
55 : // if (fd == -1) {
56 : // if (errno == EALREADY)
57 : // errno = EIO;
58 : // return errno;
59 : // }
60 :
61 : // /* Move fd away from the standard descriptors. */
62 : // while (1)
63 : // if (fd == STDIN_FILENO) {
64 : // used |= 1;
65 : // fd = dup(fd);
66 : // } else
67 : // if (fd == STDOUT_FILENO) {
68 : // used |= 2;
69 : // fd = dup(fd);
70 : // } else
71 : // if (fd == STDERR_FILENO) {
72 : // used |= 4;
73 : // fd = dup(fd);
74 : // } else
75 : // break;
76 :
77 : // /* Close the standard descriptors we temporarily used. */
78 : // if (used & 1)
79 : // close(STDIN_FILENO);
80 : // if (used & 2)
81 : // close(STDOUT_FILENO);
82 : // if (used & 4)
83 : // close(STDERR_FILENO);
84 :
85 : // /* Did we run out of descriptors? */
86 : // if (fd == -1)
87 : // return errno = EMFILE;
88 :
89 : // /* Exclusive lock, cover the entire file (regardless of size). */
90 : // lock.l_type = F_WRLCK;
91 : // lock.l_whence = SEEK_SET;
92 : // lock.l_start = 0;
93 : // lock.l_len = 0;
94 : // if (fcntl(fd, F_SETLK, &lock) == -1) {
95 : // /* Lock failed. Close file and report locking failure. */
96 : // close(fd);
97 : // return errno = EALREADY;
98 : // }
99 :
100 : // /* Save descriptor, if the caller wants it. */
101 : // if (fdptr)
102 : // *fdptr = fd;
103 :
104 : // return 0;
105 : // }
106 :
107 : // void lock_example() {
108 : // int result;
109 :
110 : // result = lockfile("YOUR_LOCKFILE_PATH", NULL);
111 : // if (result == 0) {
112 : // /* Have an exclusive lock on YOUR_LOCKFILE_PATH */
113 : // } else
114 : // if (result == EALREADY) {
115 : // /* YOUR_LOCKFILE_PATH is already locked by another process */
116 : // } else {
117 : // /* Cannot lock YOUR_LOCKFILE_PATH, see strerror(result). */
118 : // }
119 : // }
120 :
121 : // // END NOTE
122 :
123 : #define FELIX_BUS_TOUCH_INTERVAL 10 /* seconds */
124 :
125 : struct felix_bus_s {
126 : int fd;
127 : char* path;
128 : timer_t timer;
129 : char host[256];
130 : pid_t pid;
131 : char* user;
132 : };
133 :
134 : static int felix_bus_verbose = 0;
135 : static int felix_bus_cleanup = 1;
136 : #define BUFFER_SIZE (500000)
137 : static char buffer[BUFFER_SIZE];
138 :
139 415 : int mkpath(char *dir, mode_t mode) {
140 415 : struct stat sb;
141 :
142 415 : if (!dir) {
143 0 : errno = EINVAL;
144 0 : return 1;
145 : }
146 :
147 415 : if (!stat(dir, &sb))
148 : return 0;
149 :
150 302 : mkpath(dirname(strdupa(dir)), mode);
151 :
152 302 : return mkdir(dir, mode);
153 : }
154 :
155 31 : void on_timer(union sigval val) {
156 31 : felix_bus bus = (felix_bus)val.sival_ptr;
157 31 : if (felix_bus_verbose)
158 0 : printf("on_timer\n");
159 :
160 31 : int rc = felix_bus_touch(bus);
161 31 : if (rc < 0) {
162 0 : printf("ERROR: Failed to touch bus file: %s\n", bus->path);
163 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
164 : }
165 31 : }
166 :
167 93 : timer_t periodic_timer(int seconds, void* ptr) {
168 93 : pthread_attr_t attr;
169 93 : pthread_attr_init( &attr );
170 :
171 93 : struct sched_param parm;
172 93 : parm.sched_priority = 255;
173 93 : pthread_attr_setschedparam(&attr, &parm);
174 :
175 93 : struct sigevent sig;
176 93 : sig.sigev_notify = SIGEV_THREAD;
177 93 : sig.sigev_notify_function = on_timer;
178 93 : sig.sigev_value.sival_ptr = ptr;
179 93 : sig.sigev_notify_attributes = &attr;
180 :
181 93 : timer_t timer_id;
182 93 : int rc = timer_create(CLOCK_REALTIME, &sig, &timer_id);
183 93 : if (rc < 0) return NULL;
184 :
185 93 : struct itimerspec in;
186 93 : in.it_value.tv_sec = seconds;
187 93 : in.it_value.tv_nsec = 0;
188 93 : in.it_interval.tv_sec = seconds;
189 93 : in.it_interval.tv_nsec = 0;
190 :
191 93 : rc = timer_settime(timer_id, 0, &in, NULL);
192 93 : if (rc < 0) {
193 0 : timer_delete(timer_id);
194 0 : return NULL;
195 : }
196 :
197 93 : return timer_id;
198 : }
199 :
200 : // void felix_bus_on_exit(int exit_code, void *arg) {
201 : // }
202 :
203 48749 : void felix_bus_set_verbose(int verbose) {
204 48749 : felix_bus_verbose = verbose;
205 48749 : }
206 :
207 30 : void felix_bus_set_cleanup(int cleanup) {
208 30 : felix_bus_cleanup = cleanup;
209 30 : }
210 :
211 113 : char* felix_bus_path(const char* bus_path_prefix, const char* groupname, uint8_t vid, uint8_t did, uint32_t cid, const char* bus_filename) {
212 113 : if (felix_bus_verbose)
213 20 : printf("felix_bus_path\n");
214 :
215 113 : char* bus_path;
216 :
217 : // to be freed by caller
218 113 : bus_path = (char*)malloc(PIPE_BUF*sizeof(char));
219 :
220 : // for future compatibility, DID and CID are encoded as shortest hexadecimal, VID is only used in decoding the FID
221 113 : int count = snprintf(bus_path, PIPE_BUF, "%s/%s/%x/%x/%s.ndjson", bus_path_prefix, groupname, did, cid, bus_filename);
222 113 : if ((count < 0) || (count >= PIPE_BUF)) {
223 0 : printf("ERROR: Cannot create bus_path\n");
224 0 : free(bus_path);
225 0 : return NULL;
226 : }
227 :
228 113 : char* bus_path_copy = strdup(bus_path);
229 113 : if (bus_path_copy == NULL) return NULL;
230 :
231 113 : char* dir = dirname(bus_path_copy);
232 :
233 : // Retry a few times for automounted directories
234 113 : int retry = 0;
235 113 : int rc = -1;
236 113 : while ((retry < 5) && (rc < 0)) {
237 113 : rc = mkpath(dir, S_IWUSR | S_IRUSR | S_IXUSR | S_IRGRP | S_IWGRP | S_IXGRP | S_IROTH | S_IXOTH);
238 113 : if (rc >= 0) break;
239 0 : retry++;
240 0 : printf("Retrying after %d second\n", retry);
241 0 : sleep(retry);
242 : }
243 113 : free(bus_path_copy);
244 113 : if (rc < 0) {
245 0 : printf("ERROR: Cannot create bus_path: '%s'\n", bus_path);
246 0 : return NULL;
247 : }
248 :
249 113 : if (felix_bus_verbose)
250 20 : printf("bus_path: %s\n", bus_path);
251 :
252 : return bus_path;
253 : }
254 :
255 2 : int felix_bus_locked(const char* bus_path) {
256 : /* Open file */
257 : /* NOTE: needs to be WR for it to be lockable */
258 : /* Relatively slow, use stale for faster status */
259 2 : int fd = open(bus_path, O_RDWR, 0);
260 2 : if (fd < 0) return 0;
261 :
262 : /* Try Lock file */
263 2 : int rc = flock(fd, LOCK_EX | LOCK_NB);
264 2 : if ((rc < 0) && (errno==EWOULDBLOCK)) {
265 2 : close(fd);
266 2 : return 1;
267 : }
268 : return 0;
269 : }
270 :
271 114830 : int felix_bus_stale(const char* bus_path) {
272 114830 : struct stat sb;
273 :
274 114830 : int rc = stat(bus_path, &sb);
275 114830 : if (rc < 0) {
276 0 : printf("ERROR: Failed to stat bus file: %s\n", bus_path);
277 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
278 0 : return 1;
279 : }
280 :
281 114830 : time_t now = time(0);
282 114830 : if (now < 0) {
283 0 : printf("ERROR: Failed to stat bus file: %s\n", bus_path);
284 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
285 0 : return 1;
286 : }
287 :
288 114830 : double diff_time = difftime(now, sb.st_mtime);
289 :
290 : // if (felix_bus_verbose) {
291 : // printf("Mod: %lu %s", sb.st_mtime, ctime(&sb.st_mtime));
292 : // printf("Now: %lu %s", now, ctime(&now));
293 : // printf("Dif: %f\n", diff_time);
294 : // }
295 :
296 114830 : return diff_time > 2*FELIX_BUS_TOUCH_INTERVAL;
297 : }
298 :
299 95 : felix_bus felix_bus_open(const char* bus_path) {
300 95 : int rc;
301 :
302 : /* Open file */
303 95 : int fd = open(bus_path, O_CREAT | O_WRONLY | O_SYNC, S_IWUSR | S_IRUSR | S_IRGRP | S_IWGRP | S_IROTH );
304 95 : if (fd < 0) return NULL;
305 :
306 : /* Lock file */
307 95 : rc = flock(fd, LOCK_EX | LOCK_NB);
308 95 : if (rc < 0) return NULL;
309 :
310 : /* Truncate the file */
311 93 : rc = ftruncate(fd, 0);
312 93 : if (rc < 0) return NULL;
313 :
314 : /* create bus */
315 93 : felix_bus bus;
316 93 : bus = (felix_bus)malloc(sizeof(struct felix_bus_s));
317 :
318 : /* Process ID */
319 93 : bus->pid = getpid();
320 :
321 : /* Hostname */
322 93 : rc = gethostname(bus->host, 256);
323 93 : if (rc < 0) {
324 0 : free(bus);
325 0 : return NULL;
326 : }
327 :
328 : /* Username */
329 93 : long user_bufsize = sysconf(_SC_GETPW_R_SIZE_MAX);
330 93 : if (user_bufsize == -1) {
331 0 : user_bufsize = 16384;
332 : }
333 93 : char *user_buf = malloc(user_bufsize);
334 93 : if (user_buf == NULL) {
335 0 : free(bus);
336 0 : return NULL;
337 : }
338 93 : uid_t uid = geteuid();
339 93 : struct passwd pwd;
340 93 : struct passwd *user;
341 93 : rc = getpwuid_r(uid, &pwd, user_buf, user_bufsize, &user);
342 93 : if ((rc < 0) || (user == NULL)) {
343 0 : free(bus);
344 0 : return NULL;
345 : }
346 93 : bus->user = strdup(pwd.pw_name);
347 :
348 : /* Free username memory */
349 93 : free(user_buf);
350 :
351 : /* Start a timer to keep the file current */
352 93 : timer_t timer = periodic_timer(FELIX_BUS_TOUCH_INTERVAL, bus);
353 93 : if (timer == NULL) {
354 0 : free(bus);
355 0 : return NULL;
356 : }
357 :
358 : /* Make sure we delete file and timer at the end */
359 : // rc = on_exit(felix_bus_on_exit, (void *)bus);
360 : // if (rc < 0) {
361 : // free(timer);
362 : // free(bus);
363 : // return NULL;
364 : // }
365 :
366 93 : bus->fd = fd;
367 93 : bus->path = strdup(bus_path);
368 93 : bus->timer = timer;
369 :
370 93 : if (felix_bus_verbose)
371 2 : printf("felix_bus_open(%d) for %s\n", fd, bus_path);
372 :
373 : return bus;
374 : }
375 :
376 1175 : int felix_bus_write(felix_bus bus, uint64_t fid, const struct felix_bus_info* info) {
377 : // FIXME check vid, cid and did fall within range
378 :
379 1175 : char hex_fid[19];
380 1175 : snprintf(hex_fid, 19, "0x%016lx", fid);
381 :
382 1175 : struct jWriteControl jwc;
383 1175 : jwOpen(&jwc, buffer, BUFFER_SIZE, JW_OBJECT, JW_NDJSON);
384 :
385 1175 : jwObj_string(&jwc, BUS_HFID, hex_fid);
386 1175 : jwObj_ulong(&jwc, BUS_FID, fid);
387 1175 : jwObj_string(&jwc, BUS_IP, info->ip);
388 1175 : jwObj_int(&jwc, BUS_PORT, info->port);
389 1175 : jwObj_bool(&jwc, BUS_UNBUFFERED, info->unbuffered);
390 1175 : jwObj_bool(&jwc, BUS_PUBSUB, info->pubsub);
391 1175 : jwObj_int(&jwc, BUS_NETIO_PAGES, info->netio_pages);
392 1175 : jwObj_int(&jwc, BUS_NETIO_PAGESIZE, info->netio_pagesize);
393 :
394 1175 : jwObj_string(&jwc, BUS_HOST, bus->host);
395 1175 : jwObj_int(&jwc, BUS_PID, bus->pid);
396 1175 : jwObj_string(&jwc, BUS_USER, bus->user);
397 :
398 1175 : int rc = jwClose(&jwc);
399 1175 : if (rc != 0) {
400 0 : printf("ERROR: Failed to write to: %s\n", bus->path);
401 0 : printf("jwError=%d str=%s\n", rc, jwErrorToString(rc));
402 0 : return rc;
403 : }
404 :
405 1175 : rc = write(bus->fd, buffer, strlen(buffer));
406 1175 : if (rc < 0) {
407 0 : printf("ERROR: Failed to write to: %s\n", bus->path);
408 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
409 : }
410 :
411 1175 : return rc < 0 ? rc : 0;
412 : }
413 :
414 31 : int felix_bus_touch(felix_bus bus) {
415 31 : return bus->fd >= 0 ? futimens(bus->fd, NULL) : 0;
416 : }
417 :
418 95 : int felix_bus_close(felix_bus bus) {
419 : // Don't really close the file, otherwise we cannot keep the lock or touch the file
420 95 : return 0;
421 : }
422 :
423 12 : int felix_bus_release(felix_bus bus) {
424 12 : int rc;
425 :
426 12 : if (bus == NULL) return 0;
427 :
428 : // delete timer
429 10 : timer_t timer = bus->timer;
430 10 : if (felix_bus_verbose)
431 2 : printf("Deleting timer(%d) for: %s\n", bus->fd, bus->path);
432 10 : if (timer != NULL) {
433 10 : rc = timer_delete(timer);
434 10 : if (rc < 0) {
435 0 : printf("ERROR: Failed to delete timer for: %s\n", bus->path);
436 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
437 : }
438 : }
439 :
440 : // close and remove file
441 10 : if (felix_bus_verbose)
442 2 : printf("Closing bus file(%d): %s\n", bus->fd, bus->path);
443 10 : rc = close(bus->fd);
444 10 : if (rc < 0) {
445 0 : printf("ERROR: Failed to remove bus file: %s\n", bus->path);
446 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
447 : }
448 10 : bus->fd = -1;
449 :
450 : // cleanup files
451 10 : if (felix_bus_cleanup) {
452 8 : if (felix_bus_verbose)
453 2 : printf("Removing bus file: %s\n", bus->path);
454 8 : rc = unlink(bus->path);
455 8 : if (rc < 0) {
456 0 : printf("ERROR: Failed to remove bus file: %s\n", bus->path);
457 0 : printf("errno=%d str=%s\n", errno, strerror(errno));
458 : }
459 :
460 8 : char* dir = dirname(bus->path);
461 8 : if (felix_bus_verbose) printf("Trying to remove cid dir if empty: %s\n", dir);
462 8 : rmdir(dir);
463 :
464 8 : dir = dirname(dir);
465 8 : if (felix_bus_verbose) printf("Trying to remove did dir if empty: %s\n", dir);
466 8 : rmdir(dir);
467 :
468 8 : dir = dirname(dir);
469 8 : if (felix_bus_verbose) printf("Trying to remove version dir if empty: %s\n", dir);
470 8 : rmdir(dir);
471 :
472 8 : dir = dirname(dir);
473 8 : if (felix_bus_verbose) printf("Trying to remove group_name dir if empty: %s\n", dir);
474 8 : rmdir(dir);
475 : }
476 :
477 10 : free(bus->path);
478 10 : free(bus->user);
479 10 : free(bus);
480 :
481 10 : return 0;
482 : }
|