Line data Source code
1 : #include <rdma/fabric.h>
2 : #include <sched.h>
3 :
4 : #include "app.h"
5 : #include "log.h"
6 : #include "config.h"
7 : #include "block.h"
8 : #include "fifo.h"
9 : #include "bus.h"
10 : #include "elinktable.h"
11 : #include "buffered.h"
12 : #include "unbuffered.h"
13 : #include "felixbus/felixbus.h"
14 : #include "felixtag.h"
15 : #include "l1id_decoder.h"
16 :
17 : #define MAX_STREAMS (256)
18 :
19 : struct config cfg;
20 : struct application app;
21 : static int mode;
22 :
23 : // Forward declarations
24 : void on_interrupt(void* ptr);
25 : int process_block(char* ptr);
26 :
27 : void (*on_data_available)();
28 : static void on_data_available_buffered();
29 : static void on_data_available_zerocopy();
30 : void on_buffered_publish_buffer_available(struct netio_publish_socket* socket);
31 :
32 : static void on_unbuffered_connection_established(struct netio_unbuffered_publish_socket*);
33 : static void on_unbuffered_connection_closed(struct netio_unbuffered_publish_socket*);
34 :
35 : void on_subscribe(netio_tag_t, void*, size_t);
36 : static void on_buffered_subscribe(struct netio_publish_socket*, netio_tag_t, void*, size_t);
37 : static void on_unbuffered_subscribe(struct netio_unbuffered_publish_socket*, netio_tag_t, void*, size_t);
38 :
39 1638 : unsigned get_minimum_pages(struct netio_publish_socket* socket)
40 : {
41 1638 : unsigned pages = socket->attr.num_pages;
42 1638 : struct netio_socket_list* itr = socket->subscription_table.socket_list;
43 :
44 1760 : while(itr != NULL){
45 122 : struct netio_buffered_send_socket* buf_send_socket = (struct netio_buffered_send_socket*)itr->socket;
46 122 : unsigned socket_pages = buf_send_socket->buffers.available_buffers;
47 122 : if(socket_pages < pages){
48 : pages = socket_pages;
49 : }
50 122 : itr = itr->next;
51 : }
52 1638 : return pages;
53 : }
54 :
55 : // Callbacks
56 :
57 546 : void on_stat_callback(void* ptr) {
58 546 : app.daq_min_pages = get_minimum_pages(&app.gbt_socket);
59 546 : app.dcs_min_pages = get_minimum_pages(&app.dcs_socket);
60 546 : app.ttc_min_pages = get_minimum_pages(&app.ttc2h_socket);
61 546 : }
62 :
63 44 : void on_init(void* data)
64 : {
65 44 : clock_gettime(CLOCK_MONOTONIC_RAW, &app.statistics.last_ts);
66 44 : app.stats_writer.timer.cb = on_stat_callback;
67 44 : netio_timer_start_ms(&app.stats_writer.timer, cfg.statistics_period);
68 44 : }
69 :
70 : static void
71 44 : dump_fabric_info(struct fi_info* info)
72 : {
73 44 : const char* info_str = fi_tostr(info, FI_TYPE_INFO);
74 44 : unsigned prov_version = info->fabric_attr->prov_version;
75 44 : unsigned api_version = info->fabric_attr->api_version;
76 :
77 44 : LOG_INFO("libfabric provider: %s (version: %u.%u), api version: %u.%u",
78 : info->fabric_attr->prov_name,
79 : FI_MAJOR(prov_version), FI_MINOR(prov_version),
80 : FI_MAJOR(api_version), FI_MINOR(api_version));
81 44 : LOG_INFO("fabric: %s", info->fabric_attr->name);
82 44 : if(strcmp("verbs", info->fabric_attr->prov_name)){
83 0 : LOG_WARN("Selected libfabric provider %s is not verbs.", info->fabric_attr->prov_name);
84 : }
85 44 : LOG_DBG("%s", info_str);
86 44 : }
87 :
88 64512 : void dump_block(char* block){
89 64512 : uint64_t fw_ptr = flx_dma_get_current_address(&app.flx, &app.buffer);
90 64512 : uint64_t sw_ptr = flx_dma_get_read_ptr(&app.flx, &app.buffer);
91 64512 : LOG_DUMP("Invalid block dump. Pointers: fw 0x%lx, sw 0x%lx, start 0x%lx, end 0x%lx", fw_ptr, sw_ptr, app.buffer.pstart, app.buffer.pend);
92 64512 : uint32_t* block32 = (uint32_t*)block;
93 64512 : LOG_DUMP("Block address 0x%lx", (uint64_t)block);
94 16579584 : for (unsigned i=0; i < app.block_size/4; ++i)
95 : {
96 16515072 : fprintf(stderr, "%08X ", *(block32+i));
97 : }
98 64512 : fprintf(stderr, "\n");
99 64512 : }
100 :
101 :
/*
 * Applies a scheduling policy and priority to a thread and logs the result.
 *
 * th       - thread to configure
 * policy   - scheduling policy (SCHED_FIFO, SCHED_RR, SCHED_OTHER, ...)
 * priority - priority to request for that policy
 *
 * On success the settings are read back and logged; on failure an error is
 * logged and the function returns without further action.
 */
