Program Listing for File tohost_reader.hpp
↰ Return to documentation for file (tohost_reader.hpp)
#ifndef TOHOST_READER_H_
#define TOHOST_READER_H_
#include <cassert>
#include <cstdint>
#include "device.hpp"
#include "tohost_buffer.hpp"
#include "decoder.hpp"
#include "publisher.hpp"
#include "elink.hpp"
#include "log.hpp"
/**
 * @brief Reader that consumes blocks from a ToHost DMA buffer, decodes them
 *        per e-link and publishes the result through a Publisher.
 *
 * On construction the reader registers itself with the buffer, creates one
 * Decoder per served e-link and installs read() as a publisher callback
 * (periodic or asynchronous, depending on the poll period).
 *
 * @tparam BUF Buffer type. The reader calls reader_register(),
 *             reader_get_available_blocks(), reader_advance_read_ptr(),
 *             is_stopped(), get_device(), get_elinks_of_type() and the
 *             dma_get_*() accessors on it.
 *
 * @warning The callbacks installed by the constructor capture `this`, and the
 *          defaulted move constructor does not re-register them. After a move
 *          the publisher callbacks still point at the moved-from object, so
 *          keep instances at a stable address once they are in use.
 */
template <class BUF>
class ToHostReader {
public:
    /**
     * @brief Build a reader that serves an explicit list of e-links.
     * @param buffer                Shared buffer the blocks are read from.
     * @param elinks                E-links served by this reader; one Decoder is created per entry.
     * @param publisher             Publisher that receives the decoded data; ownership is taken.
     * @param buffer_poll_period_us If > 0, read() is installed as a periodic callback with this
     *                              period; otherwise it is installed as an asynchronous callback.
     * @param zerocopy              Passed to the buffer at registration; also selects how the
     *                              number of sent bytes is computed in read().
     * @param l0id_ck               Forwarded unchanged to every Decoder.
     */
    ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
        std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);

    /**
     * @brief Build a reader that serves every e-link of the given type known to the buffer.
     *
     * The e-link list is obtained with buffer->get_elinks_of_type(elinks_type); the remaining
     * parameters have the same meaning as in the list-based constructor.
     */
    ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t elinks_type,
        std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);

    ToHostReader(const ToHostReader&) = delete;
    ToHostReader& operator=(const ToHostReader&) = delete;

    // See the class-level warning before relying on move construction.
    ToHostReader(ToHostReader&&) noexcept = default;

    // The const member m_id makes a defaulted move assignment implicitly
    // deleted; state that explicitly instead of hiding it behind "= default".
    ToHostReader& operator=(ToHostReader&&) noexcept = delete;

    /**
     * @brief Process the blocks currently available to this reader.
     * @return true if blocks were available but not all of them could be consumed,
     *         false otherwise (including when the buffer is stopped).
     */
    bool read();

    /// @return Total number of blocks consumed by read() so far.
    [[nodiscard]] size_t get_read_blocks() const {
        return m_read_blocks;
    }

    /// Ask the publisher to fire its asynchronous callback.
    void fire_publisher_async_signal() {
        m_publisher->fire_asynch_callback();
    }

    //monitor info

    /// @return One statistics increment per served e-link, in decoder order.
    std::vector<ToHostElinkStats> get_reader_elink_stats();

    /// @return The publisher's resource counter.
    uint32_t get_network_resource_counter();

    /// @return The publisher's subscription number.
    uint32_t get_network_subscriptions();

    /// @return Type of the first served e-link, or NONE_ELINK_TYPE if none is served.
    elink_type_t get_elink_type() const {return m_elink_type;}

    /// @return The publisher's count of resource-available calls.
    uint64_t get_network_resource_available_calls();

    /// @return Identifier returned by the buffer when this reader registered.
    uint32_t get_id() const {
        return m_id;
    }

