Program Listing for File tohost_reader.hpp

Return to documentation for file (tohost_reader.hpp)

#ifndef TOHOST_READER_H_
#define TOHOST_READER_H_

#include <cassert>
#include <cstdint>
#include "device.hpp"
#include "tohost_buffer.hpp"
#include "decoder.hpp"
#include "publisher.hpp"
#include "elink.hpp"
#include "log.hpp"

/**
 * Reader that takes blocks from a to-host buffer, hands each block to the
 * Decoder of the e-link it belongs to, and then advances the buffer read
 * pointer for this reader.
 *
 * @tparam BUF Buffer type. The member definitions in this file call
 *             reader_register(), reader_get_available_blocks(),
 *             reader_advance_read_ptr(), is_stopped(), get_device(),
 *             get_elinks_of_type(), dma_get_vaddr(), dma_get_size() and
 *             dma_get_read_offset() on it.
 */
template <class BUF>
class ToHostReader {

    public:
        /**
         * Build a reader serving the given list of e-links.
         *
         * @param buffer                 Buffer to read from; the reader registers itself on it.
         * @param elinks                 E-links for which a Decoder is created.
         * @param publisher              Publisher used by the decoders; ownership is taken.
         * @param buffer_poll_period_us  If > 0, read() is installed as a periodic callback with
         *                               this period; otherwise it is installed as an asynchronous callback.
         * @param zerocopy               Selects the zero-copy accounting of sent bytes in read().
         * @param l0id_ck                Forwarded unchanged to every Decoder.
         */
        ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
            std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);

        /**
         * Same as above, but the e-link list is obtained from
         * buffer->get_elinks_of_type(elinks_type).
         */
        ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t elinks_type,
            std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck);

        ToHostReader(const ToHostReader&) = delete;
        ToHostReader& operator=(const ToHostReader&) = delete;

        // NOTE(review): the constructor registers callbacks on the publisher that
        // capture `this`. A moved-to reader owns the same publisher, whose callbacks
        // still point at the moved-from object, so moving a live reader looks unsafe.
        // Also, because m_id is const, the defaulted move assignment is defined as
        // deleted by the language. Declarations are kept as they were for compatibility.
        ToHostReader(ToHostReader&&) noexcept = default;
        ToHostReader& operator=(ToHostReader&&) noexcept = default;

        /**
         * Process the blocks currently available to this reader.
         * @return true if blocks were available and not all of them were consumed.
         */
        bool read();

        /// Total number of blocks consumed by read() so far.
        [[nodiscard]] size_t get_read_blocks() const {
            return m_read_blocks;
        }

        /// Ask the publisher to fire its asynchronous callback.
        void fire_publisher_async_signal() {
            m_publisher->fire_asynch_callback();
        }

        //monitor info
        /// One statistics entry per decoder, as returned by get_decoder_stats_increment().
        std::vector<ToHostElinkStats> get_reader_elink_stats();

        /// Value of Publisher::get_resource_counter().
        uint32_t get_network_resource_counter();

        /// Value of Publisher::get_subscription_number().
        uint32_t get_network_subscriptions();

        /// Type of the first e-link given at construction, or NONE_ELINK_TYPE if the list was empty.
        elink_type_t get_elink_type() const {return m_elink_type;}

        /// Value of Publisher::get_resource_available_calls().
        uint64_t get_network_resource_available_calls();

        /// Identifier returned by the buffer when this reader registered.
        uint32_t get_id() const {
            return m_id;
        }

    private:
        const uint32_t m_id;                      // value returned by buffer->reader_register()
        std::shared_ptr<BUF> m_buffer;
        std::vector<uint16_t> m_lid_2_fid;        // local e-link id -> index into m_elink_processors; 0xffff = no decoder
        std::vector<Decoder> m_elink_processors;  // one Decoder per e-link, in the order given at construction
        std::unique_ptr<Publisher> m_publisher;
        size_t m_read_blocks;
        size_t m_skipped_blocks{0};               // not updated by the code in this file; initialised so it is never indeterminate
        uint32_t m_last_rd; //for zc
        bool m_zerocopy;
        bool m_running;                           // set to true at construction; not read by the code in this file
        size_t m_block_size;
        elink_type_t m_elink_type;
        std::vector<ToHostElinkStats> m_elink_stats; // "Last" readout, to compute increment

        /**
         * Number of bytes to report as sent for the current read() call
         * (used only in zero-copy mode).
         */
        uint32_t get_sent_bytes(Block* span_front, uint32_t read_bytes);
};


