Line data Source code
1 : #ifndef TOHOST_READER_H_
2 : #define TOHOST_READER_H_
3 :
4 : #include <cassert>
5 : #include <cstdint>
6 : #include "device.hpp"
7 : #include "tohost_buffer.hpp"
8 : #include "decoder.hpp"
9 : #include "publisher.hpp"
10 : #include "elink.hpp"
11 : #include "log.hpp"
12 :
13 : /**
14 : * A ToHost DMA buffer can be read by multiple ToHostReaders.
15 : * ToHostReader is configured to read data from a set of elinks, data from
16 : * other elinks is ignored. ToHostReader includes a number of Decoders equal to
17 : * the number of elinks and one publisher.
18 : * The publisher runs an eventloop thread, therefore each ToHostReaders runs on
19 : * a dedicate thread.
20 : */
21 : template <class BUF>
22 : class ToHostReader {
23 :
24 : public:
25 : /**
26 : * @brief ToHostReader constructor.
27 : * @param buffer DMA buffer to be read.
28 : * @param elinks vector of elinks whose blocks are to be decoded by this reader.
29 : * @param publisher network publisher.
30 : * @param buffer_poll_period_us polling period in microseconds.
31 : * @param zerocopy if zero-copy readout (instead of buffered) is used.
32 : * @param l0id_ck flag to enable the L0ID sequentiality check.
33 : */
34 : ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
35 : std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);
36 :
37 : /**
38 : * @brief ToHostReader constructor.
39 : * @param buffer DMA buffer to be read.
40 : * @param elinks_type type of elinks whose blocks are to be decoded by this reader.
41 : * @param publisher network publisher.
42 : * @param buffer_poll_period_us polling period in microseconds.
43 : * @param zerocopy if zero-copy readout (instead of buffered) is used.
44 : * @param l0id_ck flag to enable the L0ID sequentiality check.
45 : */
46 : ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t elinks_type,
47 : std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);
48 :
49 : ToHostReader(const ToHostReader&) = delete;
50 : ToHostReader& operator=(const ToHostReader&) = delete;
51 :
52 : ToHostReader(ToHostReader&&) noexcept = default;
53 : ToHostReader& operator=(ToHostReader&&) noexcept = default;
54 :
55 : /**
56 : * @brief read the DMA buffer: decode blocks and publish chunks.
57 : * @return true if not all blocks available at the beginning of the call were decoded.
58 : */
59 : bool read();
60 :
61 : /**
62 : * @return the number of blocks read by this reader.
63 : */
64 : [[nodiscard]] size_t get_read_blocks() const {
65 : return m_read_blocks;
66 : }
67 :
68 : /**
69 : * @brief fire the signal to invoke the configured callback.
70 : */
71 0 : void fire_publisher_async_signal() {
72 0 : m_publisher->fire_asynch_callback();
73 : }
74 :
75 : //monitor info
76 : /**
77 : * \defgroup Retrieval of monitoring information
78 : * @{
79 : */
80 : /**
81 : * @return per-elink monitoring information.
82 : */
83 : std::vector<ToHostElinkStats> get_reader_elink_stats();
84 :
85 : /**
86 : * @return publisher network resource counter (e.g. number of available buffers or completion objects).
87 : */
88 : uint32_t get_network_resource_counter();
89 :
90 : /**
91 : * @return Number of subscriptions to the publisher.
92 : */
93 : uint32_t get_network_subscriptions();
94 :
95 : /**
96 : * @return Number of subscriptions to the publisher.
97 : */
98 188 : elink_type_t get_elink_type() {return m_elink_type;}
99 :
100 : /**
101 : * @return number of callbacks invoked when network resources (e.g. buffers)
102 : * become available after having been exhausted.
103 : */
104 : uint64_t get_network_resource_available_calls();
105 :
106 : /**
107 : * @return id of the reader
108 : */
109 188 : uint32_t get_id() const {
110 188 : return m_id;
111 : }
112 :
113 : /**
114 : * @}
115 : */
116 :
117 : private:
118 : const uint32_t m_id;
119 : std::shared_ptr<BUF> m_buffer;
120 : std::vector<uint16_t> m_lid_2_fid;
121 : std::vector<Decoder> m_elink_processors;
122 : std::unique_ptr<Publisher> m_publisher;
123 : size_t m_read_blocks;
124 : size_t m_skipped_blocks;
125 : uint32_t m_last_rd; //for zc
126 : bool m_zerocopy;
127 : bool m_running;
128 : size_t m_block_size;
129 : elink_type_t m_elink_type;
130 : std::vector<ToHostElinkStats> m_elink_stats; // "Last" readout, to compute increment
131 :
132 : uint32_t get_sent_bytes(Block* span_front, uint32_t read_bytes);
133 : };
134 :
135 :
136 : template <class BUF>
137 24 : ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
138 : std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
139 24 : m_id(buffer->reader_register(zerocopy)),
140 24 : m_buffer(buffer),
141 24 : m_lid_2_fid(0xfff, 0xffff),
142 24 : m_publisher{std::move(publisher)},
143 24 : m_read_blocks{0},
144 24 : m_last_rd{0},
145 24 : m_zerocopy{zerocopy},
146 24 : m_running{true},
147 72 : m_block_size{m_buffer->get_device()->get_block_size()},
148 48 : m_elink_type{elinks.empty() ? NONE_ELINK_TYPE : elinks.begin()->type}
149 : {
150 24 : m_elink_processors.reserve(elinks.size());
151 24 : size_t pos = 0;
152 60 : for (const auto & e : elinks) {
153 : assert(e.lid < 0xfff);
154 36 : m_lid_2_fid[e.lid] = pos++;
155 72 : flx_tohost_format fmt = buffer->get_device()->get_tohost_data_format();
156 36 : m_elink_processors.emplace_back(e, *m_publisher, fmt, l0id_ck, m_block_size, buffer->dma_get_vaddr());
157 36 : m_elink_stats.emplace_back(e.fid);
158 : }
159 24 : m_publisher->declare(elinks);
160 :
161 : //polling mode / interrupt-driven
162 24 : if (buffer_poll_period_us > 0){
163 6940738 : m_publisher->set_periodic_callback(buffer_poll_period_us, [this]{return read();});
164 : }
165 : else {
166 0 : m_publisher->set_asynch_callback([this]{return read();});
167 : }
168 24 : }
169 :
170 : template <class BUF>
171 : ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t type,
172 : std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
173 : ToHostReader(buffer, buffer->get_elinks_of_type(type),
174 : std::move(publisher), buffer_poll_period_us, zerocopy, l0id_ck)
175 : {}
176 :
177 : template <class BUF>
178 6940698 : bool ToHostReader<BUF>::read()
179 : {
180 6940698 : if (m_buffer->is_stopped()){return false;}
181 6884189 : std::span<Block> blocks = m_buffer->reader_get_available_blocks(m_id);
182 : LOG_TRACE("Reader %u there are %lu blocks to read. Current ptr 0x%lx", m_id, blocks.size(), m_buffer->dma_get_read_offset());
183 :
184 6885607 : uint32_t read_blocks(0);
185 666419725 : for (Block& block : blocks) {
186 666132576 : local_elink_t lid = block.elink;
187 666132576 : if (m_lid_2_fid.size() <= lid) {
188 0 : LOG_ERR("Unexpected E-Link ID = 0x%x ", lid);
189 0 : ++read_blocks;
190 0 : continue;
191 : }
192 666132576 : if (m_lid_2_fid[lid] == uint16_t(-1)) {
193 54216598 : ++read_blocks;
194 54216598 : continue;
195 : }
196 611915978 : Decoder & eproc = m_elink_processors[m_lid_2_fid[lid]];
197 : //The e-link processor decodes and publishes the chunks in the block
198 611915978 : auto result = eproc.decode(block);
199 611916150 : if (result == Publisher::AGAIN or result == Publisher::PARTIAL) {
200 : break;
201 : }
202 605317520 : ++read_blocks;
203 : }
204 :
205 6885779 : m_read_blocks += read_blocks;
206 :
207 6885779 : uint32_t read_bytes = read_blocks * sizeof(Block);
208 6885779 : uint32_t sent_bytes = m_zerocopy ? get_sent_bytes(blocks.data(), read_bytes) : read_bytes;
209 :
210 6885779 : m_buffer->reader_advance_read_ptr(m_id, read_bytes, sent_bytes);
211 6885806 : return (!blocks.empty() and read_blocks != blocks.size());
212 : }
213 :
214 :
215 : template <class BUF>
216 6457570 : uint32_t ToHostReader<BUF>::get_sent_bytes(Block* span_front, uint32_t read_bytes)
217 : {
218 6457570 : if ( m_publisher->get_completion_table()->is_empty() ) {
219 36654 : uint64_t first_block = reinterpret_cast<uint64_t>(span_front);
220 36654 : m_last_rd = (first_block + read_bytes) % m_buffer->dma_get_size();
221 36654 : return read_bytes;
222 : }
223 :
224 6420916 : uint32_t sent_bytes(0);
225 6420916 : uint32_t ct_ptr = m_publisher->get_completion_table()->get_rd();
226 6420916 : if (ct_ptr >= m_last_rd) {
227 6420555 : sent_bytes = ct_ptr - m_last_rd;
228 : } else {
229 361 : sent_bytes = (m_buffer->dma_get_size() - m_last_rd) + ct_ptr;
230 : }
231 6420916 : m_last_rd = ct_ptr;
232 6420916 : return sent_bytes;
233 : }
234 :
235 :
236 : template <class BUF>
237 188 : std::vector<ToHostElinkStats> ToHostReader<BUF>::get_reader_elink_stats()
238 : {
239 188 : std::vector<ToHostElinkStats> data(m_elink_processors.size());
240 489 : for(size_t idx = 0; idx < m_elink_processors.size(); ++idx){
241 602 : data.at(idx) = {m_elink_processors.at(idx).get_decoder_stats_increment(m_elink_stats.at(idx))};
242 : }
243 188 : return data;
244 0 : }
245 :
246 :
247 : template <class BUF>
248 188 : uint32_t ToHostReader<BUF>::get_network_resource_counter()
249 : {
250 188 : return m_publisher->get_resource_counter();
251 : }
252 :
253 :
254 : template <class BUF>
255 188 : uint32_t ToHostReader<BUF>::get_network_subscriptions()
256 : {
257 188 : return m_publisher->get_subscription_number();
258 : }
259 :
260 :
261 : template <class BUF>
262 188 : uint64_t ToHostReader<BUF>::get_network_resource_available_calls()
263 : {
264 188 : return m_publisher->get_resource_available_calls();
265 : }
266 :
267 :
268 : #endif /* TOHOST_READER_H_ */
|