Program Listing for File tohost.hpp

Return to documentation for file (tohost.hpp)

#include <cstdint>
#include <list>
#include <map>
#include <csignal>
#include <format>
#include <memory>

#include "ers/ers.h"
#include "device.hpp"
#include "felixbus/FelixBusElinkInfo.hpp"
#include "flxcard/FlxCard.h"
#include "config/config.hpp"
#include "tohost_reader.hpp"

#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"
#include "monitoring/fifo_writer.hpp"
#include "monitoring/prometheus_writer.hpp"
#include "monitoring/webis_writer.hpp"

ERS_DECLARE_ISSUE(felix_log, tohost_issue, issue_message, ((const std::string&)issue_message))


template <class CFG, class DEV, class BUF>
class ToHost {
    public:
        explicit ToHost(std::unique_ptr<CFG> config) :
            cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(std::move(create_monitoring_writers(cfg->stats.local_monitoring_types)))
            {
                m_running = true;
            };

        void start();

        void print_monitoring();

        void stop();

    public:
        std::unique_ptr<CFG> cfg;
        std::list<std::shared_ptr<DEV>> devices;
        std::map<int, std::shared_ptr<BUF>> dma_buffers;

    private:
        bool m_running{};
        std::string m_hostname{};
        std::vector<ToHostMonitor> m_mon;
        std::map<int, Bus> bus_map;
        std::map<int, std::list<ToHostReader<BUF>>> m_readers;
        std::vector<std::thread> irq_listeners;
        void make_bus(int dmaid, int unique_dmaid);

        [[nodiscard]] std::vector<ToHostMonitor> create_monitoring_writers(const std::vector<MonitoringType>& writer_types);

         [[nodiscard]] std::unique_ptr<Writer> add_monitoring_writer(const MonitoringType& writer_type);
};


template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::make_bus(int dmaid, int unique_dmaid)
{
    if(bus_map.find(unique_dmaid) == bus_map.end()){
        bus_map.try_emplace(unique_dmaid, cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus);
    }
}

template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::stop()
{
    m_running = false;
    for (auto& dma : dma_buffers) {
        dma.second->stop();
    }
}