static void set_scheduling(pthread_t th, int policy, int priority)
{
    struct sched_param sch_params = { .sched_priority = priority };

    // The pthread_* functions return the error number directly and do not
    // set errno, so the return value is what must be passed to strerror().
    int rc = pthread_setschedparam(th, policy, &sch_params);
    if (rc != 0) {
        LOG_ERR("Failed to set Thread scheduling: %s", strerror(rc));
        return;
    }

    int applied_policy;
    rc = pthread_getschedparam(th, &applied_policy, &sch_params);
    if (rc != 0) {
        LOG_ERR("Failed to read back Thread scheduling settings: %s", strerror(rc));
        return;
    }

    LOG_INFO("Set thread policy %s, priority %d",
             (applied_policy == SCHED_FIFO)  ? "SCHED_FIFO" :
             (applied_policy == SCHED_RR)    ? "SCHED_RR" :
             (applied_policy == SCHED_OTHER) ? "SCHED_OTHER" :
             "???",
             sch_params.sched_priority);
}
121 :
122 :
123 : // unbuffered sockets
124 : static void
125 13 : init_unbuffered_socket()
126 : {
127 13 : app.read_ptr = (void*)app.buffer.vaddr;
128 13 : app.fm_buf.data = (void*)app.buffer.vaddr;
129 13 : app.fm_buf.size = app.buffer.size;
130 13 : netio_unbuffered_publish_socket_init(&app.fm_socket, &app.ctx, cfg.ip, cfg.port, &app.fm_buf);
131 :
132 13 : app.fm_socket.cb_msg_published = on_unbuffered_msg_published;
133 13 : app.fm_socket.cb_subscribe = on_unbuffered_subscribe;
134 13 : app.fm_socket.cb_connection_established = on_unbuffered_connection_established;
135 13 : app.fm_socket.cb_connection_closed = on_unbuffered_connection_closed;
136 :
137 13 : dump_fabric_info(app.fm_socket.lsocket.fi);
138 13 : }
139 :
140 :
141 : static void
142 6 : init_dcs_unbuffered_socket()
143 : {
144 6 : app.dcs_ubuf.size = app.buffer.size;
145 6 : app.dcs_ubuf.data = (void*)app.buffer.vaddr;
146 6 : netio_unbuffered_publish_socket_init(&app.dcs_usocket, &app.ctx, cfg.ip, cfg.dcsport, &app.dcs_ubuf);
147 6 : if (app.zero_copy_readout == 1){
148 0 : app.dcs_usocket.cb_msg_published = on_unbuffered_msg_published;
149 : } else {
150 6 : app.dcs_usocket.cb_msg_published = NULL;
151 : }
152 6 : app.dcs_usocket.cb_subscribe = on_unbuffered_subscribe;
153 6 : app.dcs_usocket.cb_connection_established = on_unbuffered_connection_established;
154 6 : app.dcs_usocket.cb_connection_closed = on_unbuffered_connection_closed;
155 6 : }
156 :
157 : //buffered sockets
158 : static void
159 31 : init_buffered_socket()
160 : {
161 31 : struct netio_buffered_socket_attr attr;
162 31 : attr.num_pages = cfg.netio_pages;
163 31 : attr.pagesize = cfg.netio_pagesize;
164 31 : attr.watermark = cfg.netio_watermark;
165 31 : attr.timeout_ms = cfg.netio_timeout;
166 :
167 31 : netio_publish_socket_init(&app.gbt_socket, &app.ctx, cfg.ip, cfg.port, &attr);
168 31 : app.gbt_socket.cb_subscribe = on_buffered_subscribe;
169 31 : app.gbt_socket.cb_buffer_available = on_buffered_publish_buffer_available;
170 :
171 31 : dump_fabric_info(app.gbt_socket.lsocket.fi);
172 31 : }
173 :
174 : static void
175 44 : init_ttc2h_socket()
176 : {
177 44 : struct netio_buffered_socket_attr attr;
178 44 : attr.num_pages = cfg.ttc_netio_pages;
179 44 : attr.pagesize = cfg.ttc_netio_pagesize;
180 44 : attr.watermark = cfg.ttc_netio_watermark;
181 44 : attr.timeout_ms = cfg.ttc_netio_timeout;
182 44 : netio_publish_socket_init(&app.ttc2h_socket, &app.ctx, cfg.ip, cfg.ttcport, &attr);
183 44 : app.ttc2h_socket.cb_subscribe = on_buffered_subscribe;
184 44 : app.ttc2h_socket.cb_buffer_available = on_buffered_publish_buffer_available;
185 44 : }
186 :
187 : static void
188 25 : init_dcs_buffered_socket()
189 : {
190 25 : struct netio_buffered_socket_attr attr;
191 25 : attr.num_pages = cfg.dcs_netio_pages;
192 25 : attr.pagesize = cfg.dcs_netio_pagesize;
193 25 : attr.watermark = cfg.dcs_netio_watermark;
194 25 : attr.timeout_ms = cfg.dcs_netio_timeout;
195 25 : netio_publish_socket_init(&app.dcs_socket, &app.ctx, cfg.ip, cfg.dcsport, &attr);
196 25 : app.dcs_socket.cb_subscribe = on_buffered_subscribe;
197 25 : app.dcs_socket.cb_buffer_available = on_buffered_publish_buffer_available;
198 25 : }
199 :
/* Connection-established callback of the unbuffered sockets: logs the event only. */
static void on_unbuffered_connection_established(struct netio_unbuffered_publish_socket* socket)
{
    (void)socket;
    LOG_INFO("Connection established (unbuffered)");
}
203 :
/* Connection-closed callback of the unbuffered sockets: logs the event only. */
static void on_unbuffered_connection_closed(struct netio_unbuffered_publish_socket* socket)
{
    (void)socket;
    LOG_INFO("Connection closed (unbuffered)");
}
207 :
208 36 : static void on_buffered_subscribe(struct netio_publish_socket* socket, netio_tag_t tag, void* addr, size_t addrlen)
209 : {
210 36 : on_subscribe(tag, addr, addrlen);
211 36 : }
212 :
213 18 : static void on_unbuffered_subscribe(struct netio_unbuffered_publish_socket* socket, netio_tag_t tag, void* addr, size_t addrlen)
214 : {
215 18 : on_subscribe(tag, addr, addrlen);
216 18 : }
217 :
218 54 : void on_subscribe(netio_tag_t fid, void* addr, size_t addrlen)
219 : {
220 486 : LOG_INFO("new subscription for elink %d fid 0x%016lx 0x%01x-%02x-%04x-%01x-%05x-%01x-%02x-%02x",
221 : get_elink(fid),
222 : fid,
223 : get_vid(fid), get_did(fid), get_cid(fid),
224 : is_virtual(fid), get_elink(fid), is_to_flx(fid), get_protocol(fid), get_sid(fid));
225 54 : }
226 :
227 0 : void on_poll(void* ptr)
228 : {
229 0 : ++app.statistics.counters.polls;
230 0 : on_data_available();
231 0 : }
232 :
233 5535804 : void on_interrupt(void* ptr)
234 : {
235 5535804 : ++app.statistics.counters.interrupts;
236 5535804 : on_data_available();
237 5535804 : }
238 :
239 214519 : void on_buffered_publish_buffer_available(struct netio_publish_socket* socket)
240 : {
241 214519 : app.statistics.counters.buffer_available_events++;
242 214519 : on_data_available();
243 214519 : }
244 :
245 3082487 : static void on_data_available_buffered()
246 : {
247 3082487 : size_t total = flx_bytes_available(&app.flx, &app.buffer);
248 : // total may contain bytes wrapping around the circular buffer
249 3082487 : size_t available = MIN(total, app.buffer.pend - app.buffer.pc_ptr);
250 :
251 3082487 : unsigned nblocks = available / app.block_size;
252 3082487 : char* block_ptr = (char*)(app.buffer.vaddr + (app.buffer.pc_ptr - app.buffer.pstart));
253 3082487 : unsigned blocks_processed = 0;
254 853018718 : for(; blocks_processed < nblocks; ++blocks_processed) {
255 850045069 : int result = process_block(block_ptr);
256 850045069 : if (result) {
257 173350 : if (result == BLOCK_AGAIN) {
258 108838 : app.statistics.counters.number_send_retries++;
259 108838 : break;
260 : }
261 64512 : if (result == BLOCK_ERROR){
262 64512 : dump_block(block_ptr);
263 : }
264 : }
265 849936231 : block_ptr += app.block_size;
266 : }
267 3082487 : flx_advance_read_ptr(&app.flx, &app.buffer, blocks_processed * app.block_size);
268 :
269 3082487 : size_t bytes_processed = blocks_processed * app.block_size;
270 3082487 : app.statistics.counters.bytes_processed += bytes_processed;
271 3082487 : app.statistics.counters.blocks_processed += blocks_processed;
272 :
273 3082487 : if(bytes_processed < total){
274 112185 : netio_signal_fire(&app.signal_data_available);
275 : }
276 3082487 : }
277 :
278 2667836 : static void on_data_available_zerocopy()
279 : {
280 2667836 : uint64_t read_ptr = (uint64_t)app.read_ptr;
281 2667836 : uint64_t write_ptr = (uint64_t)flx_get_write_ptr(&app.flx, &app.buffer);
282 :
283 2667836 : uint64_t available;
284 2667836 : if(write_ptr < read_ptr) {
285 867235 : available = (app.buffer.size - (read_ptr - write_ptr));
286 : } else {
287 1800601 : available = write_ptr - read_ptr;
288 : }
289 :
290 2667836 : if (available == app.buffer.size){
291 0 : available -= app.block_size;
292 : }
293 2667836 : unsigned nblocks = available / app.block_size;
294 2667836 : char* block_ptr = (char*)read_ptr;
295 2667836 : unsigned blocks_processed = 0;
296 317281796 : for(; blocks_processed < nblocks; ++blocks_processed) {
297 316395638 : int result = process_block(block_ptr);
298 316395638 : if (result) {
299 1781678 : if(result & BLOCK_AGAIN) {
300 : break;
301 : }
302 0 : if (result & BLOCK_ERROR){
303 0 : dump_block(block_ptr);
304 : }
305 : }
306 314613960 : block_ptr += app.block_size;
307 : // Wrap-around may occur
308 314613960 : if((uint64_t)block_ptr >= (app.buffer.vaddr + app.buffer.size)) {
309 1192 : block_ptr = (char*)app.buffer.vaddr;
310 : }
311 : }
312 2667836 : unbuffered_advance_read_ptr();
313 2667836 : app.read_ptr = (void*)block_ptr;
314 2667836 : app.statistics.counters.bytes_processed += blocks_processed * app.block_size;
315 2667836 : app.statistics.counters.blocks_processed += blocks_processed;
316 :
317 2667836 : if(blocks_processed < nblocks){
318 1781678 : netio_signal_fire(&app.signal_data_available);
319 : }
320 2667836 : }
321 :
322 : static void
323 44 : on_quit()
324 : {
325 44 : LOG_DBG("Exit handler");
326 44 : log_flush();
327 :
328 44 : app.running = 0;
329 44 : if(cfg.poll_time > 0) {
330 0 : netio_timer_stop(&app.poll_timer);
331 : } else {
332 44 : flx_cancel_irqs(&app.flx);
333 : }
334 44 : netio_terminate(&app.ctx.evloop);
335 44 : }
336 :
/*
 * Signal handler installed in main via sigaction(): logs the signal number
 * and starts the shutdown through on_quit().
 *
 * NOTE(review): this runs in signal context and calls logging and shutdown
 * helpers whose async-signal-safety cannot be seen from here - confirm.
 */
