.. _program_listing_file_fromhost.hpp:

Program Listing for File fromhost.hpp
=====================================

|exhale_lsh| :ref:`Return to documentation for file <file_fromhost.hpp>` (``fromhost.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // NOTE(review): every <...> span of this listing (system header names,
   // template parameter lists, template arguments) was lost when the page was
   // extracted. They are reconstructed below from how the names are used; the
   // parameter names CFG/DEV/BUF, the three system headers and the element type
   // of m_writers are assumptions - confirm against the real fromhost.hpp.
   #include <list>
   #include <map>
   #include <memory>

   #include "config/config.hpp"
   #include "fromhost_writer.hpp"
   #include "network/netio_buffered_receiver.hpp"
   #include "network/netio_unbuffered_receiver.hpp"
   #include "log.hpp"
   #include "bus.hpp"
   #include "util.hpp"
   #include "utility.hpp"


   /**
    * Drives the from-host path for a set of PCIe endpoints.
    *
    * For every entry of `devices`, start() opens the device, configures the
    * matching DMA buffer from `dma_buffers`, creates a Bus and attaches one
    * network receiver/writer. print_monitoring() pushes per-device, per-DMA,
    * per-writer and per-e-link statistics into the monitor.
    *
    * CFG - configuration type (accessed through `cfg->...`).
    * DEV - device type held in `devices`.
    * BUF - DMA buffer type held in `dma_buffers`.
    */
   template <class CFG, class DEV, class BUF>
   class FromHost {

       public:
           /**
            * Takes ownership of the configuration.
            *
            * m_mon is built from cfg->stats.monitoring_fifo; this is safe because
            * cfg is declared (and therefore initialised) before m_mon.
            */
           explicit FromHost(std::unique_ptr<CFG> config) :
               cfg(std::move(config)),
               m_hostname(Util::get_full_hostname()),
               m_mon(cfg->stats.monitoring_fifo) {}

           /// Opens all devices and sets up DMA buffers, buses and writers.
           void start();

           /// Calls stop() on every DMA buffer in dma_buffers.
           void stop();

           /// Appends the current statistics of all devices to the monitor.
           void print_monitoring();

       public:
           std::unique_ptr<CFG> cfg;
           std::list<std::shared_ptr<DEV>> devices;
           std::map<int, std::shared_ptr<BUF>> dma_buffers; //use device number as id

       private:
           std::string m_hostname;
           FromHostMonitor m_mon;
           std::map<int, Bus> m_bus_map;                            // keyed by device number
           // NOTE(review): container/element type reconstructed - verify.
           std::map<int, std::list<FromHostWriter<BUF>>> m_writers; // keyed by device number
   };


   /**
    * Brings up every device listed in `devices`.
    *
    * A device that cannot be opened is logged and skipped; it gets no Bus and
    * no writer, and does not consume a thread/port slot.
    */
   template <class CFG, class DEV, class BUF>
   void FromHost<CFG, DEV, BUF>::start()
   {
       log_init(cfg->verbose);
       LOG_INFO("Number of PCIe endpoints to serve %d", cfg->get_number_devices());

       //Open devices, allocate buffers and bus

       // Adds a writer backed by a buffered receiver; this variant also forwards cfg->trickle.
       // The e-link vector's element type is not recoverable from the listing, hence `auto`.
       auto netiobuf_writer_factory = [&](int dev_no, std::shared_ptr<BUF> buf, Bus &bus, int port, const auto & elinks) {
           m_writers[dev_no].emplace_back(buf, elinks,
               network::utility::create_buffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
                   cfg->network.netio_pages, cfg->network.netio_pagesize),
               cfg->trickle);
       };

       // Adds a writer backed by an unbuffered receiver.
       auto netiounbuf_writer_factory = [&](int dev_no, std::shared_ptr<BUF> buf, Bus &bus, int port, const auto & elinks) {
           m_writers[dev_no].emplace_back(buf, elinks,
               network::utility::create_unbuffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
                   cfg->network.netio_pages, cfg->network.netio_pagesize));
       };

       int tot_elinks{0};  // running e-link index across all devices (log output only)
       int thread{0};      // counts successfully started devices; offsets the port by 1000 each

       for (auto & dev : devices) {

           //Get DMA ID
           // First pass: open with lock_mask{0} only to read the DMA ID, then close again.
           int ret = dev->open_device(0);
           int dev_no = dev->get_device_number();
           if (ret != 0) {
               LOG_ERR("Cannot open device %d.", dev_no);
               continue;
           }
           int dmaid = (cfg->trickle) ? dev->get_trickle_dmaid() : dev->get_fromhost_dmaid();
           dev->close_device();

           // Second pass: reopen with a lock mask built from that DMA ID.
           LOG_INFO("Opening PCIe endpoint %u", dev_no);
           unsigned int lock_mask = dev->make_dma_lock_mask({dmaid});
           ret = dev->open_device(lock_mask);
           if (ret != 0) {
               LOG_ERR("Cannot open device %d.", dev_no);
               continue;
           }

           //Initialize bus and DMA buffers
           auto dma_buf = dma_buffers.at(dev_no);
           dma_buf->set_dmaid(dmaid);
           auto data_format = dev->get_fromhost_data_format();
           dma_buf->set_encoder_data_format(data_format);

           // Construct the Bus in place inside the map.
           m_bus_map.emplace(std::piecewise_construct,
               std::forward_as_tuple(dev_no),
               std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
           );

           std::string dma_name = "fromhost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
           LOG_INFO("Allocating DMA buffer %s of size %u MB (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), cfg->resource.cmem_buffersize/1024/1024, dev_no, dmaid);
           dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
           dma_buf->dma_start_continuous();

           auto elinks = dma_buf->get_elinks();

           int port = cfg->network.ports.at(dev_no) + 1000*thread;
           LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, dmaid, thread, port);
           for (const auto & e : elinks) {
               LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
                   tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid),
                   dev->get_elink_type_str(e.type).c_str());
               ++tot_elinks;
           }

           if (cfg->unbuffered) {
               LOG_INFO("Requesting unbuffered communication to sender");
               netiounbuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
           } else {
               netiobuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
           }
           ++thread;
       }
   }


   /// Calls stop() on every DMA buffer registered in dma_buffers.
   template <class CFG, class DEV, class BUF>
   void FromHost<CFG, DEV, BUF>::stop()
   {
       for (auto& dma : dma_buffers) {
           dma.second->stop();
       }
   }


   /**
    * Appends device, DMA, writer and e-link statistics for every device to the
    * monitor, all tagged with one common timestamp and the host name.
    */
   template <class CFG, class DEV, class BUF>
   void FromHost<CFG, DEV, BUF>::print_monitoring()
   {
       //devices
       std::string ts = Util::ISO8601TimeUTC();
       for (auto& d : devices) {
           unsigned int device = d->get_device_number();
           FromHostDeviceStats s( device );
           m_mon.append_device_stats(ts, m_hostname, s);

           auto dma = dma_buffers.at(device);
           unsigned int dmaid = cfg->udmaid_to_dmaid(dma->get_dmaid());
           m_mon.append_dma_stats(ts, m_hostname, device, dma->get_monitoring_data());

           // start() only creates writers for devices it managed to open, so a
           // skipped device has no entry here. Look it up instead of using at(),
           // which would throw std::out_of_range and abort the whole report.
           auto writers = m_writers.find(device);
           if (writers == m_writers.end()) {
               continue;
           }

           //readers
           for (auto& w : writers->second) {
               unsigned int writer_id = w.get_id();
               FromHostWriterStats ws(
                   writer_id,
                   0, //type TODO: RDMA, TCP?
                   0  //netio-next socket list doesn't provide a counter
               );
               m_mon.append_writer_stats(ts, m_hostname, device, dmaid, ws);

               //elinks
               for (const auto & e : w.get_writer_elink_stats()) {
                   m_mon.append_elink_stats(ts, m_hostname, device, dmaid, writer_id, e.second);
               }
           }
       }
       //m_mon.write_message(); //keep commented for tokenized writes
   }