// Program listing for file fromhost.hpp
// (Return to the documentation for file fromhost.hpp)
#include <cstdint>
#include <limits>
#include <list>
#include <map>
#include <csignal>
#include <format>
#include <memory>
#include "config/config.hpp"
#include "felixbus/FelixBusElinkInfo.hpp"
#include "fromhost_writer.hpp"
#include "ers/ers.h"
#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"
#include "monitoring/fifo_writer.hpp"
#include "monitoring/prometheus_writer.hpp"
#include "monitoring/webis_writer.hpp"
// Declares the ERS issue type felix_log::fromhost_issue that this file passes to
// ers::error / ers::warning. Its only attribute is the std::string `issue_message`,
// which is also given as the issue's message text.
ERS_DECLARE_ISSUE(felix_log, fromhost_issue, issue_message, ((std::string)issue_message))
/**
 * @brief Drives the FromHost direction: one DMA buffer, one bus entry and a
 *        list of network writers per PCIe endpoint, plus monitoring output.
 *
 * @tparam CFG Configuration type. This file reads `network`, `resource`,
 *             `stats`, `vmem`, `unbuffered` and `get_number_devices()` from it.
 * @tparam DEV Device type (opened/closed and queried for DMA id and e-link
 *             coordinates in start()).
 * @tparam BUF DMA buffer type (configured, allocated and started in start(),
 *             stopped in stop(), queried in print_monitoring()).
 *
 * `devices` and `dma_buffers` are public and are not filled anywhere in this
 * header; start() and print_monitoring() look up `dma_buffers.at(device number)`
 * for the devices they handle, so the owner must register a buffer per device.
 */
template <class CFG, class DEV, class BUF>
class FromHost {
public:
    /**
     * @brief Take ownership of the configuration and build the monitoring writers.
     *
     * The initializer of `m_mon` dereferences `cfg`. This is valid only because
     * `cfg` is declared (and therefore initialized) before `m_mon`; keep that
     * declaration order. `config` is dereferenced unconditionally.
     *
     * @param config Configuration object; ownership moves into `cfg`.
     * @throws std::invalid_argument if the configuration lists a monitoring
     *         type that add_monitoring_writer() does not know.
     */
    explicit FromHost(std::unique_ptr<CFG> config)
        : cfg{std::move(config)},
          m_hostname{Util::get_full_hostname()},
          // The factory returns a prvalue: no std::move, so copy elision can apply.
          m_mon{create_monitoring_writers(cfg->stats.local_monitoring_types)} {}

    /// Open the devices, set up DMA buffers, bus entries and network writers.
    void start();

    /// Stop all registered DMA buffers.
    void stop();

    /// Emit one monitoring message per configured monitoring writer.
    void print_monitoring();

    std::unique_ptr<CFG> cfg;
    std::list<std::shared_ptr<DEV>> devices;
    std::map<int, std::shared_ptr<BUF>> dma_buffers; // keyed by device number

private:
    std::string m_hostname{};
    std::vector<FromHostMonitor> m_mon;                      // one entry per monitoring type
    std::map<int, Bus> m_bus_map;                            // keyed by device number
    std::map<int, std::list<FromHostWriter<BUF>>> m_writers; // keyed by device number

    /// Build one FromHostMonitor per requested monitoring type.
    [[nodiscard]] std::vector<FromHostMonitor> create_monitoring_writers(const std::vector<MonitoringType>& writer_types);

    /// Create the concrete Writer for a single monitoring type.
    [[nodiscard]] std::unique_ptr<Writer> add_monitoring_writer(const MonitoringType& writer_type);
};
/**
 * @brief Bring up every device in `devices`.
 *
 * For each device:
 *  1. open it with lock mask 0 only to read the FromHost DMA id, then close it;
 *  2. reopen it with the lock mask built for that DMA id;
 *  3. configure the device's DMA buffer (DMA id, encoder data format), create
 *     its bus entry, allocate the buffer and call dma_start_continuous();
 *  4. log every e-link served by the buffer;
 *  5. create a buffered or unbuffered network writer (per `cfg->unbuffered`);
 *  6. publish the e-links on the bus with the port reported by the writer.
 *
 * A device that cannot be opened is reported through ers::error and skipped:
 * it gets no bus entry and no writer list.
 *
 * @throws std::out_of_range from `dma_buffers.at()` when no DMA buffer is
 *         registered for a device number.
 */
