Program Listing for File fromhost.hpp

Return to documentation for file (fromhost.hpp)

#include <list>
#include <map>
#include <csignal>
#include "config/config.hpp"
#include "fromhost_writer.hpp"
#include "network/netio_buffered_receiver.hpp"
#include "network/netio_unbuffered_receiver.hpp"
#include "log.hpp"
#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"


template <class C, class D, class B>
class FromHost {

    public:
        explicit FromHost(std::unique_ptr<C> config) :
            cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo){ };

        void start();

        void stop();

        void print_monitoring();

    public:
        std::unique_ptr<C> cfg;
        std::list<std::shared_ptr<D>> devices;
        std::map<int, std::shared_ptr<B>> dma_buffers; //use device number as id

    private:
        std::string m_hostname;
        FromHostMonitor m_mon;
        std::map<int, Bus> m_bus_map;
        std::map<int, std::list<FromHostWriter<B>>> m_writers;
};


template <class C, class D, class B>
void FromHost<C, D, B>::start()
{
    log_init(cfg->verbose);
    LOG_INFO("Number of PCIe endpoints to serve %d", cfg->get_number_devices());

    //Open devices, allocate buffers and bus
    auto netiobuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_buffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
            cfg->network.netio_pages, cfg->network.netio_pagesize), cfg->trickle);
    };

    auto netiounbuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_unbuffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
            cfg->network.netio_pages, cfg->network.netio_pagesize));
    };

    int tot_elinks{0};
    int thread{0};

    for (auto & dev : devices){

        //Get DMA ID
        int ret = dev->open_device(0); // Open and close device with lock_mask{0} just to recover DMA ID
        int dev_no = dev->get_device_number();
        if(ret != 0 ){
            LOG_ERR("Cannot open device %d.", dev_no);
            continue;
        };
        int dmaid = (cfg->trickle) ? dev->get_trickle_dmaid() : dev->get_fromhost_dmaid();
        dev->close_device(); // Open and close device with lock_mask{0} just to recover DMA ID

        LOG_INFO("Opening PCIe endpoint %u", dev_no);
        unsigned int lock_mask = dev->make_dma_lock_mask({dmaid});
        ret = dev->open_device(lock_mask);
        if(ret != 0 ){
            LOG_ERR("Cannot open device %d.", dev_no);
            continue;
        };

        //Initialize bus and  DMA buffers
        auto dma_buf = dma_buffers.at(dev_no);
        dma_buf->set_dmaid(dmaid);

        auto data_format = dev->get_fromhost_data_format();
        dma_buf->set_encoder_data_format(data_format);

        m_bus_map.emplace(std::piecewise_construct,
                std::forward_as_tuple(dev_no),
                std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
        );

        std::string dma_name = "fromhost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
        LOG_INFO("Allocating DMA buffer %s of size %u MB (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), cfg->resource.cmem_buffersize/1024/1024, dev_no, dmaid);
        dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);

        dma_buf->dma_start_continuous();

        std::vector<Elink> elinks = dma_buf->get_elinks();
        int port = cfg->network.ports.at(dev_no) + 1000*thread;

        LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, dmaid, thread, port);
        for( const auto & e : elinks ) {
            LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
                tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
            ++tot_elinks;
        }

        if(cfg->unbuffered) {
            LOG_INFO("Requesting unbuffered communication to sender");
            netiounbuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
        }
        else {
            netiobuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
        }
        ++thread;
    }

}


template <class C, class D, class B>
void FromHost<C, D, B>::stop()
{
    for (auto& dma : dma_buffers) {
        dma.second->stop();
    }
}


template <class C, class D, class B>
void FromHost<C, D, B>::print_monitoring()
{
    //devices
    std::string ts = Util::ISO8601TimeUTC();
    for (auto& d : devices) {
        unsigned int device = d->get_device_number();
        FromHostDeviceStats s(
            device
        );
        m_mon.append_device_stats(ts, m_hostname, s);
        auto dma = dma_buffers.at(device);
        unsigned int dmaid = cfg->udmaid_to_dmaid(dma->get_dmaid());
        m_mon.append_dma_stats(ts, m_hostname, device, dma->get_monitoring_data());
        //readers
        for (auto& w : m_writers.at(device)) {
            unsigned int writer_id = w.get_id();
            FromHostWriterStats ws(
                writer_id,
                0, //type TODO: RDMA, TCP?
                0  //netio-next socket list doesn't provide a counter
            );
            m_mon.append_writer_stats(ts, m_hostname, device, dmaid, ws);
            //elinks
            for(const auto & e : w.get_writer_elink_stats()) {
                m_mon.append_elink_stats(ts, m_hostname, device, dmaid, writer_id, e.second);
            }
        }
    }
    //m_mon.write_message(); //keep commented for tokenized writes
}