template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::start()
{
    ERS_INFO(std::format("Number of PCIe endpoints to read {}", cfg->get_number_devices()));
    ERS_INFO(std::format("Polling period {} us", cfg->poll_time));
    //Open devices, allocate buffers and bus
    auto daq_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> & elinks, int thread_id, std::string pub_info) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.daq_network_mode, cfg->network.ip, port,
            cfg->network.netio_pages, cfg->network.netio_pagesize,
            cfg->network.netio_timeout, cfg->max_daq_chunk_size, pub_info,
            std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, cfg->network.daq_send_blocks, false, cfg->l1id_check, thread_id);
    };

    auto dcs_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> & elinks, std::string pub_info) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.dcs_network_mode, cfg->network.ip, port,
            cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize,
            cfg->network.dcs_netio_timeout, cfg->max_dcs_chunk_size, pub_info,
            std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, cfg->network.dcs_send_blocks, false, 0);
    };

    auto ttc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> & elinks, std::string pub_info) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.daq_network_mode, cfg->network.ip, port,
            cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize,
            cfg->network.ttc_netio_timeout, UINT32_MAX, pub_info,
            std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, cfg->network.ttc_send_blocks, false, cfg->l1id_check);
    };

    // For zero-copy readers set send blocks to false as these options are incompatible and will fail
    auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, const std::string& ip, uint16_t port, const std::vector<Elink> & elinks, int thread_id, std::string pub_info) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.daq_network_mode, ip, port,
            cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size,
            buf->dma_get_vaddr(), buf->dma_get_size(), pub_info, std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, false, true, cfg->l1id_check, thread_id);
    };

    auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, const std::string& ip, uint16_t port, const std::vector<Elink> & elinks, std::string pub_info) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.dcs_network_mode, ip, port,
            cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size,
            buf->dma_get_vaddr(), buf->dma_get_size(), pub_info, std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, false, true, cfg->l1id_check);
    };

    //for interrupt-driven mode
    auto irq_listener = [&](int udmaid){
        ERS_INFO(std::format("Running interrupt listener for unique DMA ID {}", udmaid));
        while(m_running){
            dma_buffers.at(udmaid)->dma_wait_for_data_irq();
            dma_buffers.at(udmaid)->dma_increase_irq_counter();
            for (auto& r :  m_readers.at(udmaid)){
                r.fire_publisher_async_signal();
            }
        }
    };

    int tot_elinks{0};
    //Open devices, allocate buffers and bus
    for (auto & dev : devices){

        int dev_no = dev->get_device_number();
        ERS_INFO(std::format("Opening PCIe endpoint {}", dev_no));
        unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids);
        int ret = dev->open_device(lock_mask);
        if(ret != 0 ){
            ers::error(felix_log::tohost_issue(ERS_HERE, std::format("ToHost - cannot open device {} ", dev_no)));
            throw std::runtime_error("Fatal error: cannot open requested device");
        }

        flx_tohost_format fmt = dev->get_tohost_data_format();
        switch (fmt) {
            case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER:
                ERS_INFO(std::format("Device {} uses TOHOST_SUBCHUNK_TRAILER data format", dev_no));
                break;
        case flx_tohost_format::TOHOST_SUBCHUNK_HEADER:
                ERS_INFO(std::format("Device {} uses TOHOST_SUBCHUNK_HEADER data format", dev_no));
                break;
        default:
            ers::error(felix_log::tohost_issue(ERS_HERE, std::format("ToHost - Device {}: data format {} not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, static_cast<int>(fmt))));
        }


        for(int dmaid : cfg->resource.dma_ids){

            int unique_id = cfg->get_unique_dmaid(dmaid, dev_no);
            auto dma_buf = dma_buffers.at(unique_id);
            make_bus(dmaid, unique_id);

            std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
            ERS_INFO(std::format("Allocating DMA buffer {} (PCIe endpoint {}, DMAID {})", dma_name, dev_no, dmaid));
            dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
            dma_buf->dma_start_continuous();

            if (cfg->poll_time == 0){
                dma_buf->irq_data_enable();
            }

            std::vector<Elink> ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC);
            std::vector<Elink> ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC);
            std::vector<Elink> dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS);
            std::vector<Elink> ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end());
            ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end());

            bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty();
            bool has_ttc = not ttc2h_elinks.empty();
            bool has_dcs = not ic_dcs_elinks.empty();
            if ((has_daq and cfg->network.daq_unbuffered)
                    or (has_dcs and cfg->network.dcs_unbuffered)) {
                dma_buf->set_zero_copy_reader();
            }

            if (dma_buf->has_zero_copy_reader() and ((has_daq and has_ttc) or (has_daq and has_dcs) or (has_dcs and has_ttc))) {
                throw felix_log::tohost_issue(std::format("Zero-copy/unbuffered mode requires exactly one publisher type. "
                    "DMA buffer {} has multiple types enabled (DAQ: {}, TTC: {}, DCS: {})",
                    dmaid, has_daq, has_ttc, has_dcs));
            }

            // Collect elinks to publish on bus
            std::vector<felixbus::FelixBusElinkInfo> bus_info{};

            //DAQ
            int thread{0};
            for (const std::vector<Elink> & elinks :
                dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) {


                for (const auto& e : elinks) {
                    ERS_INFO(std::format("E-link {}:\t{:#4x}\t{:#18x}\t(channel {} egroup {} epath {}), streams {}",
                     tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                     dev->get_epath(e.lid), (e.has_streams ? 'Y' : 'N')));
                    ++tot_elinks;
                }

                const auto base_port = cfg->network.ports.at(unique_id);
                const auto computed_port = base_port == 0 ? base_port : base_port + 1000u * thread;

                if (computed_port > std::numeric_limits<uint16_t>::max()) {
                    ers::warning(felix_log::tohost_issue(std::format("Port {} number overflow! Starting port is too big", computed_port)));
                }
                const uint16_t port = static_cast<uint16_t>(computed_port);

                // Used to create an event loop with this information. Needed to trace back the publisher thread
                // The same is valid for all the pub_info that appear below.
                const std::string daq_pub_info = std::format("d{}D{}t{}DAQ", dev_no, dmaid, thread);
                if (cfg->network.daq_unbuffered) {
                    ERS_INFO("Enabling zero-copy publication of DAQ e-links");
                    daq_zc_reader_factory(unique_id, dma_buf, cfg->network.ip, port, elinks, thread, daq_pub_info);
                }
                else {
                    daq_reader_factory(unique_id, dma_buf, port, elinks, thread, daq_pub_info);
                }

                const uint16_t actual_port = m_readers.at(unique_id).back().get_port();
                std::ranges::transform(elinks, std::back_inserter(bus_info), [this, &actual_port](const Elink& e) {
                    return felixbus::FelixBusElinkInfo{
                        .fid = e.fid,
                        .ip = cfg->network.ip,
                        .port = actual_port,
                        .unbuffered = cfg->network.daq_unbuffered,
                        .pubsub = true,
                        .tcp = cfg->network.daq_network_mode == NetworkMode::tcp,
                        .stream = e.has_streams,
                        .netio_pages = cfg->network.netio_pages,
                        .netio_pagesize = cfg->network.netio_pagesize,
                        .blocks = cfg->network.daq_send_blocks,
                    };
                });

                ERS_INFO(std::format("PCIe endpoint {}, DMA {}: DAQ thread {} publishing on port {}", dev_no, dmaid, thread, actual_port));

                ++thread;
            }

            //TTC2H
            if (has_ttc) {
                const uint16_t port = cfg->network.ttc_ports.at(unique_id);
                const std::string ttc_pub_info = std::format("d{}D{}TTC", dev_no, dmaid);

                ttc_reader_factory(unique_id, dma_buf, port, ttc2h_elinks, ttc_pub_info);

                const uint16_t assigned_port = m_readers.at(unique_id).back().get_port();
                std::ranges::transform(ttc2h_elinks, std::back_inserter(bus_info), [this, &assigned_port](const Elink& elink) {
                    return felixbus::FelixBusElinkInfo{
                        .fid = elink.fid,
                        .ip = cfg->network.ip,
                        .port = assigned_port,
                        .unbuffered = false,
                        .pubsub = true,
                        .tcp = cfg->network.daq_network_mode == NetworkMode::tcp,
                        .stream = elink.has_streams,
                        .netio_pages = cfg->network.ttc_netio_pages,
                        .netio_pagesize = cfg->network.ttc_netio_pagesize,
                        .blocks = cfg->network.ttc_send_blocks,
                    };
                });

                ERS_INFO(std::format("Device {}, DMA {}: TTC2H elink published on port {}", dev_no, dmaid, assigned_port));
                auto& e = ttc2h_elinks[0];
                ERS_INFO(std::format("E-link {}:\t{:#4x}\t{:#18x}\t(channel {} egroup {} epath {}), type {}",
                     tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                     dev->get_epath(e.lid), dev->get_elink_type_str(e.type)));

                ++tot_elinks;
            }

            if ( has_dcs ) {
                const uint16_t port = cfg->network.dcs_ports.at(unique_id);
                const std::string dcs_pub_info = std::format("d{}D{}DCS", dev_no, dmaid);

                if (cfg->network.dcs_unbuffered) {
                    ERS_INFO("Enabling zero-copy publication of DCS e-links");
                    dcs_zc_reader_factory(unique_id, dma_buf, cfg->network.dcs_ip, port, ic_dcs_elinks, dcs_pub_info);
                }
                else {
                    dcs_reader_factory(unique_id, dma_buf, port, ic_dcs_elinks, dcs_pub_info);
                }

                const uint16_t assigned_port = m_readers.at(unique_id).back().get_port();
                std::ranges::transform(ic_dcs_elinks, std::back_inserter(bus_info), [this, &assigned_port](const Elink& elink) {
                    return felixbus::FelixBusElinkInfo{
                        .fid = elink.fid,
                        .ip = cfg->network.dcs_ip,
                        .port = assigned_port,
                        .unbuffered = cfg->network.dcs_unbuffered,
                        .pubsub = true,
                        .tcp = cfg->network.dcs_network_mode == NetworkMode::tcp,
                        .stream = elink.has_streams,
                        .netio_pages = cfg->network.dcs_netio_pages,
                        .netio_pagesize = cfg->network.dcs_netio_pagesize,
                        .blocks = cfg->network.dcs_send_blocks,
                    };
                });

                ERS_INFO(std::format("Device {}, DMA {}: DCS elinks published on port {}", dev_no, dmaid, assigned_port));
                for (auto& e : ic_dcs_elinks) {
                    ERS_INFO(std::format("E-link {}:\t{:#4x}\t{:#18x}\t(channel {} egroup {} epath {}), type {}",
                        tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                        dev->get_epath(e.lid), dev->get_elink_type_str(e.type)));
                    ++tot_elinks;
                }
            }

            bus_map.at(unique_id).publish(bus_info);

            //run interrupt-listening threads
            if(cfg->poll_time == 0){
                irq_listeners.push_back(std::thread(irq_listener, std::ref(unique_id)));
            }

        } // DMAs
    } // Devices
}


