.. _program_listing_file_tohost_reader.hpp: Program Listing for File tohost_reader.hpp ========================================== |exhale_lsh| :ref:`Return to documentation for file ` (``tohost_reader.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #ifndef TOHOST_READER_H_ #define TOHOST_READER_H_ #include #include #include "device.hpp" #include "tohost_buffer.hpp" #include "decoder.hpp" #include "publisher.hpp" #include "elink.hpp" #include "log.hpp" template class ToHostReader { public: ToHostReader(std::shared_ptr buffer, const std::vector &elinks, std::unique_ptr publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck); ToHostReader(std::shared_ptr buffer, elink_type_t elinks_type, std::unique_ptr publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck); ToHostReader(const ToHostReader&) = delete; ToHostReader& operator=(const ToHostReader&) = delete; ToHostReader(ToHostReader&&) noexcept = default; ToHostReader& operator=(ToHostReader&&) noexcept = default; bool read(); [[nodiscard]] size_t get_read_blocks() const { return m_read_blocks; } void fire_publisher_async_signal() { m_publisher->fire_asynch_callback(); } //monitor info std::vector get_reader_elink_stats(); uint32_t get_network_resource_counter(); uint32_t get_network_subscriptions(); elink_type_t get_elink_type() {return m_elink_type;} uint64_t get_network_resource_available_calls(); uint32_t get_id() const { return m_id; } private: const uint32_t m_id; std::shared_ptr m_buffer; std::vector m_lid_2_fid; std::vector m_elink_processors; std::unique_ptr m_publisher; size_t m_read_blocks; size_t m_skipped_blocks; uint32_t m_last_rd; //for zc bool m_zerocopy; bool m_running; size_t m_block_size; elink_type_t m_elink_type; std::vector m_elink_stats; // "Last" readout, to compute increment uint32_t get_sent_bytes(Block* span_front, uint32_t read_bytes); }; template ToHostReader::ToHostReader(std::shared_ptr buffer, const std::vector &elinks, std::unique_ptr publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) : m_id(buffer->reader_register(zerocopy)), m_buffer(buffer), m_lid_2_fid(0xfff, 0xffff), m_publisher{std::move(publisher)}, m_read_blocks{0}, m_last_rd{0}, m_zerocopy{zerocopy}, m_running{true}, m_block_size{m_buffer->get_device()->get_block_size()}, m_elink_type{elinks.empty() ? NONE_ELINK_TYPE : elinks.begin()->type} { m_elink_processors.reserve(elinks.size()); size_t pos = 0; for (const auto & e : elinks) { assert(e.lid < 0xfff); m_lid_2_fid[e.lid] = pos++; flx_tohost_format fmt = buffer->get_device()->get_tohost_data_format(); m_elink_processors.emplace_back(e, *m_publisher, fmt, l0id_ck, m_block_size, buffer->dma_get_vaddr()); m_elink_stats.emplace_back(e.fid); } m_publisher->declare(elinks); //polling mode / interrupt-driven if (buffer_poll_period_us > 0){ m_publisher->set_periodic_callback(buffer_poll_period_us, [this]{return read();}); } else { m_publisher->set_asynch_callback([this]{return read();}); } } template ToHostReader::ToHostReader(std::shared_ptr buffer, elink_type_t type, std::unique_ptr publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) : ToHostReader(buffer, buffer->get_elinks_of_type(type), std::move(publisher), buffer_poll_period_us, zerocopy, l0id_ck) {} template bool ToHostReader::read() { if (m_buffer->is_stopped()){return false;} std::span blocks = m_buffer->reader_get_available_blocks(m_id); LOG_TRACE("Reader %u there are %lu blocks to read. Current ptr 0x%lx", m_id, blocks.size(), m_buffer->dma_get_read_offset()); uint32_t read_blocks(0); for (Block& block : blocks) { local_elink_t lid = block.elink; if (m_lid_2_fid.size() <= lid) { LOG_ERR("Unexpected E-Link ID = 0x%x ", lid); ++read_blocks; continue; } if (m_lid_2_fid[lid] == uint16_t(-1)) { ++read_blocks; continue; } Decoder & eproc = m_elink_processors[m_lid_2_fid[lid]]; //The e-link processor decodes and publishes the chunks in the block auto result = eproc.decode(block); if (result == Publisher::AGAIN or result == Publisher::PARTIAL) { break; } ++read_blocks; } m_read_blocks += read_blocks; uint32_t read_bytes = read_blocks * sizeof(Block); uint32_t sent_bytes = m_zerocopy ? get_sent_bytes(blocks.data(), read_bytes) : read_bytes; m_buffer->reader_advance_read_ptr(m_id, read_bytes, sent_bytes); return (!blocks.empty() and read_blocks != blocks.size()); } template uint32_t ToHostReader::get_sent_bytes(Block* span_front, uint32_t read_bytes) { if ( m_publisher->get_completion_table()->is_empty() ) { uint64_t first_block = reinterpret_cast(span_front); m_last_rd = (first_block + read_bytes) % m_buffer->dma_get_size(); return read_bytes; } uint32_t sent_bytes(0); uint32_t ct_ptr = m_publisher->get_completion_table()->get_rd(); if (ct_ptr >= m_last_rd) { sent_bytes = ct_ptr - m_last_rd; } else { sent_bytes = (m_buffer->dma_get_size() - m_last_rd) + ct_ptr; } m_last_rd = ct_ptr; return sent_bytes; } template std::vector ToHostReader::get_reader_elink_stats() { std::vector data(m_elink_processors.size()); for(size_t idx = 0; idx < m_elink_processors.size(); ++idx){ data.at(idx) = {m_elink_processors.at(idx).get_decoder_stats_increment(m_elink_stats.at(idx))}; } return data; } template uint32_t ToHostReader::get_network_resource_counter() { return m_publisher->get_resource_counter(); } template uint32_t ToHostReader::get_network_subscriptions() { return m_publisher->get_subscription_number(); } template uint64_t ToHostReader::get_network_resource_available_calls() { return m_publisher->get_resource_available_calls(); } #endif /* TOHOST_READER_H_ */