template <class CFG, class DEV, class BUF>
void FromHost<CFG, DEV, BUF>::start()
{
    ERS_INFO(std::format("Number of PCIe endpoints to serve {}", cfg->get_number_devices()));

    // Writer factories. Both append a FromHostWriter to the device's writer list
    // and differ only in the kind of receiver they create.
    auto netiobuf_writer_factory = [&](int dev_no, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> &elinks, std::string rcv_info) {
        m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_buffered_receiver(cfg->network.dcs_network_mode, cfg->network.ip, port,
            cfg->network.netio_pages, cfg->network.netio_pagesize, rcv_info));
    };
    auto netiounbuf_writer_factory = [&](int dev_no, std::shared_ptr<BUF> buf, uint16_t port, const std::vector<Elink> &elinks, std::string rcv_info) {
        m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_unbuffered_receiver(cfg->network.dcs_network_mode, cfg->network.ip, port,
            cfg->network.netio_pages, cfg->network.netio_pagesize, rcv_info));
    };

    int tot_elinks{0}; // running e-link index across all devices, used in the log only

    for (auto & dev : devices) {
        // Get DMA ID: open and close the device with lock mask 0 just to read it.
        int dev_no = dev->get_device_number();
        constexpr static auto no_lock_mask = 0u;
        if (dev->open_device(no_lock_mask) != 0) {
            ers::error(felix_log::fromhost_issue(ERS_HERE, std::format("FromHost - cannot open device {} ", dev_no)));
            continue;
        }
        const int dmaid = dev->get_fromhost_dmaid();
        dev->close_device();

        // Reopen, this time with the lock mask for the FromHost DMA.
        ERS_INFO(std::format("Opening PCIe endpoint {}", dev_no));
        const unsigned int lock_mask = dev->make_dma_lock_mask({dmaid});
        if (dev->open_device(lock_mask) != 0) {
            ers::error(felix_log::fromhost_issue(ERS_HERE, std::format("FromHost - cannot open device {} ", dev_no)));
            continue;
        }

        // Initialize bus and DMA buffers
        auto dma_buf = dma_buffers.at(dev_no);
        dma_buf->set_dmaid(dmaid);
        auto data_format = dev->get_fromhost_data_format();
        dma_buf->set_encoder_data_format(data_format);
        m_bus_map.try_emplace(dev_no, cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus);

        std::string dma_name = "fromhost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
        ERS_INFO(std::format("Allocating DMA buffer {} of size {} MB (PCIe endpoint {}, DMAID {})", dma_name, cfg->resource.cmem_buffersize/1024/1024, dev_no, dmaid));
        dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
        dma_buf->dma_start_continuous();

        std::vector<Elink> elinks = dma_buf->get_elinks();
        for (const auto & e : elinks) {
            ERS_INFO(std::format("E-link {}: {:#4x} {:#18x} (channel {} egroup {} epath {})",
                tot_elinks, e.lid, e.fid,
                dev->get_channel(e.lid),
                dev->get_egroup(e.lid),
                dev->get_epath(e.lid)));
            ++tot_elinks;
        }

        // Each device gets its configured base port shifted by 1000 per device
        // number. A base port of 0 is passed through unchanged; the port really
        // in use is read back from the writer further down.
        const auto base_port = cfg->network.ports.at(dev_no);
        const auto computed_port = base_port == 0 ? base_port : base_port + 1000u * dev_no;
        if (computed_port > std::numeric_limits<uint16_t>::max()) {
            // Same issue construction as the error reports above: context first.
            ers::warning(felix_log::fromhost_issue(ERS_HERE, std::format("Port {} number overflow! Starting port is too big", computed_port)));
        }
        // NOTE: setup continues after the warning; the cast narrows the value to 16 bits.
        const uint16_t port = static_cast<uint16_t>(computed_port);

        // Used to create an event loop with this information. Needed to trace back the receiver thread
        const std::string receiver_info = std::format("d{}ToFlx", dev_no);

        if (cfg->unbuffered) {
            ERS_INFO("Requesting unbuffered communication to sender");
            netiounbuf_writer_factory(dev_no, dma_buf, port, elinks, receiver_info);
        }
        else {
            netiobuf_writer_factory(dev_no, dma_buf, port, elinks, receiver_info);
        }

        // Advertise the port the writer reports, not the requested one.
        const uint16_t actual_port = m_writers.at(dev_no).back().get_port();
        std::vector<felixbus::FelixBusElinkInfo> bus_info{};
        std::ranges::transform(elinks, std::back_inserter(bus_info), [this, actual_port](const Elink& e) {
            return felixbus::FelixBusElinkInfo{
                .fid = e.fid,
                .ip = cfg->network.ip,
                .port = actual_port,
                .unbuffered = cfg->unbuffered,
                .pubsub = false,
                .tcp = cfg->network.dcs_network_mode == NetworkMode::tcp,
                .stream = e.has_streams,
                .netio_pages = cfg->network.netio_pages,
                .netio_pagesize = cfg->network.netio_pagesize,
            };
        });
        m_bus_map.at(dev_no).publish(bus_info);
        ERS_INFO(std::format("PCIe endpoint {}, DMA {} receiving data on port {}", dev_no, dmaid, actual_port));
    }
}
/**
 * @brief Call stop() on every DMA buffer registered in `dma_buffers`.
 *
 * Only the DMA buffers are touched here; devices, writers and bus entries
 * are left as they are.
 */
