Program Listing for File tohost.hpp

Return to documentation for file (tohost.hpp)

#include <atomic>
#include <csignal>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include "device.hpp"
#include "flxcard/FlxCard.h"
#include "config/config.hpp"
#include "tohost_reader.hpp"
#include "network/netio_buffered_publisher.hpp"
#include "network/netio_zerocopy_publisher.hpp"
#include "log.hpp"
#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"


template <class CFG, class DEV, class BUF>
class ToHost {
    public:
        explicit ToHost(std::unique_ptr<CFG> config) :
            cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo)
            {m_running = true;};

        void start();

        void print_monitoring();

        void stop();

    public:
        std::unique_ptr<CFG> cfg;
        std::list<std::shared_ptr<DEV>> devices;
        std::map<int, std::shared_ptr<BUF>> dma_buffers;

    private:
        bool m_running;
        std::string m_hostname;
        ToHostMonitor m_mon;
        std::map<int, Bus> bus_map;
        std::map<int, std::list<ToHostReader<BUF>>> m_readers;
        std::vector<std::thread> irq_listeners;
        void make_bus(int dmaid, int unique_dmaid);
};


template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::make_bus(int dmaid, int unique_dmaid)
{
    if(bus_map.find(unique_dmaid) == bus_map.end()){
        bus_map.emplace(std::piecewise_construct,
            std::forward_as_tuple(unique_dmaid),
            std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
            );
        }
}

template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::stop()
{
    m_running = false;
    for (auto& dma : dma_buffers) {
        dma.second->stop();
    }
}

template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::start()
{
    log_init(cfg->verbose);
    LOG_INFO("Number of PCIe endpoints to read %d", cfg->get_number_devices());
    LOG_INFO("Polling period %u us", cfg->poll_time);
    //Open devices, allocate buffers and bus
    auto daq_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
            cfg->network.netio_pages, cfg->network.netio_pagesize,
            cfg->network.netio_watermark, cfg->network.netio_timeout,
            cfg->max_daq_chunk_size), cfg->poll_time, false, cfg->l1id_check);
    };

    auto dcs_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.dcs_ip, port, bus,
            cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize,
            cfg->network.dcs_netio_watermark, cfg->network.dcs_netio_timeout,
            cfg->max_dcs_chunk_size), cfg->poll_time, false, 0);
    };

    auto ttc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
            cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize,
            cfg->network.ttc_netio_watermark, cfg->network.ttc_netio_timeout,
            UINT32_MAX), cfg->poll_time, false, cfg->l1id_check);
    };

    auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
            cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size,
            buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
    };

    auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
            cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size,
            buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
    };

    //for interrupt-driven mode
    auto irq_listener = [&](int udmaid){
        LOG_INFO("Running interrupt listener for unique DMA ID %d", udmaid);
        while(m_running){
            dma_buffers.at(udmaid)->dma_wait_for_data_irq();
            dma_buffers.at(udmaid)->dma_increase_irq_counter();
            for (auto& r :  m_readers.at(udmaid)){
                r.fire_publisher_async_signal();
            }
        }
    };

    int tot_elinks{0};
    //Open devices, allocate buffers and bus
    for (auto & dev : devices){

        int dev_no = dev->get_device_number();
        LOG_DBG("Opening PCIe endpoint %u", dev_no);
        unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids);
        int ret = dev->open_device(lock_mask);
        if(ret != 0 ){
            LOG_ERR("Cannot open device %d.", dev_no);
            throw std::runtime_error("Fatal error: cannot open requested device");
        }

        flx_tohost_format fmt = dev->get_tohost_data_format();
        switch (fmt) {
            case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER:
                LOG_INFO("Device %d uses TOHOST_SUBCHUNK_TRAILER data format", dev_no);
                break;
        case flx_tohost_format::TOHOST_SUBCHUNK_HEADER:
                LOG_INFO("Device %d uses TOHOST_SUBCHUNK_HEADER data format", dev_no);
                break;
        default:
            LOG_ERR("Device %d: ToHost data format %d not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, fmt);
        }


        for(int id : cfg->resource.dma_ids){

            int unique_id = cfg->get_unique_dmaid(id, dev_no);
            auto dma_buf = dma_buffers.at(unique_id);
            make_bus(id, unique_id);

            std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(id);
            LOG_INFO("Allocating DMA buffer %s (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), dev_no, id);
            dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
            dma_buf->dma_start_continuous();

            if (cfg->poll_time == 0){
                dma_buf->irq_data_enable();
            }

            std::vector<Elink> ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC);
            std::vector<Elink> ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC);
            std::vector<Elink> dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS);
            std::vector<Elink> ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end());
            ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end());

            bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty();
            bool has_ttc = not ttc2h_elinks.empty();
            bool has_dcs = not ic_dcs_elinks.empty();
            if ((has_daq and cfg->network.daq_unbuffered)
                    or (has_dcs and cfg->network.dcs_unbuffered)) {
                dma_buf->set_zero_copy_reader();
            }

            //DAQ
            int thread{0};
            for (const std::vector<Elink> & elinks :
                dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) {
                int port = cfg->network.ports.at(unique_id)+1000*thread;
                LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, id, thread, port);
                for( const auto & e : elinks ){
                    LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s, streams %c",
                        tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid),
                        dev->get_elink_type_str(e.type).c_str(), e.has_streams ? 'Y' : 'N');
                    ++tot_elinks;
                }
                if(cfg->network.daq_unbuffered) {
                    LOG_INFO("Enabling zero-copy poublication of DAQ e-links");
                    daq_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.ip, port, elinks);
                }
                else {
                    daq_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, elinks);
                }
                ++thread;
            }

            //TTC2H
            if ( has_ttc ) {
                int port = cfg->network.ttc_ports.at(unique_id);
                LOG_INFO("Device %u, DMA %d: TTC2H elink published on port %d", dev_no, id, port);
                auto & e = ttc2h_elinks[0];
                LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
                    tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                    dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
                ttc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ttc2h_elinks);
                ++tot_elinks;
            }

            if ( has_dcs ) {
                int port = cfg->network.dcs_ports.at(unique_id);
                LOG_INFO("Device %u, DMA %d: DCS elinks published on port %d", dev_no, id, port);
                for( auto & e : ic_dcs_elinks ){
                    LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
                        tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                        dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
                    ++tot_elinks;
                }

                if(cfg->network.dcs_unbuffered) {
                    LOG_INFO("Enabling zero-copy poublication of DCS e-links");
                    dcs_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.dcs_ip, port, ic_dcs_elinks);
                }
                else {
                    dcs_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ic_dcs_elinks);
                }
            }

            //run interrupt-listening threads
            if(cfg->poll_time == 0){
                irq_listeners.push_back(std::thread(irq_listener, std::ref(unique_id)));
            }

        } // DMAs
    } // Devices
}


