.. _program_listing_file_tohost.hpp: Program Listing for File tohost.hpp =================================== |exhale_lsh| :ref:`Return to documentation for file ` (``tohost.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include #include #include #include #include "device.hpp" #include "flxcard/FlxCard.h" #include "config/config.hpp" #include "tohost_reader.hpp" #include "network/netio_buffered_publisher.hpp" #include "network/netio_zerocopy_publisher.hpp" #include "log.hpp" #include "bus.hpp" #include "util.hpp" #include "utility.hpp" template class ToHost { public: explicit ToHost(std::unique_ptr config) : cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo) {m_running = true;}; void start(); void print_monitoring(); void stop(); public: std::unique_ptr cfg; std::list> devices; std::map> dma_buffers; private: bool m_running; std::string m_hostname; ToHostMonitor m_mon; std::map bus_map; std::map>> m_readers; std::vector irq_listeners; void make_bus(int dmaid, int unique_dmaid); }; template void ToHost::make_bus(int dmaid, int unique_dmaid) { if(bus_map.find(unique_dmaid) == bus_map.end()){ bus_map.emplace(std::piecewise_construct, std::forward_as_tuple(unique_dmaid), std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus) ); } } template void ToHost::stop() { m_running = false; for (auto& dma : dma_buffers) { dma.second->stop(); } } template void ToHost::start() { log_init(cfg->verbose); LOG_INFO("Number of PCIe endpoints to read %d", cfg->get_number_devices()); LOG_INFO("Polling period %u us", cfg->poll_time); //Open devices, allocate buffers and bus auto daq_reader_factory = [&](int udmaid, std::shared_ptr buf, Bus &bus, int port, const std::vector & elinks) { m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus, cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->network.netio_watermark, cfg->network.netio_timeout, cfg->max_daq_chunk_size), cfg->poll_time, false, cfg->l1id_check); }; auto dcs_reader_factory = [&](int udmaid, std::shared_ptr buf, Bus &bus, int port, const std::vector & elinks) { m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.dcs_ip, port, bus, cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->network.dcs_netio_watermark, cfg->network.dcs_netio_timeout, cfg->max_dcs_chunk_size), cfg->poll_time, false, 0); }; auto ttc_reader_factory = [&](int udmaid, std::shared_ptr buf, Bus &bus, int port, const std::vector & elinks) { m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus, cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize, cfg->network.ttc_netio_watermark, cfg->network.ttc_netio_timeout, UINT32_MAX), cfg->poll_time, false, cfg->l1id_check); }; auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr buf, Bus &bus, const std::string& ip, int port, const std::vector & elinks) { m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus, cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size, buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check); }; auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr buf, Bus &bus, const std::string& ip, int port, const std::vector & elinks) { m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus, cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size, buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check); }; //for interrupt-driven mode auto irq_listener = [&](int udmaid){ LOG_INFO("Running interrupt listener for unique DMA ID %d", udmaid); while(m_running){ dma_buffers.at(udmaid)->dma_wait_for_data_irq(); dma_buffers.at(udmaid)->dma_increase_irq_counter(); for (auto& r : m_readers.at(udmaid)){ r.fire_publisher_async_signal(); } } }; int tot_elinks{0}; //Open devices, allocate buffers and bus for (auto & dev : devices){ int dev_no = dev->get_device_number(); LOG_DBG("Opening PCIe endpoint %u", dev_no); unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids); int ret = dev->open_device(lock_mask); if(ret != 0 ){ LOG_ERR("Cannot open device %d.", dev_no); throw std::runtime_error("Fatal error: cannot open requested device"); } flx_tohost_format fmt = dev->get_tohost_data_format(); switch (fmt) { case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER: LOG_INFO("Device %d uses TOHOST_SUBCHUNK_TRAILER data format", dev_no); break; case flx_tohost_format::TOHOST_SUBCHUNK_HEADER: LOG_INFO("Device %d uses TOHOST_SUBCHUNK_HEADER data format", dev_no); break; default: LOG_ERR("Device %d: ToHost data format %d not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, fmt); } for(int id : cfg->resource.dma_ids){ int unique_id = cfg->get_unique_dmaid(id, dev_no); auto dma_buf = dma_buffers.at(unique_id); make_bus(id, unique_id); std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(id); LOG_INFO("Allocating DMA buffer %s (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), dev_no, id); dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem); dma_buf->dma_start_continuous(); if (cfg->poll_time == 0){ dma_buf->irq_data_enable(); } std::vector ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC); std::vector ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC); std::vector dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS); std::vector ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end()); ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end()); bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty(); bool has_ttc = not ttc2h_elinks.empty(); bool has_dcs = not ic_dcs_elinks.empty(); if ((has_daq and cfg->network.daq_unbuffered) or (has_dcs and cfg->network.dcs_unbuffered)) { dma_buf->set_zero_copy_reader(); } //DAQ int thread{0}; for (const std::vector & elinks : dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) { int port = cfg->network.ports.at(unique_id)+1000*thread; LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, id, thread, port); for( const auto & e : elinks ){ LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s, streams %c", tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str(), e.has_streams ? 'Y' : 'N'); ++tot_elinks; } if(cfg->network.daq_unbuffered) { LOG_INFO("Enabling zero-copy poublication of DAQ e-links"); daq_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.ip, port, elinks); } else { daq_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, elinks); } ++thread; } //TTC2H if ( has_ttc ) { int port = cfg->network.ttc_ports.at(unique_id); LOG_INFO("Device %u, DMA %d: TTC2H elink published on port %d", dev_no, id, port); auto & e = ttc2h_elinks[0]; LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s", tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str()); ttc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ttc2h_elinks); ++tot_elinks; } if ( has_dcs ) { int port = cfg->network.dcs_ports.at(unique_id); LOG_INFO("Device %u, DMA %d: DCS elinks published on port %d", dev_no, id, port); for( auto & e : ic_dcs_elinks ){ LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s", tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str()); ++tot_elinks; } if(cfg->network.dcs_unbuffered) { LOG_INFO("Enabling zero-copy poublication of DCS e-links"); dcs_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.dcs_ip, port, ic_dcs_elinks); } else { dcs_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ic_dcs_elinks); } } //run interrupt-listening threads if(cfg->poll_time == 0){ irq_listeners.push_back(std::thread(irq_listener, std::ref(unique_id))); } } // DMAs } // Devices } template void ToHost::print_monitoring() { //devices std::string ts = Util::ISO8601TimeUTC(); for (auto& d : devices) { unsigned int device = d->get_device_number(); ToHostDeviceStats s( device ); m_mon.append_device_stats(ts, m_hostname, s); //buffers for (auto& dma : dma_buffers) { //Discard DMA buffers of other devices unsigned int dma_buf_device = cfg->udmaid_to_deviceid(dma.first); if (dma_buf_device != device) { continue; } unsigned int dmaid = cfg->udmaid_to_dmaid(dma.first); ToHostDmaStats ds( dmaid, dma.second->dma_get_free_MB(), dma.second->dma_get_irq_counter() ); m_mon.append_dma_stats(ts, m_hostname, device, ds); //readers if (m_readers.count(dma.first) > 0 ){ //for DMAs with no e-links/readers for (auto& r : m_readers.at(dma.first)) { unsigned int reader_id = r.get_id(); ToHostReaderStats rs ( reader_id, static_cast(r.get_elink_type()), r.get_network_subscriptions(), r.get_network_resource_counter(), //non-thread safe read op r.get_network_resource_available_calls() //non-thread safe read op ); m_mon.append_reader_stats(ts, m_hostname, device, dmaid, rs); //elinks auto elink_mon_info = r.get_reader_elink_stats(); for(const auto & e : elink_mon_info) { m_mon.append_elink_stats(ts, m_hostname, device, dmaid, reader_id, e); } } } } } // m_mon.write_message(); //keep commented for tokenized writes }