private:
    const uint32_t m_id;                          ///< Reader id assigned by the buffer.
    std::shared_ptr<BUF> m_buffer;                ///< Buffer the blocks are read from.
    std::vector<uint16_t> m_lid_2_fid;            ///< Local e-link id -> index into m_elink_processors (0xffff: not served).
    std::vector<Decoder> m_elink_processors;      ///< One decoder per served e-link.
    std::unique_ptr<Publisher> m_publisher;       ///< Publisher used by the decoders and for callbacks.
    size_t m_read_blocks;                         ///< Blocks consumed so far.
    size_t m_skipped_blocks;                      ///< Not used by the code visible in this header.
    uint32_t m_last_rd; //for zc
    bool m_zerocopy;                              ///< Selects completion-table based accounting in read().
    bool m_running;                               ///< Set to true at construction.
    size_t m_block_size;                          ///< Block size reported by the device.
    elink_type_t m_elink_type;                    ///< Type of the first served e-link.
    std::vector<ToHostElinkStats> m_elink_stats;  // "Last" readout, to compute increment

    /**
     * @brief Compute the number of bytes to report as sent in zero-copy mode.
     * @param span_front Pointer to the first block of the span handed to read().
     * @param read_bytes Number of bytes consumed in the current read() call.
     */
    uint32_t get_sent_bytes(Block* span_front, uint32_t read_bytes);
};
/**
 * @brief Build a reader that serves an explicit list of e-links.
 *
 * Registers with the buffer, creates one Decoder and one statistics record per
 * e-link, declares the e-links to the publisher and installs read() as the
 * publisher callback.
 *
 * @param buffer                Shared buffer the blocks are read from.
 * @param elinks                E-links served by this reader.
 * @param publisher             Publisher receiving decoded data; ownership is taken.
 * @param buffer_poll_period_us If > 0, read() runs as a periodic callback with this period;
 *                              otherwise it is installed as an asynchronous callback.
 * @param zerocopy              Passed to reader_register() and stored for read().
 * @param l0id_ck               Forwarded unchanged to every Decoder.
 */
template <class BUF>
ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
    std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
        m_id(buffer->reader_register(zerocopy)),
        m_buffer(buffer),
        m_lid_2_fid(0xfff, 0xffff),   // 0xfff entries, all marked "not served" (0xffff)
        m_publisher{std::move(publisher)},
        m_read_blocks{0},
        m_skipped_blocks{0},          // previously left uninitialised
        m_last_rd{0},
        m_zerocopy{zerocopy},
        m_running{true},
        m_block_size{m_buffer->get_device()->get_block_size()},
        m_elink_type{elinks.empty() ? NONE_ELINK_TYPE : elinks.begin()->type}
{
    m_elink_processors.reserve(elinks.size());
    m_elink_stats.reserve(elinks.size());

    // The data format is a property of the device, not of the e-link: query it once.
    flx_tohost_format fmt = buffer->get_device()->get_tohost_data_format();

    size_t pos = 0;
    for (const auto & e : elinks) {
        assert(e.lid < 0xfff);
        // The assert disappears in NDEBUG builds; never write outside the table.
        // An unmapped id is reported by read() when its blocks show up.
        if (static_cast<size_t>(e.lid) < m_lid_2_fid.size()) {
            m_lid_2_fid[e.lid] = static_cast<uint16_t>(pos);
        } else {
            LOG_ERR("E-Link ID 0x%x is outside the lookup table, its blocks will not be decoded", static_cast<unsigned>(e.lid));
        }
        // pos always advances so decoder and statistics indices stay aligned with `elinks`.
        ++pos;
        m_elink_processors.emplace_back(e, *m_publisher, fmt, l0id_ck, m_block_size, buffer->dma_get_vaddr());
        m_elink_stats.emplace_back(e.fid);
    }
    m_publisher->declare(elinks);

    //polling mode / interrupt-driven
    if (buffer_poll_period_us > 0){
        m_publisher->set_periodic_callback(buffer_poll_period_us, [this]{return read();});
    }
    else {
        m_publisher->set_asynch_callback([this]{return read();});
    }
}
/**
 * @brief Build a reader that serves all e-links of one type.
 *
 * Delegates to the list-based constructor, using the list returned by
 * buffer->get_elinks_of_type(elinks_type). All other arguments are passed
 * through unchanged.
 */
template <class BUF>
ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer,
                                elink_type_t elinks_type,
                                std::unique_ptr<Publisher> publisher,
                                int buffer_poll_period_us,
                                bool zerocopy,
                                int l0id_ck)
    : ToHostReader(buffer,
                   buffer->get_elinks_of_type(elinks_type),
                   std::move(publisher),
                   buffer_poll_period_us,
                   zerocopy,
                   l0id_ck)
{
}
/**
 * @brief Decode and publish the blocks currently available to this reader.
 *
 * Blocks are handled in order. A block whose e-link id lies outside the lookup
 * table is reported and counted as consumed; a block of an e-link that is not
 * served by this reader is counted as consumed without decoding. Processing
 * stops at the first block for which the decoder returns Publisher::AGAIN or
 * Publisher::PARTIAL; that block is not counted. Afterwards the buffer read
 * pointer is advanced by the consumed bytes.
 *
 * @return true if at least one available block was left unconsumed,
 *         false otherwise (also when the buffer is stopped).
 */