template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::print_monitoring()
{
    //devices
    std::string ts = Util::ISO8601TimeUTC();
    for (auto& d : devices) {
        unsigned int device = d->get_device_number();
        ToHostDeviceStats s(
            device
        );
        m_mon.append_device_stats(ts, m_hostname, s);
        //buffers
        for (auto& dma : dma_buffers) {
            //Discard DMA buffers of other devices
            unsigned int dma_buf_device = cfg->udmaid_to_deviceid(dma.first);
            if (dma_buf_device != device) {
                continue;
            }
            unsigned int dmaid = cfg->udmaid_to_dmaid(dma.first);
            ToHostDmaStats ds(
                dmaid,
                dma.second->dma_get_free_MB(),
                dma.second->dma_get_irq_counter()
            );
            m_mon.append_dma_stats(ts, m_hostname, device, ds);
            //readers
            if (m_readers.count(dma.first) > 0 ){ //for DMAs with no e-links/readers
                for (auto& r : m_readers.at(dma.first)) {
                    unsigned int reader_id = r.get_id();
                    ToHostReaderStats rs (
                        reader_id,
                        static_cast<int>(r.get_elink_type()),
                        r.get_network_subscriptions(),
                        r.get_network_resource_counter(), //non-thread safe read op
                        r.get_network_resource_available_calls()  //non-thread safe read op
                    );
                    m_mon.append_reader_stats(ts, m_hostname, device, dmaid, rs);
                    //elinks
                    auto elink_mon_info = r.get_reader_elink_stats();
                    for(const auto & e : elink_mon_info) {
                        m_mon.append_elink_stats(ts, m_hostname, device, dmaid, reader_id, e);
                    }
                }
            }
        }
    }
    // m_mon.write_message(); //keep commented for tokenized writes
}