Line data Source code
1 : #include "unbuffered.h"
2 : #include "l1id_decoder.h"
3 :
4 : //#define FELIX_TOHOST_DEBUG_RD
5 :
6 : #if defined FELIX_TOHOST_DEBUG_RD
7 : int wasbad = 0;
8 :
9 : static int is_bad()
10 : {
11 : uint64_t write_ptr = (uint64_t)flx_get_write_ptr(&app.flx, &app.buffer);
12 : uint64_t read_ptr = (uint64_t)flx_get_read_ptr(&app.flx, &app.buffer);
13 : int even = flx_dma_cmp_even_bits(&app.flx);
14 : if((read_ptr < write_ptr && even == 0) || (read_ptr > write_ptr && even == 1)){
15 : if (wasbad == 0){
16 : LOG_ERR("Invalid read ptr configuration: read 0x%lx write 0x%lx parity %d", read_ptr, write_ptr, even);
17 : }
18 : wasbad = 1;
19 : return 1;
20 : } else {
21 : wasbad = 0;
22 : return 0;
23 : }
24 : }
25 :
26 :
/*
 * Signed difference between two byte positions, expressed in mebibytes.
 * A negative result means `new` is behind `old`.
 */
static double difference_MB(uint64_t new, uint64_t old)
{
    const double bytes_per_mib = 1024.0 * 1024.0;
    return ((double)new - (double)old) / bytes_per_mib;
}
32 : #endif
33 :
34 :
// Initialize the global completion table (app.ctable) that tracks which
// DMA buffer blocks are still referenced by in-flight published messages.
// Must be called before the first chunk is processed.
void init_completion_table()
{
    completion_table_init(&app.ctable);
}
39 :
40 :
/*
 * Copy a scattered chunk (described by `count` iovec fragments) into the
 * contiguous buffer `chunk`. The caller guarantees `chunk` is at least as
 * large as the sum of all fragment lengths.
 *
 * Improvements over the original: uses memcpy per fragment instead of a
 * byte-by-byte loop, and tracks the output offset with size_t (the old
 * `unsigned int` index could overflow if the total size exceeded UINT_MAX).
 */
