LCOV - code coverage report
Current view: top level - felix-star/src - unbuffered.c (source / functions) Hit Total Coverage
Test: coverage.info Lines: 47 93 50.5 %
Date: 2025-06-10 03:23:28 Functions: 4 5 80.0 %

          Line data    Source code
#include "unbuffered.h"
#include "l1id_decoder.h"

#include <string.h>
       3             : 
       4             : //#define FELIX_TOHOST_DEBUG_RD
       5             : 
       6             : #if defined FELIX_TOHOST_DEBUG_RD
       7             : int wasbad = 0;
       8             : 
       9             : static int is_bad()
      10             : {
      11             :   uint64_t write_ptr = (uint64_t)flx_get_write_ptr(&app.flx, &app.buffer);
      12             :   uint64_t read_ptr = (uint64_t)flx_get_read_ptr(&app.flx, &app.buffer);
      13             :   int even = flx_dma_cmp_even_bits(&app.flx);
      14             :   if((read_ptr < write_ptr && even == 0) || (read_ptr > write_ptr && even == 1)){
      15             :     if (wasbad == 0){
      16             :       LOG_ERR("Invalid read ptr configuration: read 0x%lx write 0x%lx parity %d", read_ptr, write_ptr, even);
      17             :     }
      18             :     wasbad = 1;
      19             :     return 1;
      20             :   } else {
      21             :     wasbad = 0;
      22             :     return 0;
      23             :   }
      24             : }
      25             : 
      26             : 
      27             : static double difference_MB(uint64_t new, uint64_t old)
      28             : {
      29             :   double diff = ((double)new - (double)(old))/1024.0/1024.0;
      30             :   return diff;
      31             : }
      32             : #endif
      33             : 
      34             : 
      35          13 : void init_completion_table()
      36             : {
      37          13 :   completion_table_init(&app.ctable);
      38          13 : }
      39             : 
      40             : 
      41           0 : static void assemble_chunk(uint8_t* chunk, struct iovec* iov, size_t count)
      42             : {
      43           0 :   unsigned int idx=0;
      44           0 :   for(unsigned i=0; i < count; i++) {
      45           0 :     for (unsigned int b=0; b < iov[i].iov_len; ++b){
      46           0 :       chunk[idx] = ((uint8_t*)(iov[i].iov_base))[b];
      47           0 :       ++idx;
      48             :     }
      49             :   }
      50             : }
      51             : 
      52             : 
      53           0 : static void check_daq_chunk_l1id(uint64_t fid, struct elink_entry* e, struct iovec* iov, size_t count, size_t total_size)
      54             : {
      55           0 :   uint8_t* chunk = malloc(total_size);
      56           0 :   assemble_chunk(chunk, iov, count);
      57           0 :   int jump = check_sequence_daq(fid, &e->last_xl1id, chunk, total_size);
      58           0 :   if(jump){ e->counters.l1id_jump++; }
      59           0 :   free(chunk);
      60           0 : }
      61             : 
      62             : 
      63     2667836 : void unbuffered_advance_read_ptr()
      64             : {
      65     2667836 :   uint64_t current_rd_ptr = (uint64_t)flx_get_read_ptr(&app.flx, &app.buffer);
      66     2667836 :   uint32_t rd = completion_table_get_rd(app.ctable);
      67     2667836 :   if(rd == current_rd_ptr || rd == UINT32_MAX){
      68             :     //without the next line if data arrives after application start
      69             :     //the rd ptr is never advanced. It looks like felix hw interrupts are not fired.
      70           0 :     flx_set_read_ptr(&app.flx, &app.buffer, (void*)(current_rd_ptr));
      71           0 :     return;
      72             :   } else {
      73     2667836 :     uint64_t new_rd_ptr = app.buffer.vaddr + rd;
      74     2667836 :     if(new_rd_ptr < current_rd_ptr) {
      75             : #if defined FELIX_TOHOST_DEBUG_RD
      76             :       double diff = difference_MB(new_rd_ptr, current_rd_ptr);
      77             :       uint32_t prev = get_previous_rd(app.ctable);
      78             :       LOG_INFO("Backwards of %f MB from 0x%08lx to 0x%08lx last rd from table 0x%04x", diff, current_rd_ptr-app.buffer.vaddr, new_rd_ptr-app.buffer.vaddr, prev);
      79             : #endif
      80        1133 :       flx_set_read_ptr(&app.flx, &app.buffer, (void*)(app.buffer.vaddr+app.buffer.size));
      81             :     }
      82     2667836 :     flx_set_read_ptr(&app.flx, &app.buffer, (void*)new_rd_ptr);
      83             :   }
      84             : #if defined FELIX_TOHOST_DEBUG_RD
      85             :   is_bad();
      86             : #endif
      87             : }
      88             : 
      89             : 
      90    81567839 : int on_unbuffered_chunk(struct elink_entry* entry, uint32_t block_addr, struct iovec* iov, size_t count, uint8_t header, unsigned flags)
      91             : {
      92    81567839 :   entry->counters.received_chunks++;
      93             : 
      94    81567839 :   size_t size = 0;
      95   426403748 :   for(unsigned i=0; i<count; i++) {
      96   344835909 :     size += iov[i].iov_len; 
      97             :   }
      98             : 
      99    81567839 :   uint64_t fid = entry->fid;
     100    81567839 :   struct stream *stream = &entry->stream[0];
     101    81567839 :   if(entry->has_streams && size > 1) {
     102    26065554 :     uint8_t streamid = ((uint8_t*)iov[0].iov_base)[0]; // FELIX header is added later in FM
     103    26065554 :     fid = fid | streamid;
     104    26065554 :     stream = &entry->stream[streamid];
     105             :   }
     106             : 
     107    81567839 :   if (entry->type == DCS) {
     108             :     //rate limit
     109           0 :     if (app.dcs_rate_limit){
     110           0 :       if (entry->is_throttled){
     111           0 :         if(entry->received_chunk_rate < 0.9*app.dcs_rate_limit){
     112           0 :           entry->is_throttled = 0;
     113           0 :           LOG_WARN("Resuming data flow of DCS fid 0x%lx: recorded rate %f kHz", fid, entry->received_chunk_rate);
     114             :         } else {
     115           0 :           ++entry->counters.dropped;
     116           0 :           return NETIO_STATUS_OK;
     117             :         }
     118             :       } else {
     119           0 :         if(entry->received_chunk_rate > app.dcs_rate_limit){
     120           0 :           entry->is_throttled = 1;
     121           0 :           LOG_WARN("Blocking messages from DCS fid 0x%lx: recorded rate %f kHz", fid, entry->received_chunk_rate);
     122           0 :           ++entry->counters.dropped;
     123           0 :           return NETIO_STATUS_OK;
     124             :         }
     125             :       }
     126             :     }
     127             :     //size limit
     128           0 :     if(app.dcs_size_limit){
     129           0 :       if(size == 0 || size > app.dcs_size_limit){
     130           0 :         ++entry->counters.dropped;
     131           0 :         return NETIO_STATUS_OK;
     132             :       }
     133             :     }
     134             :   }
     135    81567839 :   else if(app.l1id_check > 1 && entry->type == DAQ){
     136           0 :     if(stream->again == 0){
     137           0 :       check_daq_chunk_l1id(fid, entry, iov, count, size);
     138             :     }
     139             :   }
     140             : 
     141    81567839 :   unsigned netio_flags = 0;
     142    81567839 :   if(stream->again == 0) { //OK
     143    80106004 :     if(app.zero_copy_readout == 1){
     144    80106004 :       completion_table_push(app.ctable, block_addr);
     145             :     }
     146    80106004 :     stream->key = ( (((fid >> 16) & 0xffff) << 32) | block_addr );
     147     1461835 :   } else if (stream->again == 1) { //AGAIN
     148     1461835 :     stream->key = ( (((fid >> 16) & 0xffff) << 32) | block_addr );
     149             :   } else { //PARTIAL
     150             :     netio_flags |= NETIO_REENTRY;
     151             :   }
     152             : 
     153    81567839 :   struct netio_unbuffered_publish_socket* pub_socket =
     154             :     (struct netio_unbuffered_publish_socket*)entry->pub_socket;
     155             : 
     156    81567839 :   int ret = netio_unbuffered_publishv_usr(pub_socket,
     157             :                                       fid,
     158             :                                       iov,
     159             :                                       count,          // iov count
     160             :                                       &stream->key,    // key
     161             :                                       netio_flags,    // flags
     162             :                                       &stream->subscription_cache,  // subscription cache
     163             :                                       header,
     164             :                                       1 //header size: 1 byte
     165             :                                       );
     166             : 
     167    81567839 :   if(ret == NETIO_STATUS_OK || ret == NETIO_STATUS_OK_NOSUB) {
     168             :     // Message sent
     169    80106004 :     stream->again = 0;
     170    80106004 :     entry->counters.processed_chunks++;
     171    80106004 :     entry->counters.total_chunksize += size;
     172    80106004 :     if(size > entry->counters.largest_chunksize){entry->counters.largest_chunksize = size;}
     173    80106004 :     app.statistics.counters.chunks_processed++;
     174    80106004 :     return 0;
     175             :   } 
     176             :   
     177             :   else if(ret == NETIO_STATUS_AGAIN) {
     178             :     // No data were sent, we need to redo the whole call
     179     1461835 :     entry->counters.received_chunks--;
     180     1461835 :     stream->again = 1;
     181     1461835 :     return 1;
     182             :   }
     183             :   
     184             :   else if(ret == NETIO_STATUS_PARTIAL) { 
     185             :     // Some data were sent, we need to redo the call but set NETIO_REENTRY
     186           0 :     entry->counters.received_chunks--;
     187           0 :     stream->again = 2;
     188           0 :     return 1;
     189             :   }
     190             :   
     191             :   else if (ret == NETIO_ERROR_MAX_IOV_EXCEEDED){
     192             :     // Message too large, discarded.
     193           0 :     stream->again = 0;
     194           0 :     LOG_DBG("Message too large, IOV count %lu. Discarded.", count);
     195           0 :     if(app.zero_copy_readout == 1){
     196           0 :       completion_table_update(app.ctable, block_addr);
     197             :     }
     198           0 :     return 0;
     199             :   }
     200             :   
     201             :   else { 
     202             :     //NETIO_STATUS_ERROR message discarded.
     203           0 :     stream->again = 0;
     204           0 :     LOG_DBG("Netio error, message discarded");
     205           0 :     if(app.zero_copy_readout == 1){
     206           0 :       completion_table_update(app.ctable, block_addr);
     207             :     }
     208           0 :     return 0;
     209             :   }
     210             : }
     211             : 
     212             : 
     213    80106004 : void on_unbuffered_msg_published(struct netio_unbuffered_publish_socket* socket, uint64_t key)
     214             : {
     215    80106004 :   uint32_t offset = key & 0xFFFFFFFF;
     216    80106004 :   completion_table_update(app.ctable, offset);
     217    80106004 : }

Generated by: LCOV version 1.0