/**
 * Construct a reader for an explicit list of e-links.
 *
 * Registers the reader on the buffer, creates one Decoder and one statistics
 * entry per e-link, declares the e-links to the publisher and installs read()
 * as the publisher callback.
 *
 * @param buffer                 Buffer to read from.
 * @param elinks                 E-links to decode; every lid must be below 0xfff.
 * @param publisher              Publisher handed to the decoders; ownership is taken.
 * @param buffer_poll_period_us  > 0: read() runs as a periodic callback with this period;
 *                               otherwise read() is installed as the asynchronous callback.
 * @param zerocopy               Passed to reader_register() and stored for read().
 * @param l0id_ck                Forwarded unchanged to every Decoder.
 * @throws std::out_of_range if an e-link lid does not fit the lookup table.
 */
template <class BUF>
ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks,
    std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
        m_id(buffer->reader_register(zerocopy)),
        m_buffer(buffer),
        m_lid_2_fid(0xfff, 0xffff),   // 0xfff entries, all set to the "no decoder" marker 0xffff
        m_publisher{std::move(publisher)},
        m_read_blocks{0},
        m_skipped_blocks{0},          // was left uninitialised before
        m_last_rd{0},
        m_zerocopy{zerocopy},
        m_running{true},
        m_block_size{m_buffer->get_device()->get_block_size()},
        m_elink_type{elinks.empty() ? NONE_ELINK_TYPE : elinks.begin()->type}
{
    // Reserve up front so both vectors grow exactly once.
    m_elink_processors.reserve(elinks.size());
    m_elink_stats.reserve(elinks.size());

    size_t pos = 0;
    for (const auto & e : elinks) {
        assert(e.lid < 0xfff);
        // at() keeps builds with assertions disabled from writing past the table.
        m_lid_2_fid.at(e.lid) = static_cast<uint16_t>(pos++);
        flx_tohost_format fmt = buffer->get_device()->get_tohost_data_format();
        m_elink_processors.emplace_back(e, *m_publisher, fmt, l0id_ck, m_block_size, buffer->dma_get_vaddr());
        m_elink_stats.emplace_back(e.fid);
    }
    m_publisher->declare(elinks);

    //polling mode / interrupt-driven
    // Both callbacks capture `this`: the reader must stay at this address for
    // as long as the publisher may invoke them.
    if (buffer_poll_period_us > 0){
        m_publisher->set_periodic_callback(buffer_poll_period_us, [this]{return read();});
    }
    else {
        m_publisher->set_asynch_callback([this]{return read();});
    }
}

/**
 * Construct a reader for all e-links of one type.
 *
 * Delegates to the e-link-list constructor, using the list returned by
 * buffer->get_elinks_of_type(type); all other arguments are passed through.
 */
template <class BUF>
ToHostReader<BUF>::ToHostReader(std::shared_ptr<BUF> buffer, elink_type_t type,
    std::unique_ptr<Publisher> publisher, int buffer_poll_period_us, bool zerocopy, int l0id_ck) :
        ToHostReader(buffer, buffer->get_elinks_of_type(type),
            std::move(publisher), buffer_poll_period_us, zerocopy, l0id_ck)
{}

/**
 * Consume the blocks currently available to this reader.
 *
 * Blocks whose e-link id is outside the lookup table, or has no decoder
 * assigned, are counted as read without being decoded. Processing stops at
 * the first block for which the decoder reports Publisher::AGAIN or
 * Publisher::PARTIAL; that block is not counted. The buffer read pointer is
 * then advanced by the number of bytes consumed.
 *
 * @return false if the buffer is stopped; otherwise true when blocks were
 *         available and at least one of them was left unconsumed.
 */
