LCOV - code coverage report
Current view: top level - src - fromhost.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 61 69 88.4 %
Date: 2025-09-09 12:09:29 Functions: 5 6 83.3 %

          Line data    Source code
       1             : #include <list>
       2             : #include <map>
       3             : #include <csignal>
       4             : #include "config/config.hpp"
       5             : #include "fromhost_writer.hpp"
       6             : #include "network/netio_buffered_receiver.hpp"
       7             : #include "network/netio_unbuffered_receiver.hpp"
       8             : #include "log.hpp"
       9             : #include "bus.hpp"
      10             : #include "util.hpp"
      11             : #include "utility.hpp"
      12             : 
      13             : 
      14             : /**
      15             :  * Class at the core of felix-toflx and felix-file2flx.
      16             :  * Templeted over Config, Device and ToHostBuffer classes.
      17             :  * */
      18             : template <class C, class D, class B>
      19             : class FromHost {
      20             : 
      21             :     public:
      22           5 :         explicit FromHost(std::unique_ptr<C> config) :
      23          10 :             cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo){ };
      24             : 
      25             :         /**
      26             :          * @brief allocate all resources and start thread(s).
      27             :          */
      28             :         void start();
      29             : 
      30             :         /**
      31             :          * @brief stop transfers, join all created threads.
      32             :          */
      33             :         void stop();
      34             : 
      35             :         /**
      36             :          * @brief print monitoring information in FIFO.
      37             :          */
      38             :         void print_monitoring();
      39             : 
      40             :     public:
      41             :         std::unique_ptr<C> cfg;
      42             :         std::list<std::shared_ptr<D>> devices;
      43             :         std::map<int, std::shared_ptr<B>> dma_buffers; //use device number as id
      44             : 
      45             :     private:
      46             :         std::string m_hostname;
      47             :         FromHostMonitor m_mon;
      48             :         std::map<int, Bus> m_bus_map;
      49             :         std::map<int, std::list<FromHostWriter<B>>> m_writers;
      50             : };
      51             : 
      52             : 
      53             : template <class C, class D, class B>
      54           5 : void FromHost<C, D, B>::start()
      55             : {
      56           5 :     log_init(cfg->verbose);
      57           5 :     LOG_INFO("Number of PCIe endpoints to serve %d", cfg->get_number_devices());
      58             : 
      59             :     //Open devices, allocate buffers and bus
      60          15 :     auto netiobuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
      61          10 :         m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_buffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
      62           5 :             cfg->network.netio_pages, cfg->network.netio_pagesize), cfg->trickle);
      63             :     };
      64             : 
      65           5 :     auto netiounbuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
      66           0 :         m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_unbuffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
      67           0 :             cfg->network.netio_pages, cfg->network.netio_pagesize));
      68             :     };
      69             : 
      70           5 :     int tot_elinks{0};
      71           5 :     int thread{0};
      72             : 
      73          15 :     for (auto & dev : devices){
      74             : 
      75             :         //Get DMA ID
      76           5 :         int ret = dev->open_device(0); // Open and close device with lock_mask{0} just to recover DMA ID
      77           5 :         int dev_no = dev->get_device_number();
      78           5 :         if(ret != 0 ){
      79           0 :             LOG_ERR("Cannot open device %d.", dev_no);
      80           0 :             continue;
      81             :         };
      82           5 :         int dmaid = (cfg->trickle) ? dev->get_trickle_dmaid() : dev->get_fromhost_dmaid();
      83           5 :         dev->close_device(); // Open and close device with lock_mask{0} just to recover DMA ID
      84             : 
      85           5 :         LOG_INFO("Opening PCIe endpoint %u", dev_no);
      86           5 :         unsigned int lock_mask = dev->make_dma_lock_mask({dmaid});
      87           5 :         ret = dev->open_device(lock_mask);
      88           5 :         if(ret != 0 ){
      89           0 :             LOG_ERR("Cannot open device %d.", dev_no);
      90           0 :             continue;
      91             :         };
      92             : 
      93             :         //Initialize bus and  DMA buffers
      94          10 :         auto dma_buf = dma_buffers.at(dev_no);
      95          10 :         dma_buf->set_dmaid(dmaid);
      96             : 
      97           5 :         auto data_format = dev->get_fromhost_data_format();
      98           5 :         dma_buf->set_encoder_data_format(data_format);
      99             : 
     100          10 :         m_bus_map.emplace(std::piecewise_construct,
     101             :                 std::forward_as_tuple(dev_no),
     102           5 :                 std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
     103             :         );
     104             : 
     105          15 :         std::string dma_name = "fromhost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
     106           5 :         LOG_INFO("Allocating DMA buffer %s of size %u MB (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), cfg->resource.cmem_buffersize/1024/1024, dev_no, dmaid);
     107           5 :         dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
     108             :         
     109           5 :         dma_buf->dma_start_continuous();
     110             :     
     111           5 :         std::vector<Elink> elinks = dma_buf->get_elinks();
     112           5 :         int port = cfg->network.ports.at(dev_no) + 1000*thread;
     113             : 
     114           5 :         LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, dmaid, thread, port);
     115          10 :         for( const auto & e : elinks ) {
     116           5 :             LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
     117             :                 tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
     118             :                 dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
     119           5 :             ++tot_elinks;
     120             :         }
     121             : 
     122           5 :         if(cfg->unbuffered) {
     123           0 :             LOG_INFO("Requesting unbuffered communication to sender");
     124           0 :             netiounbuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
     125             :         }
     126             :         else {
     127          15 :             netiobuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
     128             :         }
     129           5 :         ++thread;
     130             :     }
     131             : 
     132           5 : }
     133             : 
     134             : 
     135             : template <class C, class D, class B>
     136           5 : void FromHost<C, D, B>::stop()
     137             : {
     138          10 :     for (auto& dma : dma_buffers) {
     139           5 :         dma.second->stop();
     140             :     }
     141           5 : }
     142             : 
     143             : 
     144             : template <class C, class D, class B>
     145          21 : void FromHost<C, D, B>::print_monitoring()
     146             : {
     147             :     //devices
     148          21 :     std::string ts = Util::ISO8601TimeUTC();
     149          42 :     for (auto& d : devices) {
     150          21 :         unsigned int device = d->get_device_number();
     151          21 :         FromHostDeviceStats s(
     152             :             device
     153             :         );
     154          21 :         m_mon.append_device_stats(ts, m_hostname, s);
     155          21 :         auto dma = dma_buffers.at(device);
     156          21 :         unsigned int dmaid = cfg->udmaid_to_dmaid(dma->get_dmaid());
     157          21 :         m_mon.append_dma_stats(ts, m_hostname, device, dma->get_monitoring_data());
     158             :         //readers
     159          42 :         for (auto& w : m_writers.at(device)) {
     160          21 :             unsigned int writer_id = w.get_id();
     161          21 :             FromHostWriterStats ws(
     162             :                 writer_id,
     163             :                 0, //type TODO: RDMA, TCP?
     164             :                 0  //netio-next socket list doesn't provide a counter
     165             :             );
     166          21 :             m_mon.append_writer_stats(ts, m_hostname, device, dmaid, ws);
     167             :             //elinks
     168          63 :             for(const auto & e : w.get_writer_elink_stats()) {
     169          21 :                 m_mon.append_elink_stats(ts, m_hostname, device, dmaid, writer_id, e.second);
     170             :             }
     171             :         }
     172             :     }
     173             :     //m_mon.write_message(); //keep commented for tokenized writes
     174          21 : }

Generated by: LCOV version 1.0