LCOV - code coverage report
Current view: top level - src - tohost_reader.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 67 74 90.5 %
Date: 2025-09-09 12:09:29 Functions: 7 7 100.0 %

          Line data    Source code
       1             : #ifndef TOHOST_READER_H_
       2             : #define TOHOST_READER_H_
       3             : 
       4             : #include <cassert>
       5             : #include <cstdint>
       6             : #include "device.hpp"
       7             : #include "tohost_buffer.hpp"
       8             : #include "decoder.hpp"
       9             : #include "publisher.hpp"
      10             : #include "elink.hpp"
      11             : #include "log.hpp"
      12             : 
      13             : /**
      14             :  * A ToHost DMA buffer can be read by multiple ToHostReaders.
      15             :  * ToHostReader is configured to read data from a set of elinks, data from
      16             :  * other elinks is ignored. ToHostReader includes a number of Decoders equal to
      17             :  * the number of elinks and one publisher.
      18             :  * The publisher runs an eventloop thread, therefore each ToHostReaders runs on
      19             :  * a dedicate thread.
      20             :  */
      21             : template <class BUF>
      22             : class ToHostReader {
      23             : 
      24             :     public:
      25             :         /**
      26             :          * @brief ToHostReader constructor.
      27             :          * @param buffer DMA buffer to be read.
      28             :          * @param elinks vector of elinks whose blocks are to be decoded by this reader.
      29             :          * @param publisher network publisher.
      30             :          * @param buffer_poll_period_us polling period in microseconds.
      31             :          * @param zerocopy if zero-copy readout (instead of buffered) is used.
      32             :          * @param l0id_ck flag to enable the L0ID sequentiality check.
      33             :          */
      34             :         ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
      35             :             std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);
      36             : 
      37             :         /**
      38             :          * @brief ToHostReader constructor.
      39             :          * @param buffer DMA buffer to be read.
      40             :          * @param elinks_type type of elinks whose blocks are to be decoded by this reader.
      41             :          * @param publisher network publisher.
      42             :          * @param buffer_poll_period_us polling period in microseconds.
      43             :          * @param zerocopy if zero-copy readout (instead of buffered) is used.
      44             :          * @param l0id_ck flag to enable the L0ID sequentiality check.
      45             :          */
      46             :         ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t elinks_type,
      47             :             std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);
      48             : 
      49             :         ToHostReader(const ToHostReader&) = delete;
      50             :         ToHostReader& operator=(const ToHostReader&) = delete;
      51             : 
      52             :         ToHostReader(ToHostReader&&) noexcept = default;
      53             :         ToHostReader& operator=(ToHostReader&&) noexcept = default;
      54             : 
      55             :         /**
      56             :          * @brief read the DMA buffer: decode blocks and publish chunks.
      57             :          * @return true if not all blocks available at the beginning of the call were decoded.
      58             :          */
      59             :         bool read();
      60             : 
      61             :         /**
      62             :          * @return the number of blocks read by this reader.
      63             :          */
      64             :         [[nodiscard]] size_t get_read_blocks() const {
      65             :             return m_read_blocks;
      66             :         }
      67             : 
      68             :         /**
      69             :          * @brief fire the signal to invoke the configured callback.
      70             :          */
      71           0 :         void fire_publisher_async_signal() {
      72           0 :             m_publisher->fire_asynch_callback();
      73             :         }
      74             : 
      75             :         //monitor info
      76             :         /**
      77             :          * \defgroup Retrieval of monitoring information
      78             :          * @{
      79             :          */
      80             :         /**
      81             :          * @return per-elink monitoring information.
      82             :          */
      83             :         std::vector<ToHostElinkStats> get_reader_elink_stats();
      84             : 
      85             :         /**
      86             :          * @return publisher network resource counter (e.g. number of available buffers or completion objects).
      87             :          */
      88             :         uint32_t get_network_resource_counter();
      89             : 
      90             :         /**
      91             :          * @return Number of subscriptions to the publisher.
      92             :          */
      93             :         uint32_t get_network_subscriptions();
      94             : 
      95             :         /**
      96             :          * @return Number of subscriptions to the publisher.
      97             :          */
      98         188 :         elink_type_t get_elink_type() {return m_elink_type;}
      99             : 
     100             :         /**
     101             :          * @return number of callbacks invoked when network resources (e.g. buffers)
     102             :          * become available after having been exhausted.
     103             :          */
     104             :         uint64_t get_network_resource_available_calls();
     105             : 
     106             :         /**
     107             :          * @return id of the reader
     108             :          */
     109         188 :         uint32_t get_id() const {
     110         188 :             return m_id;
     111             :         }
     112             : 
     113             :         /**
     114             :          * @}
     115             :          */
     116             : 
     117             :     private:
     118             :         const uint32_t m_id;
     119             :         std::shared_ptr<BUF> m_buffer;
     120             :         std::vector<uint16_t> m_lid_2_fid;
     121             :         std::vector<Decoder> m_elink_processors;
     122             :         std::unique_ptr<Publisher> m_publisher;
     123             :         size_t m_read_blocks;
     124             :         size_t m_skipped_blocks;
     125             :         uint32_t m_last_rd; //for zc
     126             :         bool m_zerocopy;
     127             :         bool m_running;
     128             :         size_t m_block_size;
     129             :         elink_type_t m_elink_type;
     130             :         std::vector<ToHostElinkStats> m_elink_stats; // "Last" readout, to compute increment
     131             : 
     132             :         uint32_t get_sent_bytes(Block* span_front, uint32_t read_bytes);
     133             : };
     134             : 
     135             : 
     136             : template <class BUF>
     137          24 : ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
     138             :     std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
     139          24 :         m_id(buffer->reader_register(zerocopy)),
     140          24 :         m_buffer(buffer),
     141          24 :         m_lid_2_fid(0xfff, 0xffff),
     142          24 :         m_publisher{std::move(publisher)},
     143          24 :         m_read_blocks{0},
     144          24 :         m_last_rd{0},
     145          24 :         m_zerocopy{zerocopy},
     146          24 :         m_running{true},
     147          72 :         m_block_size{m_buffer->get_device()->get_block_size()},
     148          48 :         m_elink_type{elinks.empty() ? NONE_ELINK_TYPE : elinks.begin()->type}
     149             : {
     150          24 :     m_elink_processors.reserve(elinks.size());
     151          24 :     size_t pos = 0;
     152          60 :     for (const auto & e : elinks) {
     153             :         assert(e.lid < 0xfff);
     154          36 :         m_lid_2_fid[e.lid] = pos++;
     155          72 :         flx_tohost_format fmt = buffer->get_device()->get_tohost_data_format();
     156          36 :         m_elink_processors.emplace_back(e, *m_publisher, fmt, l0id_ck, m_block_size, buffer->dma_get_vaddr());
     157          36 :         m_elink_stats.emplace_back(e.fid);
     158             :     }
     159          24 :     m_publisher->declare(elinks);
     160             : 
     161             :     //polling mode / interrupt-driven
     162          24 :     if (buffer_poll_period_us > 0){
     163     6940738 :         m_publisher->set_periodic_callback(buffer_poll_period_us, [this]{return read();});
     164             :     }
     165             :     else {
     166           0 :         m_publisher->set_asynch_callback([this]{return read();});
     167             :     }
     168          24 : }
     169             : 
     170             : template <class BUF>
     171             : ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t type,
     172             :     std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
     173             :         ToHostReader(buffer, buffer->get_elinks_of_type(type),
     174             :             std::move(publisher), buffer_poll_period_us, zerocopy, l0id_ck)
     175             : {}
     176             : 
     177             : template <class BUF>
     178     6940698 : bool ToHostReader<BUF>::read()
     179             : {
     180     6940698 :     if (m_buffer->is_stopped()){return false;}
     181     6884189 :     std::span<Block> blocks = m_buffer->reader_get_available_blocks(m_id);
     182             :     LOG_TRACE("Reader %u there are %lu blocks to read. Current ptr 0x%lx", m_id, blocks.size(), m_buffer->dma_get_read_offset());
     183             : 
     184     6885607 :     uint32_t read_blocks(0);
     185   666419725 :     for (Block& block : blocks) {
     186   666132576 :         local_elink_t lid = block.elink;
     187   666132576 :         if (m_lid_2_fid.size() <= lid) {
     188           0 :            LOG_ERR("Unexpected E-Link ID = 0x%x ", lid);
     189           0 :             ++read_blocks;
     190           0 :             continue;
     191             :         }
     192   666132576 :         if (m_lid_2_fid[lid] == uint16_t(-1)) {
     193    54216598 :             ++read_blocks;
     194    54216598 :             continue;
     195             :         }
     196   611915978 :         Decoder & eproc = m_elink_processors[m_lid_2_fid[lid]];
     197             :         //The e-link processor decodes and publishes the chunks in the block
     198   611915978 :         auto result = eproc.decode(block);
     199   611916150 :         if (result == Publisher::AGAIN or result == Publisher::PARTIAL) {
     200             :             break;
     201             :         }
     202   605317520 :         ++read_blocks;
     203             :     }
     204             : 
     205     6885779 :     m_read_blocks += read_blocks;
     206             : 
     207     6885779 :     uint32_t read_bytes = read_blocks * sizeof(Block);
     208     6885779 :     uint32_t sent_bytes = m_zerocopy ? get_sent_bytes(blocks.data(), read_bytes) : read_bytes;
     209             : 
     210     6885779 :     m_buffer->reader_advance_read_ptr(m_id, read_bytes, sent_bytes);
     211     6885806 :     return (!blocks.empty() and read_blocks != blocks.size());
     212             : }
     213             : 
     214             : 
     215             : template <class BUF>
     216     6457570 : uint32_t ToHostReader<BUF>::get_sent_bytes(Block* span_front, uint32_t read_bytes)
     217             : {
     218     6457570 :     if ( m_publisher->get_completion_table()->is_empty() ) {
     219       36654 :         uint64_t first_block = reinterpret_cast<uint64_t>(span_front);
     220       36654 :         m_last_rd = (first_block + read_bytes) % m_buffer->dma_get_size();
     221       36654 :         return read_bytes;
     222             :     }
     223             : 
     224     6420916 :     uint32_t sent_bytes(0);
     225     6420916 :     uint32_t ct_ptr = m_publisher->get_completion_table()->get_rd();
     226     6420916 :     if (ct_ptr >= m_last_rd) {
     227     6420555 :         sent_bytes = ct_ptr - m_last_rd;
     228             :     } else {
     229         361 :         sent_bytes = (m_buffer->dma_get_size() - m_last_rd) + ct_ptr;
     230             :     }
     231     6420916 :     m_last_rd = ct_ptr;
     232     6420916 :     return sent_bytes;
     233             : }
     234             : 
     235             : 
     236             : template <class BUF>
     237         188 : std::vector<ToHostElinkStats> ToHostReader<BUF>::get_reader_elink_stats()
     238             : {
     239         188 :     std::vector<ToHostElinkStats> data(m_elink_processors.size());
     240         489 :     for(size_t idx = 0; idx < m_elink_processors.size(); ++idx){
     241         602 :         data.at(idx) = {m_elink_processors.at(idx).get_decoder_stats_increment(m_elink_stats.at(idx))};
     242             :     }
     243         188 :     return data;
     244           0 : }
     245             : 
     246             : 
     247             : template <class BUF>
     248         188 : uint32_t ToHostReader<BUF>::get_network_resource_counter()
     249             : {
     250         188 :     return m_publisher->get_resource_counter();
     251             : }
     252             : 
     253             : 
     254             : template <class BUF>
     255         188 : uint32_t ToHostReader<BUF>::get_network_subscriptions()
     256             : {
     257         188 :     return m_publisher->get_subscription_number();
     258             : }
     259             : 
     260             : 
     261             : template <class BUF>
     262         188 : uint64_t ToHostReader<BUF>::get_network_resource_available_calls()
     263             : {
     264         188 :     return m_publisher->get_resource_available_calls();
     265             : }
     266             : 
     267             : 
     268             : #endif /* TOHOST_READER_H_ */

Generated by: LCOV version 1.0