template <class BUF>
bool ToHostReader<BUF>::read()
{
    if (m_buffer->is_stopped()) {
        return false;
    }

    std::span<Block> available = m_buffer->reader_get_available_blocks(m_id);
    LOG_TRACE("Reader %u there are %lu blocks to read. Current ptr 0x%lx", m_id, available.size(), m_buffer->dma_get_read_offset());

    uint32_t consumed = 0;
    for (Block& blk : available) {
        local_elink_t lid = blk.elink;

        // Id beyond the lookup table: report it and move on.
        if (lid >= m_lid_2_fid.size()) {
            LOG_ERR("Unexpected E-Link ID = 0x%x ", lid);
            ++consumed;
            continue;
        }

        // 0xffff marks an e-link this reader does not serve.
        const uint16_t slot = m_lid_2_fid[lid];
        if (slot == uint16_t(-1)) {
            ++consumed;
            continue;
        }

        //The e-link processor decodes and publishes the chunks in the block
        const auto status = m_elink_processors[slot].decode(blk);
        if (status == Publisher::AGAIN || status == Publisher::PARTIAL) {
            break;
        }
        ++consumed;
    }

    m_read_blocks += consumed;

    const uint32_t read_bytes = consumed * sizeof(Block);
    uint32_t sent_bytes = read_bytes;
    if (m_zerocopy) {
        sent_bytes = get_sent_bytes(available.data(), read_bytes);
    }
    m_buffer->reader_advance_read_ptr(m_id, read_bytes, sent_bytes);

    // consumed never exceeds available.size(), so a mismatch implies a non-empty span.
    return consumed != available.size();
}
/**
 * @brief Compute the number of bytes to report as sent in zero-copy mode.
 *
 * With an empty completion table, everything that was read counts as sent and
 * m_last_rd is set to (address of span_front + read_bytes) modulo the DMA
 * size. Otherwise the result is the distance from m_last_rd to the completion
 * table's rd value, wrapping at the DMA size, and m_last_rd takes that value.
 *
 * NOTE(review): the empty-table branch reduces a pointer value modulo the DMA
 * size; presumably this yields a buffer offset comparable with get_rd() —
 * confirm against the buffer and completion-table implementations.
 *
 * @param span_front Pointer to the first block of the span processed by read().
 * @param read_bytes Number of bytes consumed in the current read() call.
 * @return Number of bytes to report as sent.
 */
template <class BUF>
uint32_t ToHostReader<BUF>::get_sent_bytes(Block* span_front, uint32_t read_bytes)
{
    if (m_publisher->get_completion_table()->is_empty()) {
        const uint64_t span_addr = reinterpret_cast<uint64_t>(span_front);
        m_last_rd = (span_addr + read_bytes) % m_buffer->dma_get_size();
        return read_bytes;
    }

    const uint32_t completed = m_publisher->get_completion_table()->get_rd();
    const uint32_t previous = m_last_rd;

    uint32_t delta = 0;
    if (completed < previous) {
        // The completion pointer wrapped around the end of the DMA buffer.
        delta = (m_buffer->dma_get_size() - previous) + completed;
    } else {
        delta = completed - previous;
    }

    m_last_rd = completed;
    return delta;
}
/**
 * @brief Collect the statistics increment of every decoder.
 *
 * For each decoder, get_decoder_stats_increment() is called with the matching
 * entry of m_elink_stats (the "last" readout).
 *
 * @return One ToHostElinkStats per decoder, in decoder order.
 */
template <class BUF>
std::vector<ToHostElinkStats> ToHostReader<BUF>::get_reader_elink_stats()
{
    const size_t n_links = m_elink_processors.size();
    std::vector<ToHostElinkStats> increments(n_links);

    size_t link = 0;
    for (auto& decoder : m_elink_processors) {
        increments.at(link) = {decoder.get_decoder_stats_increment(m_elink_stats.at(link))};
        ++link;
    }
    return increments;
}
/// @return The resource counter reported by the publisher.
template <class BUF>
uint32_t ToHostReader<BUF>::get_network_resource_counter()
{
    const uint32_t counter = m_publisher->get_resource_counter();
    return counter;
}
/// @return The subscription number reported by the publisher.
template <class BUF>
uint32_t ToHostReader<BUF>::get_network_subscriptions()
{
    const uint32_t subscriptions = m_publisher->get_subscription_number();
    return subscriptions;
}
/// @return The number of resource-available calls reported by the publisher.
template <class BUF>
uint64_t ToHostReader<BUF>::get_network_resource_available_calls()
{
    const uint64_t calls = m_publisher->get_resource_available_calls();
    return calls;
}
#endif /* TOHOST_READER_H_ */