template <class BUF>
bool ToHostReader<BUF>::read()
{
    if (m_buffer->is_stopped()) {
        return false;
    }

    std::span<Block> blocks = m_buffer->reader_get_available_blocks(m_id);
    LOG_TRACE("Reader %u there are %lu blocks to read. Current ptr 0x%lx", m_id, blocks.size(), m_buffer->dma_get_read_offset());

    const uint16_t no_decoder = uint16_t(-1);
    uint32_t consumed = 0;
    for (Block& blk : blocks) {
        local_elink_t lid = blk.elink;
        if (lid < m_lid_2_fid.size()) {
            const uint16_t slot = m_lid_2_fid[lid];
            if (slot != no_decoder) {
                //The e-link processor decodes and publishes the chunks in the block
                auto status = m_elink_processors[slot].decode(blk);
                if (status == Publisher::AGAIN || status == Publisher::PARTIAL) {
                    // Leave this block and everything after it for a later call.
                    break;
                }
            }
        } else {
            LOG_ERR("Unexpected E-Link ID = 0x%x ", lid);
        }
        // Reached for skipped blocks and for fully decoded blocks alike.
        ++consumed;
    }

    m_read_blocks += consumed;

    uint32_t read_bytes = consumed * sizeof(Block);
    uint32_t sent_bytes = read_bytes;
    if (m_zerocopy) {
        sent_bytes = get_sent_bytes(blocks.data(), read_bytes);
    }

    m_buffer->reader_advance_read_ptr(m_id, read_bytes, sent_bytes);
    return !(blocks.empty() || consumed == blocks.size());
}


/**
 * Compute the number of bytes to report as sent (zero-copy mode).
 *
 * If the publisher's completion table is empty, every byte just read is
 * reported as sent and m_last_rd is set to the position following the read
 * data, modulo the DMA buffer size. Otherwise the result is the distance from
 * m_last_rd to the completion table's read position, accounting for
 * wrap-around at the end of the DMA buffer, and m_last_rd is moved to that
 * position.
 *
 * @param span_front  Pointer to the first block returned to read().
 * @param read_bytes  Number of bytes consumed by read().
 * @return Number of bytes sent.
 */
template <class BUF>
uint32_t ToHostReader<BUF>::get_sent_bytes(Block* span_front, uint32_t read_bytes)
{
    if (not m_publisher->get_completion_table()->is_empty()) {
        const uint32_t completed_rd = m_publisher->get_completion_table()->get_rd();
        const bool wrapped = completed_rd < m_last_rd;
        const uint32_t sent = wrapped
            ? static_cast<uint32_t>((m_buffer->dma_get_size() - m_last_rd) + completed_rd)
            : completed_rd - m_last_rd;
        m_last_rd = completed_rd;
        return sent;
    }

    // Nothing pending in the completion table: all read data counts as sent.
    const uint64_t span_addr = reinterpret_cast<uint64_t>(span_front);
    m_last_rd = (span_addr + read_bytes) % m_buffer->dma_get_size();
    return read_bytes;
}


/**
 * Collect one statistics entry per decoder.
 *
 * Each entry is built from Decoder::get_decoder_stats_increment(), which is
 * given the matching element of m_elink_stats.
 *
 * @return Vector with as many entries as there are decoders, in decoder order.
 */
template <class BUF>
std::vector<ToHostElinkStats> ToHostReader<BUF>::get_reader_elink_stats()
{
    std::vector<ToHostElinkStats> increments(m_elink_processors.size());
    size_t link = 0;
    for (ToHostElinkStats& slot : increments) {
        slot = {m_elink_processors.at(link).get_decoder_stats_increment(m_elink_stats.at(link))};
        ++link;
    }
    return increments;
}


/**
 * @return The value of Publisher::get_resource_counter() for this reader's publisher.
 */
template <class BUF>
uint32_t ToHostReader<BUF>::get_network_resource_counter()
{
    const uint32_t resource_counter = m_publisher->get_resource_counter();
    return resource_counter;
}


/**
 * @return The value of Publisher::get_subscription_number() for this reader's publisher.
 */
template <class BUF>
uint32_t ToHostReader<BUF>::get_network_subscriptions()
{
    const uint32_t subscriptions = m_publisher->get_subscription_number();
    return subscriptions;
}


/**
 * @return The value of Publisher::get_resource_available_calls() for this reader's publisher.
 */
template <class BUF>
uint64_t ToHostReader<BUF>::get_network_resource_available_calls()
{
    const uint64_t available_calls = m_publisher->get_resource_available_calls();
    return available_calls;
}


#endif /* TOHOST_READER_H_ */