Program Listing for File tohost.hpp
↰ Return to documentation for file (tohost.hpp)
#include <cstdint>
#include <list>
#include <map>
#include <csignal>
#include <format>
#include <memory>
#include "ers/ers.h"
#include "device.hpp"
#include "felixbus/FelixBusElinkInfo.hpp"
#include "flxcard/FlxCard.h"
#include "config/config.hpp"
#include "tohost_reader.hpp"
#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"
#include "monitoring/fifo_writer.hpp"
#include "monitoring/prometheus_writer.hpp"
#include "monitoring/webis_writer.hpp"
ERS_DECLARE_ISSUE(felix_log, tohost_issue, issue_message, ((const std::string&)issue_message))
template <class CFG, class DEV, class BUF>
class ToHost {
public:
explicit ToHost(std::unique_ptr<CFG> config) :
cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(std::move(create_monitoring_writers(cfg->stats.local_monitoring_types)))
{
m_running = true;
};
void start();
void print_monitoring();
void stop();
public:
std::unique_ptr<CFG> cfg;
std::list<std::shared_ptr<DEV>> devices;
std::map<int, std::shared_ptr<BUF>> dma_buffers;
private:
bool m_running{};
std::string m_hostname{};
std::vector<ToHostMonitor> m_mon;
std::map<int, Bus> bus_map;
std::map<int, std::list<ToHostReader<BUF>>> m_readers;
std::vector<std::thread> irq_listeners;
void make_bus(int dmaid, int unique_dmaid);
[[nodiscard]] std::vector<ToHostMonitor> create_monitoring_writers(const std::vector<MonitoringType>& writer_types);
[[nodiscard]] std::unique_ptr<Writer> add_monitoring_writer(const MonitoringType& writer_type);
};
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::make_bus(int dmaid, int unique_dmaid)
{
if(bus_map.find(unique_dmaid) == bus_map.end()){
bus_map.try_emplace(unique_dmaid, cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus);
}
}
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::stop()
{
m_running = false;
for (auto& dma : dma_buffers) {
dma.second->stop();
}
}
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::start()
{
ERS_INFO(std::format("Number of PCIe endpoints to read {}", cfg->get_number_devices()));
ERS_INFO(std::format("Polling period {} us", cfg->poll_time));
//Open devices, allocate buffers and bus
auto daq_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> & elinks, int thread_id, std::string pub_info) {
m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.daq_network_mode, cfg->network.ip, port,
cfg->network.netio_pages, cfg->network.netio_pagesize,
cfg->network.netio_timeout, cfg->max_daq_chunk_size, pub_info,
std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, cfg->network.daq_send_blocks, false, cfg->l1id_check, thread_id);
};
auto dcs_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> & elinks, std::string pub_info) {
m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.dcs_network_mode, cfg->network.ip, port,
cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize,
cfg->network.dcs_netio_timeout, cfg->max_dcs_chunk_size, pub_info,
std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, cfg->network.dcs_send_blocks, false, 0);
};
auto ttc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> & elinks, std::string pub_info) {
m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.daq_network_mode, cfg->network.ip, port,
cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize,
cfg->network.ttc_netio_timeout, UINT32_MAX, pub_info,
std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, cfg->network.ttc_send_blocks, false, cfg->l1id_check);
};
// For zero-copy readers set send blocks to false as these options are incompatible and will fail
auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, const std::string& ip, uint16_t port, const std::vector<Elink> & elinks, int thread_id, std::string pub_info) {
m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.daq_network_mode, ip, port,
cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size,
buf->dma_get_vaddr(), buf->dma_get_size(), pub_info, std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, false, true, cfg->l1id_check, thread_id);
};
auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, const std::string& ip, uint16_t port, const std::vector<Elink> & elinks, std::string pub_info) {
m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.dcs_network_mode, ip, port,
cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size,
buf->dma_get_vaddr(), buf->dma_get_size(), pub_info, std::chrono::milliseconds{cfg->stats.monitoring_period_ms}), cfg->poll_time, false, true, cfg->l1id_check);
};
//for interrupt-driven mode
auto irq_listener = [&](int udmaid){
ERS_INFO(std::format("Running interrupt listener for unique DMA ID {}", udmaid));
while(m_running){
dma_buffers.at(udmaid)->dma_wait_for_data_irq();
dma_buffers.at(udmaid)->dma_increase_irq_counter();
for (auto& r : m_readers.at(udmaid)){
r.fire_publisher_async_signal();
}
}
};
int tot_elinks{0};
//Open devices, allocate buffers and bus
for (auto & dev : devices){
int dev_no = dev->get_device_number();
ERS_INFO(std::format("Opening PCIe endpoint {}", dev_no));
unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids);
int ret = dev->open_device(lock_mask);
if(ret != 0 ){
ers::error(felix_log::tohost_issue(ERS_HERE, std::format("ToHost - cannot open device {} ", dev_no)));
throw std::runtime_error("Fatal error: cannot open requested device");
}
flx_tohost_format fmt = dev->get_tohost_data_format();
switch (fmt) {
case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER:
ERS_INFO(std::format("Device {} uses TOHOST_SUBCHUNK_TRAILER data format", dev_no));
break;
case flx_tohost_format::TOHOST_SUBCHUNK_HEADER:
ERS_INFO(std::format("Device {} uses TOHOST_SUBCHUNK_HEADER data format", dev_no));
break;
default:
ers::error(felix_log::tohost_issue(ERS_HERE, std::format("ToHost - Device {}: data format {} not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, static_cast<int>(fmt))));
}
for(int dmaid : cfg->resource.dma_ids){
int unique_id = cfg->get_unique_dmaid(dmaid, dev_no);
auto dma_buf = dma_buffers.at(unique_id);
make_bus(dmaid, unique_id);
std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
ERS_INFO(std::format("Allocating DMA buffer {} (PCIe endpoint {}, DMAID {})", dma_name, dev_no, dmaid));
dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
dma_buf->dma_start_continuous();
if (cfg->poll_time == 0){
dma_buf->irq_data_enable();
}
std::vector<Elink> ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC);
std::vector<Elink> ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC);
std::vector<Elink> dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS);
std::vector<Elink> ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end());
ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end());
bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty();
bool has_ttc = not ttc2h_elinks.empty();
bool has_dcs = not ic_dcs_elinks.empty();
if ((has_daq and cfg->network.daq_unbuffered)
or (has_dcs and cfg->network.dcs_unbuffered)) {
dma_buf->set_zero_copy_reader();
}
if (dma_buf->has_zero_copy_reader() and ((has_daq and has_ttc) or (has_daq and has_dcs) or (has_dcs and has_ttc))) {
throw felix_log::tohost_issue(std::format("Zero-copy/unbuffered mode requires exactly one publisher type. "
"DMA buffer {} has multiple types enabled (DAQ: {}, TTC: {}, DCS: {})",
dmaid, has_daq, has_ttc, has_dcs));
}
// Collect elinks to publish on bus
std::vector<felixbus::FelixBusElinkInfo> bus_info{};
//DAQ
int thread{0};
for (const std::vector<Elink> & elinks :
dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) {
for (const auto& e : elinks) {
ERS_INFO(std::format("E-link {}:\t{:#4x}\t{:#18x}\t(channel {} egroup {} epath {}), streams {}",
tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
dev->get_epath(e.lid), (e.has_streams ? 'Y' : 'N')));
++tot_elinks;
}
const auto base_port = cfg->network.ports.at(unique_id);
const auto computed_port = base_port == 0 ? base_port : base_port + 1000u * thread;
if (computed_port > std::numeric_limits<uint16_t>::max()) {
ers::warning(felix_log::tohost_issue(std::format("Port {} number overflow! Starting port is too big", computed_port)));
}
const uint16_t port = static_cast<uint16_t>(computed_port);
// Used to create an event loop with this information. Needed to trace back the publisher thread
// The same is valid for all the pub_info that appear below.
const std::string daq_pub_info = std::format("d{}D{}t{}DAQ", dev_no, dmaid, thread);
if (cfg->network.daq_unbuffered) {
ERS_INFO("Enabling zero-copy publication of DAQ e-links");
daq_zc_reader_factory(unique_id, dma_buf, cfg->network.ip, port, elinks, thread, daq_pub_info);
}
else {
daq_reader_factory(unique_id, dma_buf, port, elinks, thread, daq_pub_info);
}
const uint16_t actual_port = m_readers.at(unique_id).back().get_port();
std::ranges::transform(elinks, std::back_inserter(bus_info), [this, &actual_port](const Elink& e) {
return felixbus::FelixBusElinkInfo{
.fid = e.fid,
.ip = cfg->network.ip,
.port = actual_port,
.unbuffered = cfg->network.daq_unbuffered,
.pubsub = true,
.tcp = cfg->network.daq_network_mode == NetworkMode::tcp,
.stream = e.has_streams,
.netio_pages = cfg->network.netio_pages,
.netio_pagesize = cfg->network.netio_pagesize,
.blocks = cfg->network.daq_send_blocks,
};
});
ERS_INFO(std::format("PCIe endpoint {}, DMA {}: DAQ thread {} publishing on port {}", dev_no, dmaid, thread, actual_port));
++thread;
}
//TTC2H
if (has_ttc) {
const uint16_t port = cfg->network.ttc_ports.at(unique_id);
const std::string ttc_pub_info = std::format("d{}D{}TTC", dev_no, dmaid);
ttc_reader_factory(unique_id, dma_buf, port, ttc2h_elinks, ttc_pub_info);
const uint16_t assigned_port = m_readers.at(unique_id).back().get_port();
std::ranges::transform(ttc2h_elinks, std::back_inserter(bus_info), [this, &assigned_port](const Elink& elink) {
return felixbus::FelixBusElinkInfo{
.fid = elink.fid,
.ip = cfg->network.ip,
.port = assigned_port,
.unbuffered = false,
.pubsub = true,
.tcp = cfg->network.daq_network_mode == NetworkMode::tcp,
.stream = elink.has_streams,
.netio_pages = cfg->network.ttc_netio_pages,
.netio_pagesize = cfg->network.ttc_netio_pagesize,
.blocks = cfg->network.ttc_send_blocks,
};
});
ERS_INFO(std::format("Device {}, DMA {}: TTC2H elink published on port {}", dev_no, dmaid, assigned_port));
auto& e = ttc2h_elinks[0];
ERS_INFO(std::format("E-link {}:\t{:#4x}\t{:#18x}\t(channel {} egroup {} epath {}), type {}",
tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
dev->get_epath(e.lid), dev->get_elink_type_str(e.type)));
++tot_elinks;
}
if ( has_dcs ) {
const uint16_t port = cfg->network.dcs_ports.at(unique_id);
const std::string dcs_pub_info = std::format("d{}D{}DCS", dev_no, dmaid);
if (cfg->network.dcs_unbuffered) {
ERS_INFO("Enabling zero-copy publication of DCS e-links");
dcs_zc_reader_factory(unique_id, dma_buf, cfg->network.dcs_ip, port, ic_dcs_elinks, dcs_pub_info);
}
else {
dcs_reader_factory(unique_id, dma_buf, port, ic_dcs_elinks, dcs_pub_info);
}
const uint16_t assigned_port = m_readers.at(unique_id).back().get_port();
std::ranges::transform(ic_dcs_elinks, std::back_inserter(bus_info), [this, &assigned_port](const Elink& elink) {
return felixbus::FelixBusElinkInfo{
.fid = elink.fid,
.ip = cfg->network.dcs_ip,
.port = assigned_port,
.unbuffered = cfg->network.dcs_unbuffered,
.pubsub = true,
.tcp = cfg->network.dcs_network_mode == NetworkMode::tcp,
.stream = elink.has_streams,
.netio_pages = cfg->network.dcs_netio_pages,
.netio_pagesize = cfg->network.dcs_netio_pagesize,
.blocks = cfg->network.dcs_send_blocks,
};
});
ERS_INFO(std::format("Device {}, DMA {}: DCS elinks published on port {}", dev_no, dmaid, assigned_port));
for (auto& e : ic_dcs_elinks) {
ERS_INFO(std::format("E-link {}:\t{:#4x}\t{:#18x}\t(channel {} egroup {} epath {}), type {}",
tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
dev->get_epath(e.lid), dev->get_elink_type_str(e.type)));
++tot_elinks;
}
}
bus_map.at(unique_id).publish(bus_info);
//run interrupt-listening threads
if(cfg->poll_time == 0){
irq_listeners.push_back(std::thread(irq_listener, std::ref(unique_id)));
}
} // DMAs
} // Devices
}
template <class CFG, class DEV, class BUF>
void ToHost<CFG, DEV, BUF>::print_monitoring()
{
for (auto & wtype : m_mon) {
wtype.create_new_message(m_hostname);
//devices
for (const auto& dev : devices) {
const unsigned int num_dev = dev->get_device_number();
const ToHostDeviceStats dev_stats(num_dev);
wtype.append_device_stats(dev_stats);
//buffers
for (const auto& dma : dma_buffers) {
//Discard DMA buffers of other devices
const unsigned int dma_buf_device = cfg->udmaid_to_deviceid(dma.first);
if (dma_buf_device != num_dev) {
continue;
}
const unsigned int dmaid = cfg->udmaid_to_dmaid(dma.first);
const ToHostDmaStats dma_stats(
dmaid,
dma.second->dma_get_free_MB(),
dma.second->dma_get_irq_counter()
);
wtype.append_dma_stats(dma_stats);
//readers
if (m_readers.count(dma.first) > 0 ){ //for DMAs with no e-links/readers
for (auto& reader : m_readers.at(dma.first)) {
const int thread_id = reader.get_thread_id();
const ToHostReaderStats reader_stats (
Device::get_elink_type_str(reader.get_elink_type()),
thread_id,
reader.get_network_subscriptions(),
reader.get_network_resource_counter(), //non-thread safe read op
reader.get_network_resource_available_calls() //non-thread safe read op
);
wtype.append_reader_stats(reader_stats);
//elinks
const auto elink_mon_info = reader.get_reader_elink_stats();
for(const auto & elink_stats : elink_mon_info) {
wtype.append_elink_stats(elink_stats);
}
}
}
}
}
wtype.write_message();
}
}
template <class CFG, class DEV, class BUF>
std::vector<ToHostMonitor> ToHost<CFG, DEV, BUF>::create_monitoring_writers(const std::vector<MonitoringType>& writer_types) {
std::vector<ToHostMonitor> writers;
for (const auto& writer_type : writer_types) {
writers.emplace_back(add_monitoring_writer(writer_type));
}
return writers;
}
template <class CFG, class DEV, class BUF>
std::unique_ptr<Writer> ToHost<CFG, DEV, BUF>::add_monitoring_writer(const MonitoringType& writer_type) {
switch (writer_type) {
case MonitoringType::FIFO:
return std::make_unique<FIFOWriter>(cfg->stats.monitoring_fifo);
case MonitoringType::Prometheus: {
return std::make_unique<PrometheusWriter>(cfg->stats.prometheus_port);
}
case MonitoringType::WebIS:
return std::make_unique<WebISWriter>();
default:
throw std::invalid_argument("Unknown writer type");
}
}