LCOV - code coverage report
Current view: top level - src - tohost.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 115 143 80.4 %
Date: 2025-09-09 12:09:29 Functions: 8 11 72.7 %

          Line data    Source code
       1             : #include <cstdint>
       2             : #include <list>
       3             : #include <map>
       4             : #include <csignal>
       5             : #include "device.hpp"
       6             : #include "flxcard/FlxCard.h"
       7             : #include "config/config.hpp"
       8             : #include "tohost_reader.hpp"
       9             : #include "network/netio_buffered_publisher.hpp"
      10             : #include "network/netio_zerocopy_publisher.hpp"
      11             : #include "log.hpp"
      12             : #include "bus.hpp"
      13             : #include "util.hpp"
      14             : #include "utility.hpp"
      15             : 
      16             : 
      17             : /**
      18             :  * Core of felix-tohost and felix-file2host.
      19             :  */
      20             : template <class CFG, class DEV, class BUF>
      21             : class ToHost {
      22             :     public:
      23             :         /**
      24             :          * @brief ToHost constructor: scalable readout of multiple devices and DMA buffers.
      25             :          * @details This constructor initialises the object; it does not start reading operations.
      26             :          * @param config configuration parameters.
      27             :          */
      28          22 :         explicit ToHost(std::unique_ptr<CFG> config) :
      29          22 :             cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo)
      30          22 :             {m_running = true;};
      31             : 
      32             :         /**
      33             :          * @brief Open devices, allocate buffers, create publishers and start
      34             :          * reading data.
      35             :          */
      36             :         void start();
      37             : 
      38             :         /**
      39             :          * @brief retrieve and write monitoring information. To be executed by the
      40             :          * thread that creates an instance of this class.
      41             :          */
      42             :         void print_monitoring();
      43             : 
      44             :         /**
      45             :          * @brief Stop reading all associated DMA buffers.
      46             :          */
      47             :         void stop();
      48             : 
      49             :     public:
      50             :         std::unique_ptr<CFG> cfg;
      51             :         std::list<std::shared_ptr<DEV>> devices;
      52             :         std::map<int, std::shared_ptr<BUF>> dma_buffers;
      53             : 
      54             :     private:
      55             :         bool m_running;
      56             :         std::string m_hostname;
      57             :         ToHostMonitor m_mon;
      58             :         std::map<int, Bus> bus_map;
      59             :         std::map<int, std::list<ToHostReader<BUF>>> m_readers;
      60             :         std::vector<std::thread> irq_listeners;
      61             :         void make_bus(int dmaid, int unique_dmaid);
      62             : };
      63             : 
      64             : 
      65             : template <class CFG, class DEV, class BUF>
      66          22 : void ToHost<CFG, DEV, BUF>::make_bus(int dmaid, int unique_dmaid)
      67             : {
      68          22 :     if(bus_map.find(unique_dmaid) == bus_map.end()){
      69          22 :         bus_map.emplace(std::piecewise_construct,
      70             :             std::forward_as_tuple(unique_dmaid),
      71          22 :             std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
      72             :             );
      73             :         }
      74          22 : }
      75             : 
      76             : template <class CFG, class DEV, class BUF>
      77          22 : void ToHost<CFG, DEV, BUF>::stop()
      78             : {
      79          22 :     m_running = false;
      80          44 :     for (auto& dma : dma_buffers) {
      81          22 :         dma.second->stop();
      82             :     }
      83          22 : }
      84             : 
      85             : template <class CFG, class DEV, class BUF>
      86          22 : void ToHost<CFG, DEV, BUF>::start()
      87             : {
      88          22 :     log_init(cfg->verbose);
      89          22 :     LOG_INFO("Number of PCIe endpoints to read %d", cfg->get_number_devices());
      90          22 :     LOG_INFO("Polling period %u us", cfg->poll_time);
      91             :     //Open devices, allocate buffers and bus
      92          42 :     auto daq_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
      93          20 :         m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
      94          10 :             cfg->network.netio_pages, cfg->network.netio_pagesize,
      95          10 :             cfg->network.netio_watermark, cfg->network.netio_timeout,
      96          20 :             cfg->max_daq_chunk_size), cfg->poll_time, false, cfg->l1id_check);
      97             :     };
      98             : 
      99          22 :     auto dcs_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
     100           0 :         m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.dcs_ip, port, bus,
     101           0 :             cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize,
     102           0 :             cfg->network.dcs_netio_watermark, cfg->network.dcs_netio_timeout,
     103           0 :             cfg->max_dcs_chunk_size), cfg->poll_time, false, 0);
     104             :     };
     105             : 
     106          30 :     auto ttc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
     107           8 :         m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
     108           4 :             cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize,
     109           4 :             cfg->network.ttc_netio_watermark, cfg->network.ttc_netio_timeout,
     110           8 :             UINT32_MAX), cfg->poll_time, false, cfg->l1id_check);
     111             :     };
     112             : 
     113          42 :     auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
     114          20 :         m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
     115          10 :             cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size,
     116          20 :             buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
     117             :     };
     118             : 
     119          22 :     auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
     120           0 :         m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
     121           0 :             cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size,
     122           0 :             buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
     123             :     };
     124             : 
     125             :     //for interrupt-driven mode
     126          22 :     auto irq_listener = [&](int udmaid){
     127           0 :         LOG_INFO("Running interrupt listener for unique DMA ID %d", udmaid);
     128           0 :         while(m_running){
     129           0 :             dma_buffers.at(udmaid)->dma_wait_for_data_irq();
     130           0 :             dma_buffers.at(udmaid)->dma_increase_irq_counter();
     131           0 :             for (auto& r :  m_readers.at(udmaid)){
     132           0 :                 r.fire_publisher_async_signal();
     133             :             }
     134             :         }
     135             :     };
     136             : 
     137          22 :     int tot_elinks{0};
     138             :     //Open devices, allocate buffers and bus
     139          44 :     for (auto & dev : devices){
     140             : 
     141          22 :         int dev_no = dev->get_device_number();
     142          22 :         LOG_DBG("Opening PCIe endpoint %u", dev_no);
     143          22 :         unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids);
     144          22 :         int ret = dev->open_device(lock_mask);
     145          22 :         if(ret != 0 ){
     146           0 :             LOG_ERR("Cannot open device %d.", dev_no);
     147           0 :             throw std::runtime_error("Fatal error: cannot open requested device");
     148             :         }
     149             : 
     150          22 :         flx_tohost_format fmt = dev->get_tohost_data_format();
     151          22 :         switch (fmt) {
     152          21 :             case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER:
     153          21 :                 LOG_INFO("Device %d uses TOHOST_SUBCHUNK_TRAILER data format", dev_no);
     154             :                 break;
     155           1 :         case flx_tohost_format::TOHOST_SUBCHUNK_HEADER:
     156           1 :                 LOG_INFO("Device %d uses TOHOST_SUBCHUNK_HEADER data format", dev_no);
     157             :                 break;
     158           0 :         default:
     159           0 :             LOG_ERR("Device %d: ToHost data format %d not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, fmt);
     160             :         }
     161             : 
     162             : 
     163          66 :         for(int id : cfg->resource.dma_ids){
     164             : 
     165          22 :             int unique_id = cfg->get_unique_dmaid(id, dev_no);
     166          22 :             auto dma_buf = dma_buffers.at(unique_id);
     167          22 :             make_bus(id, unique_id);
     168             : 
     169          66 :             std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(id);
     170          22 :             LOG_INFO("Allocating DMA buffer %s (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), dev_no, id);
     171          22 :             dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
     172          22 :             dma_buf->dma_start_continuous();
     173             : 
     174          22 :             if (cfg->poll_time == 0){
     175          22 :                 dma_buf->irq_data_enable();
     176             :             }
     177             : 
     178          22 :             std::vector<Elink> ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC);
     179          22 :             std::vector<Elink> ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC);
     180          22 :             std::vector<Elink> dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS);
     181          22 :             std::vector<Elink> ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end());
     182          22 :             ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end());
     183             : 
     184          22 :             bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty();
     185          22 :             bool has_ttc = not ttc2h_elinks.empty();
     186          22 :             bool has_dcs = not ic_dcs_elinks.empty();
     187          20 :             if ((has_daq and cfg->network.daq_unbuffered)
     188          32 :                     or (has_dcs and cfg->network.dcs_unbuffered)) {
     189          10 :                 dma_buf->set_zero_copy_reader();
     190             :             }
     191             : 
     192             :             //DAQ
     193          22 :             int thread{0};
     194          42 :             for (const std::vector<Elink> & elinks :
     195          22 :                 dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) {
     196          20 :                 int port = cfg->network.ports.at(unique_id)+1000*thread;
     197          20 :                 LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, id, thread, port);
     198          52 :                 for( const auto & e : elinks ){
     199          60 :                     LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s, streams %c",
     200             :                         tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid),
     201             :                         dev->get_elink_type_str(e.type).c_str(), e.has_streams ? 'Y' : 'N');
     202          32 :                     ++tot_elinks;
     203             :                 }
     204          20 :                 if(cfg->network.daq_unbuffered) {
     205          10 :                     LOG_INFO("Enabling zero-copy poublication of DAQ e-links");
     206          30 :                     daq_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.ip, port, elinks);
     207             :                 }
     208             :                 else {
     209          30 :                     daq_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, elinks);
     210             :                 }
     211          20 :                 ++thread;
     212             :             }
     213             : 
     214             :             //TTC2H
     215          22 :             if ( has_ttc ) {
     216           4 :                 int port = cfg->network.ttc_ports.at(unique_id);
     217           4 :                 LOG_INFO("Device %u, DMA %d: TTC2H elink published on port %d", dev_no, id, port);
     218           4 :                 auto & e = ttc2h_elinks[0];
     219           4 :                 LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
     220             :                     tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
     221             :                     dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
     222           8 :                 ttc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ttc2h_elinks);
     223           4 :                 ++tot_elinks;
     224             :             }
     225             : 
     226          22 :             if ( has_dcs ) {
     227           0 :                 int port = cfg->network.dcs_ports.at(unique_id);
     228           0 :                 LOG_INFO("Device %u, DMA %d: DCS elinks published on port %d", dev_no, id, port);
     229           0 :                 for( auto & e : ic_dcs_elinks ){
     230           0 :                     LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
     231             :                         tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
     232             :                         dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
     233           0 :                     ++tot_elinks;
     234             :                 }
     235             : 
     236           0 :                 if(cfg->network.dcs_unbuffered) {
     237           0 :                     LOG_INFO("Enabling zero-copy poublication of DCS e-links");
     238           0 :                     dcs_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.dcs_ip, port, ic_dcs_elinks);
     239             :                 }
     240             :                 else {
     241           0 :                     dcs_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ic_dcs_elinks);
     242             :                 }
     243             :             }
     244             : 
     245             :             //run interrupt-listening threads
     246          22 :             if(cfg->poll_time == 0){
     247           0 :                 irq_listeners.push_back(std::thread(irq_listener, std::ref(unique_id)));
     248             :             }
     249             : 
     250             :         } // DMAs
     251             :     } // Devices
     252          22 : }
     253             : 
     254             : 
     255             : template <class CFG, class DEV, class BUF>
     256         177 : void ToHost<CFG, DEV, BUF>::print_monitoring()
     257             : {
     258             :     //devices
     259         177 :     std::string ts = Util::ISO8601TimeUTC();
     260         354 :     for (auto& d : devices) {
     261         177 :         unsigned int device = d->get_device_number();
     262         177 :         ToHostDeviceStats s(
     263             :             device
     264             :         );
     265         177 :         m_mon.append_device_stats(ts, m_hostname, s);
     266             :         //buffers
     267         354 :         for (auto& dma : dma_buffers) {
     268             :             //Discard DMA buffers of other devices
     269         177 :             unsigned int dma_buf_device = cfg->udmaid_to_deviceid(dma.first);
     270         177 :             if (dma_buf_device != device) {
     271           0 :                 continue;
     272             :             }
     273         177 :             unsigned int dmaid = cfg->udmaid_to_dmaid(dma.first);
     274         177 :             ToHostDmaStats ds(
     275             :                 dmaid,
     276         177 :                 dma.second->dma_get_free_MB(),
     277         177 :                 dma.second->dma_get_irq_counter()
     278             :             );
     279         177 :             m_mon.append_dma_stats(ts, m_hostname, device, ds);
     280             :             //readers
     281         177 :             if (m_readers.count(dma.first) > 0 ){ //for DMAs with no e-links/readers
     282         365 :                 for (auto& r : m_readers.at(dma.first)) {
     283         188 :                     unsigned int reader_id = r.get_id();
     284         188 :                     ToHostReaderStats rs (
     285             :                         reader_id,
     286         188 :                         static_cast<int>(r.get_elink_type()),
     287             :                         r.get_network_subscriptions(),
     288             :                         r.get_network_resource_counter(), //non-thread safe read op
     289             :                         r.get_network_resource_available_calls()  //non-thread safe read op
     290             :                     );
     291         188 :                     m_mon.append_reader_stats(ts, m_hostname, device, dmaid, rs);
     292             :                     //elinks
     293         188 :                     auto elink_mon_info = r.get_reader_elink_stats();
     294         489 :                     for(const auto & e : elink_mon_info) {
     295         301 :                         m_mon.append_elink_stats(ts, m_hostname, device, dmaid, reader_id, e);
     296             :                     }
     297             :                 }
     298             :             }
     299             :         }
     300             :     }
     301             :     // m_mon.write_message(); //keep commented for tokenized writes
     302         177 : }

Generated by: LCOV version 1.0