template <class CFG, class DEV, class BUF>
void FromHost<CFG, DEV, BUF>::stop()
{
    for (auto& [dev_no, buffer] : dma_buffers) {
        buffer->stop();
    }
}
/**
 * @brief Write one monitoring message per configured monitoring writer.
 *
 * Each message is started with the hostname and then, for every device in
 * `devices`, receives the device stats, the DMA buffer stats and, for each
 * network writer of that device, the writer stats followed by its per-e-link
 * stats.
 *
 * Devices without a writer list (start() skips devices it cannot open, and
 * nothing exists before start() has run) still contribute their device and
 * DMA stats; only the writer/e-link section is left out for them.
 *
 * @throws std::out_of_range from `dma_buffers.at()` when no DMA buffer is
 *         registered for a device number.
 */
template <class CFG, class DEV, class BUF>
void FromHost<CFG, DEV, BUF>::print_monitoring()
{
    for (auto& monitor : m_mon) {
        monitor.create_new_message(m_hostname);

        for (const auto& dev : devices) {
            const unsigned int device = dev->get_device_number();
            const FromHostDeviceStats dev_stats(device);
            monitor.append_device_stats(dev_stats);

            // Bind by reference: no shared_ptr copy is needed just to read stats.
            const auto& dma = dma_buffers.at(device);
            monitor.append_dma_stats(dma->get_monitoring_data());

            // Look the writers up without throwing, so that one device that
            // was never set up does not abort the whole monitoring report.
            const auto writers_it = m_writers.find(device);
            if (writers_it == m_writers.end()) {
                continue;
            }

            for (auto& writer : writers_it->second) {
                const int thread_id = writer.get_thread_id();
                const FromHostWriterStats w_stats(
                    thread_id,
                    0, //type TODO: RDMA, TCP?
                    0  //netio-next socket list doesn't provide a counter
                );
                monitor.append_writer_stats(w_stats);

                // Per-e-link counters of this writer.
                for (const auto& elink_stats : writer.get_writer_elink_stats()) {
                    monitor.append_elink_stats(elink_stats.second);
                }
            }
        }

        monitor.write_message();
    }
}
/**
 * @brief Build the list of monitoring sinks.
 *
 * @param writer_types Monitoring types to create, in output order.
 * @return One FromHostMonitor per entry of `writer_types`, each constructed
 *         from the Writer returned by add_monitoring_writer().
 * @throws std::invalid_argument if a type is not known to add_monitoring_writer().
 */
template <class CFG, class DEV, class BUF>
std::vector<FromHostMonitor> FromHost<CFG, DEV, BUF>::create_monitoring_writers(const std::vector<MonitoringType>& writer_types) {
    std::vector<FromHostMonitor> monitors;
    for (auto type_it = writer_types.cbegin(); type_it != writer_types.cend(); ++type_it) {
        monitors.emplace_back(add_monitoring_writer(*type_it));
    }
    return monitors;
}
/**
 * @brief Create the concrete monitoring Writer for one monitoring type.
 *
 * FIFO writers are built from `cfg->stats.monitoring_fifo`, Prometheus writers
 * from `cfg->stats.prometheus_port`, WebIS writers take no arguments.
 *
 * @param writer_type Monitoring type to instantiate.
 * @return Owning pointer to the new writer.
 * @throws std::invalid_argument for any other value of `writer_type`.
 */
template <class CFG, class DEV, class BUF>
std::unique_ptr<Writer> FromHost<CFG, DEV, BUF>::add_monitoring_writer(const MonitoringType& writer_type) {
    if (writer_type == MonitoringType::FIFO) {
        return std::make_unique<FIFOWriter>(cfg->stats.monitoring_fifo);
    }
    if (writer_type == MonitoringType::Prometheus) {
        return std::make_unique<PrometheusWriter>(cfg->stats.prometheus_port);
    }
    if (writer_type == MonitoringType::WebIS) {
        return std::make_unique<WebISWriter>();
    }
    throw std::invalid_argument("Unknown writer type");
}