static void assemble_chunk(uint8_t* chunk, struct iovec* iov, size_t count)
{
    size_t offset = 0;
    for (size_t i = 0; i < count; i++) {
        // Skip empty fragments: memcpy with a NULL base and length 0 is UB.
        if (iov[i].iov_len > 0) {
            memcpy(chunk + offset, iov[i].iov_base, iov[i].iov_len);
            offset += iov[i].iov_len;
        }
    }
}
51 :
52 :
53 0 : static void check_daq_chunk_l1id(uint64_t fid, struct elink_entry* e, struct iovec* iov, size_t count, size_t total_size)
54 : {
55 0 : uint8_t* chunk = malloc(total_size);
56 0 : assemble_chunk(chunk, iov, count);
57 0 : int jump = check_sequence_daq(fid, &e->last_xl1id, chunk, total_size);
58 0 : if(jump){ e->counters.l1id_jump++; }
59 0 : free(chunk);
60 0 : }
61 :
62 :
// Advance the FELIX to-host DMA read pointer to the oldest buffer block
// that is still referenced, as reported by the completion table. Called
// periodically so the hardware regains space in the circular buffer.
void unbuffered_advance_read_ptr()
{
    uint64_t current_rd_ptr = (uint64_t)flx_get_read_ptr(&app.flx, &app.buffer);
    // rd is a byte offset into the buffer; UINT32_MAX flags "nothing consumed yet".
    uint32_t rd = completion_table_get_rd(app.ctable);
    // NOTE(review): `rd` (32-bit buffer offset) is compared against
    // `current_rd_ptr` (64-bit virtual address) — these can only be equal in
    // unusual address layouts; confirm this comparison is intentional.
    if(rd == current_rd_ptr || rd == UINT32_MAX){
        //without the next line if data arrives after application start
        //the rd ptr is never advanced. It looks like felix hw interrupts are not fired.
        flx_set_read_ptr(&app.flx, &app.buffer, (void*)(current_rd_ptr));
        return;
    } else {
        // Translate the table offset to an absolute address in the DMA buffer.
        uint64_t new_rd_ptr = app.buffer.vaddr + rd;
        if(new_rd_ptr < current_rd_ptr) {
#if defined FELIX_TOHOST_DEBUG_RD
            double diff = difference_MB(new_rd_ptr, current_rd_ptr);
            uint32_t prev = get_previous_rd(app.ctable);
            LOG_INFO("Backwards of %f MB from 0x%08lx to 0x%08lx last rd from table 0x%04x", diff, current_rd_ptr-app.buffer.vaddr, new_rd_ptr-app.buffer.vaddr, prev);
#endif
            // Wrap-around: first park the pointer at the end of the buffer so
            // the hardware's parity/ordering state stays consistent before the
            // pointer jumps back toward the start (see is_bad() in debug builds).
            flx_set_read_ptr(&app.flx, &app.buffer, (void*)(app.buffer.vaddr+app.buffer.size));
        }
        flx_set_read_ptr(&app.flx, &app.buffer, (void*)new_rd_ptr);
    }
#if defined FELIX_TOHOST_DEBUG_RD
    is_bad();
#endif
}
88 :
89 :
// Process one chunk received from the to-host DMA buffer for e-link `entry`
// and publish it over the unbuffered netio socket.
//
// Parameters:
//   entry      - per-e-link state (counters, stream table, publish socket).
//   block_addr - offset of the DMA buffer block the chunk lives in; pushed
//                into the completion table so the block is only released
//                after all subscribers have consumed the message.
//   iov/count  - scattered fragments that make up the chunk payload.
//   header     - 1-byte netio header prepended at publish time.
//   flags      - currently unused here.
//
// Returns 0 when the chunk was consumed (sent or deliberately discarded),
// 1 when the caller must retry the same chunk (netio back-pressure).
int on_unbuffered_chunk(struct elink_entry* entry, uint32_t block_addr, struct iovec* iov, size_t count, uint8_t header, unsigned flags)
{
    entry->counters.received_chunks++;

    // Total payload size across all fragments.
    size_t size = 0;
    for(unsigned i=0; i<count; i++) {
        size += iov[i].iov_len;
    }

    uint64_t fid = entry->fid;
    struct stream *stream = &entry->stream[0];
    if(entry->has_streams && size > 1) {
        // First payload byte selects the stream; it is OR-ed into the fid.
        // NOTE(review): assumes iov[0].iov_len >= 1 whenever size > 1 — confirm
        // with the producer that fragment 0 is never empty.
        uint8_t streamid = ((uint8_t*)iov[0].iov_base)[0]; // FELIX header is added later in FM
        fid = fid | streamid;
        stream = &entry->stream[streamid];
    }

    if (entry->type == DCS) {
        //rate limit
        if (app.dcs_rate_limit){
            if (entry->is_throttled){
                // Hysteresis: resume only once the rate drops below 90% of the limit.
                if(entry->received_chunk_rate < 0.9*app.dcs_rate_limit){
                    entry->is_throttled = 0;
                    LOG_WARN("Resuming data flow of DCS fid 0x%lx: recorded rate %f kHz", fid, entry->received_chunk_rate);
                } else {
                    ++entry->counters.dropped;
                    return NETIO_STATUS_OK;
                }
            } else {
                if(entry->received_chunk_rate > app.dcs_rate_limit){
                    entry->is_throttled = 1;
                    LOG_WARN("Blocking messages from DCS fid 0x%lx: recorded rate %f kHz", fid, entry->received_chunk_rate);
                    ++entry->counters.dropped;
                    return NETIO_STATUS_OK;
                }
            }
        }
        //size limit
        if(app.dcs_size_limit){
            // Drop empty chunks and chunks above the configured DCS size cap.
            if(size == 0 || size > app.dcs_size_limit){
                ++entry->counters.dropped;
                return NETIO_STATUS_OK;
            }
        }
    }
    else if(app.l1id_check > 1 && entry->type == DAQ){
        // Only check the L1ID sequence on the first attempt for this chunk
        // (again != 0 means we are retrying a previously seen chunk).
        if(stream->again == 0){
            check_daq_chunk_l1id(fid, entry, iov, count, size);
        }
    }

    // Retry state machine: again == 0 fresh chunk, 1 full retry (AGAIN),
    // 2 partial retry (PARTIAL -> publish with NETIO_REENTRY).
    unsigned netio_flags = 0;
    if(stream->again == 0) { //OK
        if(app.zero_copy_readout == 1){
            // Register the block so it is only freed after delivery completes
            // (see on_unbuffered_msg_published).
            completion_table_push(app.ctable, block_addr);
        }
        stream->key = ( (((fid >> 16) & 0xffff) << 32) | block_addr );
    } else if (stream->again == 1) { //AGAIN
        // Same key as the first attempt: upper 16 bits of (fid>>16) + block offset.
        stream->key = ( (((fid >> 16) & 0xffff) << 32) | block_addr );
    } else { //PARTIAL
        netio_flags |= NETIO_REENTRY;
    }

    struct netio_unbuffered_publish_socket* pub_socket =
        (struct netio_unbuffered_publish_socket*)entry->pub_socket;

    int ret = netio_unbuffered_publishv_usr(pub_socket,
                                            fid,
                                            iov,
                                            count, // iov count
                                            &stream->key, // key
                                            netio_flags, // flags
                                            &stream->subscription_cache, // subscription cache
                                            header,
                                            1 //header size: 1 byte
                                            );

    if(ret == NETIO_STATUS_OK || ret == NETIO_STATUS_OK_NOSUB) {
        // Message sent
        stream->again = 0;
        entry->counters.processed_chunks++;
        entry->counters.total_chunksize += size;
        if(size > entry->counters.largest_chunksize){entry->counters.largest_chunksize = size;}
        app.statistics.counters.chunks_processed++;
        return 0;
    }

    else if(ret == NETIO_STATUS_AGAIN) {
        // No data were sent, we need to redo the whole call
        // (received_chunks is decremented so the retry does not double-count).
        entry->counters.received_chunks--;
        stream->again = 1;
        return 1;
    }

    else if(ret == NETIO_STATUS_PARTIAL) {
        // Some data were sent, we need to redo the call but set NETIO_REENTRY
        entry->counters.received_chunks--;
        stream->again = 2;
        return 1;
    }

    else if (ret == NETIO_ERROR_MAX_IOV_EXCEEDED){
        // Message too large, discarded.
        stream->again = 0;
        LOG_DBG("Message too large, IOV count %lu. Discarded.", count);
        if(app.zero_copy_readout == 1){
            // Release the block immediately: it will never be published.
            completion_table_update(app.ctable, block_addr);
        }
        return 0;
    }

    else {
        //NETIO_STATUS_ERROR message discarded.
        stream->again = 0;
        LOG_DBG("Netio error, message discarded");
        if(app.zero_copy_readout == 1){
            completion_table_update(app.ctable, block_addr);
        }
        return 0;
    }
}
211 :
212 :
213 80106004 : void on_unbuffered_msg_published(struct netio_unbuffered_publish_socket* socket, uint64_t key)
214 : {
215 80106004 : uint32_t offset = key & 0xFFFFFFFF;
216 80106004 : completion_table_update(app.ctable, offset);
217 80106004 : }
|