Line data Source code
1 : #include "buffered.h" 2 : #include "l1id_decoder.h" 3 : 4 838293491 : int on_buffered_chunk(struct elink_entry* entry, void* data, size_t len, unsigned flags) 5 : { 6 838293491 : entry->counters.received_chunks++; 7 : 8 838293491 : uint64_t fid = entry->fid; 9 838293491 : struct stream * stream = &entry->stream[0]; 10 838293491 : if(entry->has_streams && len > 1) { 11 0 : uint8_t streamid = ((uint8_t*)data)[1]; // 2nd byte is streamid, 1st byte is FELIX header 12 0 : fid = fid | streamid; 13 0 : stream = &entry->stream[streamid]; 14 : } 15 : 16 838293491 : if (entry->type == DCS) { 17 : //rate limit 18 0 : if (app.dcs_rate_limit){ 19 0 : if (entry->is_throttled){ 20 0 : if(entry->received_chunk_rate < 0.9*app.dcs_rate_limit){ 21 0 : entry->is_throttled = 0; 22 0 : LOG_WARN("Resuming data flow of DCS fid 0x%lx: recorded rate %f kHz", fid, 23 : entry->received_chunk_rate); 24 : } 25 : else { 26 0 : ++entry->counters.dropped; 27 0 : return NETIO_STATUS_OK; 28 : } 29 : } else { 30 0 : if(entry->received_chunk_rate > app.dcs_rate_limit){ 31 0 : entry->is_throttled = 1; 32 0 : LOG_WARN("Blocking messages from DCS fid 0x%lx: recorded rate %f kHz", fid, 33 : entry->received_chunk_rate); 34 0 : ++entry->counters.dropped; 35 0 : return NETIO_STATUS_OK; 36 : } 37 : } 38 : } 39 : //size limit 40 0 : if(app.dcs_size_limit){ 41 0 : if((len - 1) == 0 || (len - 1) > app.dcs_size_limit){ 42 : // Length has already the status byte added which has to be removed here for the check 43 0 : ++entry->counters.dropped; 44 0 : return NETIO_STATUS_OK; 45 : } 46 : } 47 : } 48 : 49 838293491 : else if(app.l1id_check > 0){ 50 0 : uint8_t* chunk = (uint8_t*)data+1; //first byte is status 51 0 : if(stream->again == 1){ 52 0 : entry->last_xl1id.l1id.fields.ec -= 1; 53 : } 54 0 : int jump = 0; 55 0 : if (entry->type == DAQ) { 56 0 : jump = check_sequence_daq(fid, &entry->last_xl1id, chunk, len-1); 57 : } 58 0 : else if (entry->type == TTC) { 59 0 : jump = check_sequence_ttc2h(fid, &entry->last_xl1id, chunk, len-1); 60 : } 61 0 : if(jump){ entry->counters.l1id_jump++; } 62 : } 63 : 64 1676586982 : int ret = netio_buffered_publish((struct netio_publish_socket*)entry->pub_socket, 65 838293491 : fid, data, len, stream->again, &stream->subscription_cache); 66 : 67 : //printf("Buffered chunk to be sent: "); 68 : //for(unsigned int b=0; b<len; ++b){printf("%02X ", ((uint8_t*)data)[b]);} printf("\n"); 69 : //fflush(stdout); 70 : 71 838293491 : if(ret == NETIO_STATUS_OK || ret == NETIO_STATUS_OK_NOSUB) { 72 837864810 : entry->counters.processed_chunks++; 73 837864810 : entry->counters.total_chunksize += len; 74 837864810 : stream->again = 0; 75 837864810 : if(len > entry->counters.largest_chunksize){entry->counters.largest_chunksize = len;} 76 837864810 : app.statistics.counters.chunks_processed++; 77 428681 : } else if (ret == NETIO_STATUS_AGAIN) { 78 428681 : entry->counters.received_chunks--; 79 428681 : stream->again = 1; 80 428681 : return 1; 81 0 : } else if (ret == NETIO_STATUS_TOO_BIG) { 82 0 : stream->again = 0; 83 0 : if(!(app.statistics.counters.chunks_discarded++ % 1000) ){ 84 0 : LOG_WARN("%lu chunks discarded. Message from fid 0x%lx has size %lu larger than netio page size.", 85 : app.statistics.counters.chunks_discarded, fid, len); 86 : } 87 : } else { 88 0 : LOG_ERR("netio_buffered_publish error %d(%s)", ret, fi_strerror(-ret)); 89 0 : stream->again = 0; 90 : } 91 : return 0; 92 : } 93 : 94 : 95 6639786 : void flush_ttc2h_buffer(struct elink_entry* entry, uint32_t block_addr) 96 : { 97 6639786 : netio_buffered_publish_flush(entry->pub_socket, entry->fid, &entry->stream[0].subscription_cache); 98 6639786 : if(app.zero_copy_readout){ 99 6638342 : completion_table_push(app.ctable, block_addr); 100 6638342 : completion_table_update(app.ctable, block_addr); 101 : } 102 6639786 : }