Program Listing for File tohost.hpp
↰ Return to documentation for file (tohost.hpp)
#include <atomic>
#include <csignal>
#include <cstdint>
#include <list>
#include <map>
#include <memory>
#include <stdexcept>
#include <string>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>
#include "device.hpp"
#include "flxcard/FlxCard.h"
#include "config/config.hpp"
#include "tohost_reader.hpp"
#include "network/netio_buffered_publisher.hpp"
#include "network/netio_zerocopy_publisher.hpp"
#include "log.hpp"
#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"
/**
 * @brief Reads data from the DMA buffers of a set of devices and publishes it
 *        through per-DMA readers.
 *
 * The caller fills `devices` and `dma_buffers` (the latter keyed by the value
 * returned by CFG::get_unique_dmaid) and then calls start().
 *
 * The object owns a std::atomic flag and is therefore neither copyable nor
 * movable.
 *
 * @tparam CFG configuration type (owned through `cfg`).
 * @tparam DEV device type; instances are shared through `devices`.
 * @tparam BUF DMA buffer type; instances are shared through `dma_buffers`.
 */
template <class CFG, class DEV, class BUF>
class ToHost {
public:
    /**
     * @brief Take ownership of the configuration and set up the monitor.
     * @param config configuration object; its `stats.monitoring_fifo` member
     *               is passed to the ToHostMonitor constructor.
     */
    explicit ToHost(std::unique_ptr<CFG> config) :
        cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo)
    {}

    /// Open the devices, start the DMA buffers and create the readers.
    void start();

    /// Append device, DMA, reader and e-link statistics to the monitor.
    void print_monitoring();

    /// Clear the running flag and stop every DMA buffer.
    void stop();

public:
    std::unique_ptr<CFG> cfg;                         ///< configuration (must be declared before m_mon, which is built from it)
    std::list<std::shared_ptr<DEV>> devices;          ///< devices handled by start()
    std::map<int, std::shared_ptr<BUF>> dma_buffers;  ///< DMA buffers keyed by unique DMA ID