template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::print_monitoring()
{
    for (auto & wtype : m_mon) {
        wtype.create_new_message(m_hostname);
        //devices
        for (const auto& dev : devices) {
            const unsigned int num_dev = dev->get_device_number();
            const ToHostDeviceStats dev_stats(num_dev);
            wtype.append_device_stats(dev_stats);
            //buffers
            for (const auto& dma : dma_buffers) {
                //Discard DMA buffers of other devices
                const unsigned int dma_buf_device = cfg->udmaid_to_deviceid(dma.first);
                if (dma_buf_device != num_dev) {
                    continue;
                }
                const unsigned int dmaid = cfg->udmaid_to_dmaid(dma.first);
                const ToHostDmaStats dma_stats(
                    dmaid,
                    dma.second->dma_get_free_MB(),
                    dma.second->dma_get_irq_counter()
                );
                wtype.append_dma_stats(dma_stats);
                //readers
                if (m_readers.count(dma.first) > 0 ){ //for DMAs with no e-links/readers
                    for (auto& reader : m_readers.at(dma.first)) {
                        const int thread_id = reader.get_thread_id();
                        const ToHostReaderStats reader_stats (
                            Device::get_elink_type_str(reader.get_elink_type()),
                            thread_id,
                            reader.get_network_subscriptions(),
                            reader.get_network_resource_counter(), //non-thread safe read op
                            reader.get_network_resource_available_calls()  //non-thread safe read op
                        );
                        wtype.append_reader_stats(reader_stats);
                        //elinks
                        const auto elink_mon_info = reader.get_reader_elink_stats();
                        for(const auto & elink_stats : elink_mon_info) {
                            wtype.append_elink_stats(elink_stats);
                        }
                    }
                }
            }
        }
        wtype.write_message();
    }
}

template <class CFG, class DEV, class BUF>
std::vector<ToHostMonitor> ToHost<CFG, DEV, BUF>::create_monitoring_writers(const std::vector<MonitoringType>& writer_types) {
    std::vector<ToHostMonitor> writers;
    for (const auto& writer_type : writer_types) {
        writers.emplace_back(add_monitoring_writer(writer_type));
    }
    return writers;
}

template <class CFG, class DEV, class BUF>
std::unique_ptr<Writer> ToHost<CFG, DEV, BUF>::add_monitoring_writer(const MonitoringType& writer_type) {
    switch (writer_type) {
        case MonitoringType::FIFO:
            return std::make_unique<FIFOWriter>(cfg->stats.monitoring_fifo);
        case MonitoringType::Prometheus: {
            return std::make_unique<PrometheusWriter>(cfg->stats.prometheus_port);
        }
        case MonitoringType::WebIS:
            return std::make_unique<WebISWriter>();
        default:
            throw std::invalid_argument("Unknown writer type");
    }
}