static void
sigterm_handler (int sig)
{
    LOG_INFO("Caught signal %d, cleaning up", sig);
    on_quit();
    // The process is not exited here; main() leaves its loop once
    // app.running has been cleared by on_quit().
}
344 :
345 :
346 : // Functions
347 1166440707 : int process_block(char* blockptr)
348 : {
349 1166440707 : struct block* b = (struct block*)blockptr;
350 1166440707 : local_elink_t elink = b->hdr.elink;
351 1166440707 : if(elink >= app.elink_table.max_elinks){
352 : return BLOCK_OK;
353 : }
354 1166440707 : struct elink_entry* e = &app.elink_table.entries[elink];
355 1166440707 : if (e->type == 0) {
356 0 : if (e->is_reported == 0) {
357 0 : LOG_ERR("Received a block from E-Link %d which was not configured, will not report again.", elink);
358 0 : e->is_reported = 1;
359 : }
360 0 : return BLOCK_OK;
361 : }
362 1166440707 : int r = decode_block(&e->decoder, b);
363 1166440707 : e->counters.number_send_retries += (r & BLOCK_AGAIN);
364 1166440707 : e->counters.processed_blocks += !(r & BLOCK_AGAIN);
365 1166440707 : return r;
366 : }
367 :
368 : // Threads
369 : void*
370 44 : run_card_thread(void* ptr)
371 : {
372 44 : flx_irq_data_enable(&app.flx);
373 9073567 : while(app.running) {
374 9073479 : flx_wait_data_available(&app.flx, &app.buffer);
375 9073479 : if(app.running) {
376 9073436 : netio_signal_fire(&app.signal_data_available);
377 : }
378 : }
379 44 : return NULL;
380 : }
381 :
382 : void*
383 44 : run_netio_thread(void* ptr)
384 : {
385 44 : netio_run(&app.ctx.evloop);
386 44 : return NULL;
387 : }
388 :
389 : // Main
390 44 : int main(int argc, char** argv)
391 : {
392 44 : cli_parse(argc, argv, &cfg);
393 :
394 44 : app.daq_min_pages = cfg.netio_pages;
395 44 : app.dcs_min_pages = cfg.dcs_netio_pages;
396 44 : app.ttc_min_pages = cfg.ttc_netio_pages;
397 :
398 44 : app.dcs_rate_limit = cfg.dcs_rate_limit;
399 44 : if(app.dcs_rate_limit){
400 0 : LOG_INFO("DCS e-link masked above %d kHz", app.dcs_rate_limit);
401 : }
402 :
403 44 : app.dcs_size_limit = cfg.dcs_size_limit;
404 44 : if(app.dcs_size_limit){
405 44 : LOG_INFO("DCS e-links drop messages with size 0 and larger than %d", app.dcs_size_limit);
406 : }
407 :
408 44 : if(cfg.max_buf_chunk_size != MAX_BUF_CHUNK_SIZE){
409 0 : LOG_INFO("Maximum chunk size for buffered mode is set to: %lu", cfg.max_buf_chunk_size);
410 : }
411 :
412 : //logging
413 44 : netio_tag_t log_fid = get_fid(cfg.co, ERROR_LINK, 0, cfg.vid, 0, 1); // sid = 0; is_to_flx = 0, is_virtual = 1
414 44 : log_init(cfg.verbose, cfg.error_fifo, log_fid, cfg.appname, cfg.device);
415 :
416 44 : netio_tag_t stats_fid = get_fid(cfg.co, STATS_LINK, 0, cfg.vid, 0, 1); // sid = 0; is_to_flx = 0, is_virtual = 1
417 :
418 44 : app.statistics.counters.chunks_discarded = 0;
419 44 : statistics_init(&app.stats_writer, cfg.statistics_fifo, stats_fid, &app.flx, &cfg, &app.buffer);
420 :
421 44 : cfg.lock_mask = 0;
422 : // for backward compatibility
423 44 : if (cfg.dmaid == 0) {
424 44 : cfg.lock_mask = LOCK_DMA0;
425 : }
426 : // real lock
427 44 : cfg.lock_mask |= LOCK_DMA(cfg.dmaid);
428 :
429 44 : int rv = flx_init_card(&app.flx, &cfg);
430 44 : if (rv) {
431 0 : LOG_ERR("Could not open (virtual) device %d with lock_mask %d: error_code %d\n", cfg.device, cfg.lock_mask, rv);
432 0 : return 1;
433 : }
434 :
435 : //interrupts
436 44 : flx_irq_data_disable(&app.flx);
437 44 : flx_irq_busy_enable(&app.flx);
438 :
439 44 : app.regmap = flx_get_regmap_version(&app.flx);
440 : #if REGMAP_VERSION < 0x0500
441 23 : if ((app.regmap < 0x0400) || (app.regmap >= 0x0500)) {
442 : #else
443 21 : if ((app.regmap < 0x0500) || (app.regmap >= 0x0600)) {
444 : #endif
445 0 : LOG_ERR("Wrong version of felix-star %s for this registermap %x\n", FELIX_TAG, app.regmap);
446 0 : return 1;
447 : }
448 :
449 44 : char cmem_name[255];
450 44 : snprintf(cmem_name, 255, "%s-%d-%d", basename(argv[0]), cfg.device, cfg.dmaid);
451 44 : LOG_INFO("CMEM name: %s", cmem_name);
452 44 : app.buffer.vmem = cfg.vmem;
453 44 : app.buffer.free_previous_cmem = cfg.free_previous_cmem;
454 44 : flx_init_cmem_buffer(&app.buffer, cfg.cmem_buffersize, cmem_name);
455 44 : cfg.dma_buffer_vaddr = app.buffer.vaddr;
456 :
457 44 : struct sigaction action;
458 44 : memset (&action, 0, sizeof(action));
459 44 : action.sa_handler = sigterm_handler;
460 44 : if (sigaction(SIGINT, &action, 0)) {
461 0 : perror ("sigaction");
462 0 : return 1;
463 : }
464 44 : LOG_DBG("registered signal handler");
465 :
466 44 : mode = flx_get_mode(&app.flx);
467 44 : if(mode == FLX_MODE_GBT || mode == FLX_MODE_LPGBT || mode == FLX_MODE_LTDB || mode == FLX_MODE_PIXEL || mode == FLX_MODE_STRIP) {
468 31 : app.zero_copy_readout = 0;
469 31 : on_data_available = on_data_available_buffered;
470 : } else if(mode == FLX_MODE_FULL || mode == FLX_MODE_MROD) {
471 13 : app.zero_copy_readout = cfg.buffered ? 0 : 1;
472 13 : if (app.zero_copy_readout == 1){
473 13 : init_completion_table();
474 13 : on_data_available = on_data_available_zerocopy;
475 : } else {
476 0 : app.ctable = NULL;
477 0 : on_data_available = on_data_available_buffered;
478 : }
479 : } else {
480 0 : LOG_ERR("Firmware mode unsupported");
481 0 : exit(1);
482 : }
483 :
484 44 : statistics_mode_init(&app.stats_writer);
485 44 : statistics_reg_init(&app.stats_writer);
486 44 : app.l1id_check = cfg.l1id_check;
487 44 : init_l1id_decoder(cfg.l1id_check);
488 :
489 : //config value check
490 44 : if (cfg.netio_watermark > cfg.netio_pagesize){
491 0 : LOG_WARN("Netio watermark higher than pagesize. Setting watermark to pagesize-1");
492 0 : cfg.netio_watermark = cfg.netio_pagesize - 1;
493 : }
494 44 : if (cfg.ttc_netio_watermark > cfg.ttc_netio_pagesize){
495 0 : LOG_WARN("Netio TTC2H watermark higher than pagesize. Setting watermark to pagesize-1");
496 0 : cfg.ttc_netio_watermark = cfg.ttc_netio_pagesize - 1;
497 : }
498 :
499 44 : LOG_INFO("publishing DAQ data on %s:%d", cfg.ip, cfg.port);
500 44 : LOG_INFO("publishing TTC data on %s:%d", cfg.ip, cfg.ttcport);
501 44 : LOG_INFO("publishing DCS data on %s:%d", cfg.ip, cfg.dcsport);
502 44 : LOG_INFO("Connector Offset (co): 0x%07x (vid: 0x%x, did: 0x%x, cid: 0x%x)",
503 : cfg.co, cfg.vid, cfg.did, cfg.cid);
504 :
505 44 : netio_init(&app.ctx);
506 44 : netio_timer_init(&app.ctx.evloop, &app.stats_writer.timer);
507 44 : netio_signal_no_semaphore_init(&app.ctx.evloop, &app.signal_data_available);
508 44 : app.signal_data_available.cb = on_interrupt;
509 44 : app.ctx.evloop.cb_init = on_init;
510 :
511 44 : if(app.zero_copy_readout) {
512 13 : if(mode != FLX_MODE_FULL && mode != FLX_MODE_MROD) {
513 0 : LOG_ERR("Unbuffered mode is only supported for FULL-mode");
514 0 : exit(1);
515 : }
516 13 : LOG_INFO("Using FULL-mode (unbuffered)");
517 13 : init_unbuffered_socket();
518 13 : app.stats_writer.daq_sub_table = &app.fm_socket.subscription_table;
519 :
520 : } else {
521 31 : if(mode == FLX_MODE_FULL || mode == FLX_MODE_MROD) {
522 0 : LOG_INFO("Using FULL-mode (buffered)");
523 : } else {
524 31 : LOG_INFO("Using GBT-mode (buffered)");
525 : }
526 31 : init_buffered_socket();
527 31 : app.stats_writer.daq_sub_table = &app.gbt_socket.subscription_table;
528 : }
529 :
530 44 : init_ttc2h_socket();
531 :
532 44 : if(!(mode == FLX_MODE_FULL || mode == FLX_MODE_MROD)){
533 31 : if(cfg.dcs_buffered){
534 25 : LOG_INFO("Using buffered DCS socket");
535 25 : init_dcs_buffered_socket();
536 25 : app.stats_writer.dcs_sub_table = &app.dcs_socket.subscription_table;
537 : }
538 : else{
539 6 : LOG_INFO("Using unbuffered DCS socket");
540 6 : init_dcs_unbuffered_socket();
541 6 : app.stats_writer.dcs_sub_table = &app.dcs_usocket.subscription_table;
542 : }
543 : } else {
544 13 : app.stats_writer.dcs_sub_table = NULL;
545 : }
546 :
547 44 : uint to_flx = 0;
548 44 : app.num_elinks = flx_read_elink_config(&app.flx, &app.elinks, to_flx);
549 44 : LOG_INFO("num elinks: %u", app.num_elinks);
550 :
551 44 : app.block_size = flx_get_blocksize(&app.flx);
552 44 : app.trailer_size = flx_get_trailer_size(&app.flx);
553 :
554 44 : felix_elinktable_init(&app, &app.elink_table, app.elinks, app.num_elinks, !app.zero_copy_readout,
555 : mode, app.block_size, app.trailer_size, &app.flx, &cfg);
556 :
557 : // Notify bus
558 44 : char bus_filename[255];
559 44 : snprintf(bus_filename, 255, "dma-%d", cfg.dmaid);
560 44 : LOG_INFO("BUS Filename: %s", bus_filename);
561 44 : char* bus_path = felix_bus_path(cfg.bus_dir, cfg.bus_groupname, cfg.vid, cfg.did, cfg.cid, bus_filename);
562 44 : if (bus_path == NULL) {
563 0 : LOG_ERR("Could not create bus_path\n");
564 0 : return 1;
565 : }
566 44 : felix_bus bus = felix_bus_open(bus_path);
567 44 : if (!bus) {
568 0 : LOG_ERR("Could not open bus path %s\n", bus_path);
569 0 : return 1;
570 : }
571 44 : bus_write(bus, app.elinks, app.num_elinks, true, app.zero_copy_readout, &cfg);
572 44 : felix_bus_close(bus); // internally intentionally kept open until program terminates
573 :
574 44 : flx_start_circular_dma(&app.flx, &app.buffer, cfg.dmaid);
575 :
576 44 : app.running = 1;
577 44 : pthread_t card_thread;
578 44 : if(cfg.poll_time == 0) {
579 44 : if(pthread_create(&card_thread, NULL, run_card_thread, NULL)) {
580 0 : LOG_ERR("Could not create card thread. Exit.");
581 0 : return 1;
582 : }
583 : } else {
584 0 : LOG_INFO("activating polling, period: %d us", cfg.poll_time);
585 0 : app.poll_timer.cb = on_poll;
586 0 : netio_timer_init(&app.ctx.evloop, &app.poll_timer);
587 0 : netio_timer_start_us(&app.poll_timer, cfg.poll_time);
588 : }
589 :
590 44 : pthread_t netio_thread;
591 44 : if(pthread_create(&netio_thread, NULL, run_netio_thread, NULL)) {
592 0 : LOG_ERR("Could not create card thread. Exit.");
593 0 : return 1;
594 : }
595 :
596 44 : if(cfg.priority){
597 0 : set_scheduling(netio_thread, SCHED_RR, 2);
598 : }
599 :
600 559 : while (app.running) {
601 515 : usleep(1000*cfg.statistics_period);
602 515 : statistics_write_tohost(&app.stats_writer, &app.statistics, &app.elink_table);
603 : }
604 :
605 44 : void* tmp;
606 44 : if(pthread_join(netio_thread, &tmp)) {
607 0 : LOG_ERR("Could not join thread. Exit.");
608 0 : return 1;
609 : }
610 :
611 44 : if(cfg.poll_time == 0 && pthread_join(card_thread, &tmp)) {
612 0 : LOG_ERR("Could not join thread. Exit.");
613 0 : return 1;
614 : }
615 :
616 : /*
617 : fi_close(&app.fm_buf.mr->fid);
618 : LOG_DBG("MR closed");
619 : log_flush();
620 : */
621 44 : flx_irq_data_disable(&app.flx);
622 44 : flx_irq_busy_disable(&app.flx);
623 :
624 44 : flx_close_card(&app.flx);
625 44 : LOG_DBG("Card closed");
626 44 : log_flush();
627 :
628 44 : flx_close_cmem_buffer(&app.buffer);
629 44 : LOG_DBG("CMEM buffer closed");
630 44 : log_flush();
631 :
632 44 : statistics_free(&app.stats_writer);
633 :
634 44 : felix_elinktable_free(&app.elink_table);
635 :
636 44 : return 0;
637 : }
|