private:
    // Written by stop() and read by the interrupt-listener threads created in
    // start(): it has to be atomic to avoid a data race between them.
    std::atomic<bool> m_running{true};
    std::string m_hostname;
    ToHostMonitor m_mon;
    std::map<int, Bus> bus_map;                             ///< one Bus per unique DMA ID
    std::map<int, std::list<ToHostReader<BUF>>> m_readers;  ///< readers per unique DMA ID
    // NOTE(review): these threads are not joined or detached in the code
    // visible here - confirm that this is handled before destruction.
    std::vector<std::thread> irq_listeners;

    /// Create the Bus for a DMA buffer if it does not exist yet.
    void make_bus(int dmaid, int unique_dmaid);
};
/**
 * @brief Register a Bus for a DMA buffer unless one is already present.
 *
 * @param dmaid        DMA ID forwarded to the Bus constructor.
 * @param unique_dmaid key under which the Bus is stored in bus_map.
 */
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::make_bus(int dmaid, int unique_dmaid)
{
    // A bus registered under this key is left untouched.
    if (bus_map.count(unique_dmaid) != 0) {
        return;
    }

    // Build the Bus in place inside the map from the network settings.
    bus_map.emplace(std::piecewise_construct,
                    std::forward_as_tuple(unique_dmaid),
                    std::forward_as_tuple(cfg->network.bus_dir,
                                          cfg->network.bus_groupname,
                                          dmaid,
                                          cfg->network.verbose_bus));
}
/**
 * @brief Request termination.
 *
 * Clears the running flag tested by the interrupt-listener loops and then
 * calls stop() on every registered DMA buffer.
 */
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::stop()
{
    m_running = false;

    for (auto it = dma_buffers.begin(); it != dma_buffers.end(); ++it) {
        it->second->stop();
    }
}
/**
 * @brief Open the devices, start the DMA buffers and create all readers.
 *
 * For every device in `devices` and every DMA ID in cfg->resource.dma_ids:
 *  - the device is opened with a lock mask built from the DMA IDs;
 *  - the DMA buffer is allocated and started in continuous mode;
 *  - one reader per DAQ thread, one for TTC2H and one for IC+DCS e-links is
 *    created (buffered or zero-copy, depending on the configuration);
 *  - when cfg->poll_time is 0, data interrupts are enabled and an
 *    interrupt-listener thread is started for the DMA buffer.
 *
 * @throws std::runtime_error if a device cannot be opened.
 * @throws std::out_of_range  from the .at() lookups when a unique DMA ID has
 *         no DMA buffer (presumably also when it has no configured port -
 *         depends on the container types in CFG).
 */
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::start()
{
    log_init(cfg->verbose);
    LOG_INFO("Number of PCIe endpoints to read %d", cfg->get_number_devices());
    LOG_INFO("Polling period %u us", cfg->poll_time);

    // Reader factories: each one appends a ToHostReader to m_readers[udmaid].
    // Buffered DAQ publisher.
    auto daq_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
            cfg->network.netio_pages, cfg->network.netio_pagesize,
            cfg->network.netio_watermark, cfg->network.netio_timeout,
            cfg->max_daq_chunk_size), cfg->poll_time, false, cfg->l1id_check);
    };
    // Buffered DCS publisher (last argument, the L1ID check setting, is 0).
    auto dcs_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.dcs_ip, port, bus,
            cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize,
            cfg->network.dcs_netio_watermark, cfg->network.dcs_netio_timeout,
            cfg->max_dcs_chunk_size), cfg->poll_time, false, 0);
    };
    // Buffered TTC2H publisher (chunk size limited to UINT32_MAX only).
    auto ttc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
            cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize,
            cfg->network.ttc_netio_watermark, cfg->network.ttc_netio_timeout,
            UINT32_MAX), cfg->poll_time, false, cfg->l1id_check);
    };
    // Zero-copy DAQ publisher: publishes straight from the DMA buffer memory.
    auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
            cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size,
            buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
    };
    // Zero-copy DCS publisher.
    // NOTE(review): this passes cfg->l1id_check whereas the buffered DCS
    // factory passes 0 - confirm which of the two is intended.
    auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
        m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
            cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size,
            buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
    };

    // Interrupt-driven mode: body of the listener thread of one DMA buffer.
    // The buffer and the reader list are resolved by the caller and handed in
    // by value, so the thread never looks anything up in dma_buffers/m_readers
    // while the main thread is still inserting readers for other DMA buffers.
    // `readers` is null for a DMA buffer without readers.
    auto irq_listener = [this](int udmaid, std::shared_ptr<BUF> buf, std::list<ToHostReader<BUF>>* readers){
        LOG_INFO("Running interrupt listener for unique DMA ID %d", udmaid);
        while(m_running){
            buf->dma_wait_for_data_irq();
            buf->dma_increase_irq_counter();
            if (readers == nullptr){
                continue;
            }
            for (auto& r : *readers){
                r.fire_publisher_async_signal();
            }
        }
    };

    // Running e-link index, used in the log output only.
    int tot_elinks{0};

    //Open devices, allocate buffers and bus
    for (auto & dev : devices){
        int dev_no = dev->get_device_number();
        LOG_DBG("Opening PCIe endpoint %u", dev_no);
        unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids);
        int ret = dev->open_device(lock_mask);
        if(ret != 0 ){
            LOG_ERR("Cannot open device %d.", dev_no);
            throw std::runtime_error("Fatal error: cannot open requested device");
        }

        flx_tohost_format fmt = dev->get_tohost_data_format();
        switch (fmt) {
            case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER:
                LOG_INFO("Device %d uses TOHOST_SUBCHUNK_TRAILER data format", dev_no);
                break;
            case flx_tohost_format::TOHOST_SUBCHUNK_HEADER:
                LOG_INFO("Device %d uses TOHOST_SUBCHUNK_HEADER data format", dev_no);
                break;
            default:
                // Explicit cast: the value is printed with %d.
                LOG_ERR("Device %d: ToHost data format %d not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, static_cast<int>(fmt));
                break;
        }

        for(int id : cfg->resource.dma_ids){
            int unique_id = cfg->get_unique_dmaid(id, dev_no);
            auto dma_buf = dma_buffers.at(unique_id);
            make_bus(id, unique_id);

            std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(id);
            LOG_INFO("Allocating DMA buffer %s (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), dev_no, id);
            dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
            dma_buf->dma_start_continuous();
            if (cfg->poll_time == 0){
                dma_buf->irq_data_enable();
            }

            // IC and DCS e-links are served together by a single reader.
            std::vector<Elink> ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC);
            std::vector<Elink> ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC);
            std::vector<Elink> dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS);
            std::vector<Elink> ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end());
            ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end());

            bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty();
            bool has_ttc = not ttc2h_elinks.empty();
            bool has_dcs = not ic_dcs_elinks.empty();

            if ((has_daq and cfg->network.daq_unbuffered)
                or (has_dcs and cfg->network.dcs_unbuffered)) {
                dma_buf->set_zero_copy_reader();
            }

            //DAQ: one reader per group of e-links, each on its own port
            //(base port + 1000 * thread index)
            int thread{0};
            for (const std::vector<Elink> & elinks :
                    dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) {
                int port = cfg->network.ports.at(unique_id)+1000*thread;
                LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, id, thread, port);
                for( const auto & e : elinks ){
                    LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s, streams %c",
                        tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid),
                        dev->get_elink_type_str(e.type).c_str(), e.has_streams ? 'Y' : 'N');
                    ++tot_elinks;
                }
                if(cfg->network.daq_unbuffered) {
                    LOG_INFO("Enabling zero-copy poublication of DAQ e-links");
                    daq_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.ip, port, elinks);
                }
                else {
                    daq_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, elinks);
                }
                ++thread;
            }

            //TTC2H (only the first e-link is logged and counted)
            if ( has_ttc ) {
                int port = cfg->network.ttc_ports.at(unique_id);
                LOG_INFO("Device %u, DMA %d: TTC2H elink published on port %d", dev_no, id, port);
                auto & e = ttc2h_elinks[0];
                LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
                    tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                    dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
                ttc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ttc2h_elinks);
                ++tot_elinks;
            }

            //IC + DCS
            if ( has_dcs ) {
                int port = cfg->network.dcs_ports.at(unique_id);
                LOG_INFO("Device %u, DMA %d: DCS elinks published on port %d", dev_no, id, port);
                for( auto & e : ic_dcs_elinks ){
                    LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
                        tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
                        dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
                    ++tot_elinks;
                }
                if(cfg->network.dcs_unbuffered) {
                    LOG_INFO("Enabling zero-copy poublication of DCS e-links");
                    dcs_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.dcs_ip, port, ic_dcs_elinks);
                }
                else {
                    dcs_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ic_dcs_elinks);
                }
            }

            //run interrupt-listening threads
            if(cfg->poll_time == 0){
                // Resolve the reader list here, on the main thread. Pointers to
                // std::map elements stay valid when other keys are inserted,
                // and all readers of this DMA buffer have been created above
                // (assuming a unique ID is not processed twice).
                auto readers_it = m_readers.find(unique_id);
                std::list<ToHostReader<BUF>>* readers =
                    (readers_it != m_readers.end()) ? &readers_it->second : nullptr;
                // Arguments are passed by value: unique_id and dma_buf are
                // loop-local and must not be referenced from the new thread.
                irq_listeners.emplace_back(irq_listener, unique_id, dma_buf, readers);
            }
        } // DMAs
    } // Devices
}
/**
 * @brief Append the current statistics of all devices to the monitor.
 *
 * For each device the records are appended in this order: device record,
 * then for every DMA buffer of that device its DMA record, followed by one
 * record per reader and one record per e-link of that reader. All records
 * carry the same timestamp and hostname.
 */
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::print_monitoring()
{
    // Timestamp taken once and shared by every record of this call.
    std::string timestamp = Util::ISO8601TimeUTC();

    for (auto& dev : devices) {
        // Device-level record.
        unsigned int dev_no = dev->get_device_number();
        ToHostDeviceStats dev_stats(dev_no);
        m_mon.append_device_stats(timestamp, m_hostname, dev_stats);

        for (auto& buf_entry : dma_buffers) {
            // dma_buffers holds the buffers of all devices: keep only those
            // that belong to the device being reported.
            unsigned int owner = cfg->udmaid_to_deviceid(buf_entry.first);
            if (owner != dev_no) {
                continue;
            }

            // DMA-level record.
            unsigned int dmaid = cfg->udmaid_to_dmaid(buf_entry.first);
            ToHostDmaStats dma_stats(
                dmaid,
                buf_entry.second->dma_get_free_MB(),
                buf_entry.second->dma_get_irq_counter()
            );
            m_mon.append_dma_stats(timestamp, m_hostname, dev_no, dma_stats);

            // A DMA buffer without e-links has no entry in m_readers.
            auto readers_it = m_readers.find(buf_entry.first);
            if (readers_it == m_readers.end()) {
                continue;
            }

            for (auto& reader : readers_it->second) {
                // Reader-level record.
                unsigned int reader_id = reader.get_id();
                ToHostReaderStats reader_stats(
                    reader_id,
                    static_cast<int>(reader.get_elink_type()),
                    reader.get_network_subscriptions(),
                    reader.get_network_resource_counter(),         //non-thread safe read op
                    reader.get_network_resource_available_calls()  //non-thread safe read op
                );
                m_mon.append_reader_stats(timestamp, m_hostname, dev_no, dmaid, reader_stats);

                // E-link-level records of this reader.
                auto elink_stats = reader.get_reader_elink_stats();
                for (const auto & es : elink_stats) {
                    m_mon.append_elink_stats(timestamp, m_hostname, dev_no, dmaid, reader_id, es);
                }
            }
        }
    }
    // m_mon.write_message(); //keep commented for tokenized writes
}