LCOV - code coverage report
Current view: top level - felix-star/src - buffered.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 25 59 42.4 %
Date: 2025-06-10 03:23:28 Functions: 2 2 100.0 %

          Line data    Source code
       1             : #include "buffered.h"
       2             : #include "l1id_decoder.h"
       3             : 
       4   838293491 : int on_buffered_chunk(struct elink_entry* entry, void* data, size_t len, unsigned flags)
       5             : {
       6   838293491 :   entry->counters.received_chunks++;
       7             : 
       8   838293491 :   uint64_t fid = entry->fid;
       9   838293491 :   struct stream * stream = &entry->stream[0];
      10   838293491 :   if(entry->has_streams && len > 1) {
      11           0 :     uint8_t streamid = ((uint8_t*)data)[1]; // 2nd byte is streamid, 1st byte is FELIX header
      12           0 :     fid = fid | streamid;
      13           0 :     stream = &entry->stream[streamid];
      14             :   }
      15             : 
      16   838293491 :   if (entry->type == DCS) {
      17             :     //rate limit
      18           0 :     if (app.dcs_rate_limit){
      19           0 :       if (entry->is_throttled){ 
      20           0 :         if(entry->received_chunk_rate < 0.9*app.dcs_rate_limit){
      21           0 :           entry->is_throttled = 0;
      22           0 :           LOG_WARN("Resuming data flow of DCS fid 0x%lx: recorded rate %f kHz", fid,
      23             :               entry->received_chunk_rate);
      24             :         }
      25             :         else {
      26           0 :           ++entry->counters.dropped;
      27           0 :           return NETIO_STATUS_OK;
      28             :         }
      29             :       } else {
      30           0 :         if(entry->received_chunk_rate > app.dcs_rate_limit){
      31           0 :           entry->is_throttled = 1;
      32           0 :           LOG_WARN("Blocking messages from DCS fid 0x%lx: recorded rate %f kHz", fid,
      33             :               entry->received_chunk_rate);
      34           0 :           ++entry->counters.dropped;
      35           0 :           return NETIO_STATUS_OK;
      36             :         }
      37             :       }
      38             :     }
      39             :     //size limit
      40           0 :     if(app.dcs_size_limit){
      41           0 :       if((len - 1) == 0 || (len - 1) > app.dcs_size_limit){
      42             :         // Length has already the status byte added which has to be removed here for the check
      43           0 :         ++entry->counters.dropped; 
      44           0 :         return NETIO_STATUS_OK;
      45             :       }
      46             :     }
      47             :   }
      48             :   
      49   838293491 :   else if(app.l1id_check > 0){
      50           0 :     uint8_t* chunk = (uint8_t*)data+1; //first byte is status
      51           0 :     if(stream->again == 1){
      52           0 :       entry->last_xl1id.l1id.fields.ec -= 1;
      53             :     }
      54           0 :     int jump = 0;
      55           0 :     if (entry->type == DAQ) {
      56           0 :         jump = check_sequence_daq(fid, &entry->last_xl1id, chunk, len-1);
      57             :     }
      58           0 :     else if (entry->type == TTC) {
      59           0 :       jump = check_sequence_ttc2h(fid, &entry->last_xl1id, chunk, len-1);
      60             :     }
      61           0 :     if(jump){ entry->counters.l1id_jump++; }
      62             :   }
      63             : 
      64  1676586982 :   int ret = netio_buffered_publish((struct netio_publish_socket*)entry->pub_socket,
      65   838293491 :       fid, data, len, stream->again, &stream->subscription_cache);
      66             : 
      67             :   //printf("Buffered chunk to be sent: ");
      68             :   //for(unsigned int b=0; b<len; ++b){printf("%02X ", ((uint8_t*)data)[b]);} printf("\n");
      69             :   //fflush(stdout);
      70             : 
      71   838293491 :   if(ret == NETIO_STATUS_OK || ret == NETIO_STATUS_OK_NOSUB) {
      72   837864810 :     entry->counters.processed_chunks++;
      73   837864810 :     entry->counters.total_chunksize += len;
      74   837864810 :     stream->again = 0;
      75   837864810 :     if(len > entry->counters.largest_chunksize){entry->counters.largest_chunksize = len;}
      76   837864810 :     app.statistics.counters.chunks_processed++;
      77      428681 :   } else if (ret == NETIO_STATUS_AGAIN) {
      78      428681 :     entry->counters.received_chunks--;
      79      428681 :     stream->again = 1;
      80      428681 :     return 1;
      81           0 :   } else if (ret == NETIO_STATUS_TOO_BIG) {
      82           0 :     stream->again = 0;
      83           0 :     if(!(app.statistics.counters.chunks_discarded++ % 1000) ){
      84           0 :       LOG_WARN("%lu chunks discarded. Message from fid 0x%lx has size %lu larger than netio page size.",
      85             :           app.statistics.counters.chunks_discarded, fid, len);
      86             :     }
      87             :   } else {
      88           0 :     LOG_ERR("netio_buffered_publish error %d(%s)", ret, fi_strerror(-ret));
      89           0 :     stream->again = 0;
      90             :   }
      91             :   return 0;
      92             : }
      93             : 
      94             : 
      95     6639786 : void flush_ttc2h_buffer(struct elink_entry* entry, uint32_t block_addr)
      96             : {
      97     6639786 :   netio_buffered_publish_flush(entry->pub_socket, entry->fid, &entry->stream[0].subscription_cache);
      98     6639786 :   if(app.zero_copy_readout){
      99     6638342 :     completion_table_push(app.ctable, block_addr);
     100     6638342 :     completion_table_update(app.ctable, block_addr);
     101             :   }
     102     6639786 : }

Generated by: LCOV version 1.0