Program Listing for File fromhost.hpp
↰ Return to documentation for file (fromhost.hpp
)
#include <list>
#include <map>
#include <csignal>
#include "config/config.hpp"
#include "fromhost_writer.hpp"
#include "network/netio_buffered_receiver.hpp"
#include "network/netio_unbuffered_receiver.hpp"
#include "log.hpp"
#include "bus.hpp"
#include "util.hpp"
#include "utility.hpp"
template <class C, class D, class B>
class FromHost {
public:
explicit FromHost(std::unique_ptr<C> config) :
cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo){ };
void start();
void stop();
void print_monitoring();
public:
std::unique_ptr<C> cfg;
std::list<std::shared_ptr<D>> devices;
std::map<int, std::shared_ptr<B>> dma_buffers; //use device number as id
private:
std::string m_hostname;
FromHostMonitor m_mon;
std::map<int, Bus> m_bus_map;
std::map<int, std::list<FromHostWriter<B>>> m_writers;
};
template <class C, class D, class B>
void FromHost<C, D, B>::start()
{
log_init(cfg->verbose);
LOG_INFO("Number of PCIe endpoints to serve %d", cfg->get_number_devices());
//Open devices, allocate buffers and bus
auto netiobuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_buffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
cfg->network.netio_pages, cfg->network.netio_pagesize), cfg->trickle);
};
auto netiounbuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_unbuffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
cfg->network.netio_pages, cfg->network.netio_pagesize));
};
int tot_elinks{0};
int thread{0};
for (auto & dev : devices){
//Get DMA ID
int ret = dev->open_device(0); // Open and close device with lock_mask{0} just to recover DMA ID
int dev_no = dev->get_device_number();
if(ret != 0 ){
LOG_ERR("Cannot open device %d.", dev_no);
continue;
};
int dmaid = (cfg->trickle) ? dev->get_trickle_dmaid() : dev->get_fromhost_dmaid();
dev->close_device(); // Open and close device with lock_mask{0} just to recover DMA ID
LOG_INFO("Opening PCIe endpoint %u", dev_no);
unsigned int lock_mask = dev->make_dma_lock_mask({dmaid});
ret = dev->open_device(lock_mask);
if(ret != 0 ){
LOG_ERR("Cannot open device %d.", dev_no);
continue;
};
//Initialize bus and DMA buffers
auto dma_buf = dma_buffers.at(dev_no);
dma_buf->set_dmaid(dmaid);
auto data_format = dev->get_fromhost_data_format();
dma_buf->set_encoder_data_format(data_format);
m_bus_map.emplace(std::piecewise_construct,
std::forward_as_tuple(dev_no),
std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
);
std::string dma_name = "fromhost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
LOG_INFO("Allocating DMA buffer %s of size %u MB (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), cfg->resource.cmem_buffersize/1024/1024, dev_no, dmaid);
dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
dma_buf->dma_start_continuous();
std::vector<Elink> elinks = dma_buf->get_elinks();
int port = cfg->network.ports.at(dev_no) + 1000*thread;
LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, dmaid, thread, port);
for( const auto & e : elinks ) {
LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
++tot_elinks;
}
if(cfg->unbuffered) {
LOG_INFO("Requesting unbuffered communication to sender");
netiounbuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
}
else {
netiobuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
}
++thread;
}
}
template <class C, class D, class B>
void FromHost<C, D, B>::stop()
{
for (auto& dma : dma_buffers) {
dma.second->stop();
}
}
template <class C, class D, class B>
void FromHost<C, D, B>::print_monitoring()
{
//devices
std::string ts = Util::ISO8601TimeUTC();
for (auto& d : devices) {
unsigned int device = d->get_device_number();
FromHostDeviceStats s(
device
);
m_mon.append_device_stats(ts, m_hostname, s);
auto dma = dma_buffers.at(device);
unsigned int dmaid = cfg->udmaid_to_dmaid(dma->get_dmaid());
m_mon.append_dma_stats(ts, m_hostname, device, dma->get_monitoring_data());
//readers
for (auto& w : m_writers.at(device)) {
unsigned int writer_id = w.get_id();
FromHostWriterStats ws(
writer_id,
0, //type TODO: RDMA, TCP?
0 //netio-next socket list doesn't provide a counter
);
m_mon.append_writer_stats(ts, m_hostname, device, dmaid, ws);
//elinks
for(const auto & e : w.get_writer_elink_stats()) {
m_mon.append_elink_stats(ts, m_hostname, device, dmaid, writer_id, e.second);
}
}
}
//m_mon.write_message(); //keep commented for tokenized writes
}