Line data Source code
1 : #include <cstdint>
2 : #include <list>
3 : #include <map>
4 : #include <csignal>
5 : #include "device.hpp"
6 : #include "flxcard/FlxCard.h"
7 : #include "config/config.hpp"
8 : #include "tohost_reader.hpp"
9 : #include "network/netio_buffered_publisher.hpp"
10 : #include "network/netio_zerocopy_publisher.hpp"
11 : #include "log.hpp"
12 : #include "bus.hpp"
13 : #include "util.hpp"
14 : #include "utility.hpp"
15 :
16 :
17 : /**
18 : * Core of felix-tohost and felix-file2host.
19 : */
20 : template <class CFG, class DEV, class BUF>
21 : class ToHost {
22 : public:
23 : /**
24 : * @brief ToHost constructor: scalable readout of multiple devices and DMA buffers.
25 : * @details This constructor initialises the object; it does not start reading operations.
26 : * @param config configuration parameters.
27 : */
28 22 : explicit ToHost(std::unique_ptr<CFG> config) :
29 22 : cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo)
30 22 : {m_running = true;};
31 :
32 : /**
33 : * @brief Open devices, allocate buffers, create publishers and start
34 : * reading data.
35 : */
36 : void start();
37 :
38 : /**
39 : * @brief retrieve and write monitoring information. To be executed by the
40 : * thread that creates an instance of this class.
41 : */
42 : void print_monitoring();
43 :
44 : /**
45 : * @brief Stop reading all associated DMA buffers.
46 : */
47 : void stop();
48 :
49 : public:
50 : std::unique_ptr<CFG> cfg;
51 : std::list<std::shared_ptr<DEV>> devices;
52 : std::map<int, std::shared_ptr<BUF>> dma_buffers;
53 :
54 : private:
55 : bool m_running;
56 : std::string m_hostname;
57 : ToHostMonitor m_mon;
58 : std::map<int, Bus> bus_map;
59 : std::map<int, std::list<ToHostReader<BUF>>> m_readers;
60 : std::vector<std::thread> irq_listeners;
61 : void make_bus(int dmaid, int unique_dmaid);
62 : };
63 :
64 :
65 : template <class CFG, class DEV, class BUF>
66 22 : void ToHost<CFG, DEV, BUF>::make_bus(int dmaid, int unique_dmaid)
67 : {
68 22 : if(bus_map.find(unique_dmaid) == bus_map.end()){
69 22 : bus_map.emplace(std::piecewise_construct,
70 : std::forward_as_tuple(unique_dmaid),
71 22 : std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
72 : );
73 : }
74 22 : }
75 :
76 : template <class CFG, class DEV, class BUF>
77 22 : void ToHost<CFG, DEV, BUF>::stop()
78 : {
79 22 : m_running = false;
80 44 : for (auto& dma : dma_buffers) {
81 22 : dma.second->stop();
82 : }
83 22 : }
84 :
85 : template <class CFG, class DEV, class BUF>
86 22 : void ToHost<CFG, DEV, BUF>::start()
87 : {
88 22 : log_init(cfg->verbose);
89 22 : LOG_INFO("Number of PCIe endpoints to read %d", cfg->get_number_devices());
90 22 : LOG_INFO("Polling period %u us", cfg->poll_time);
91 : //Open devices, allocate buffers and bus
92 42 : auto daq_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
93 20 : m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
94 10 : cfg->network.netio_pages, cfg->network.netio_pagesize,
95 10 : cfg->network.netio_watermark, cfg->network.netio_timeout,
96 20 : cfg->max_daq_chunk_size), cfg->poll_time, false, cfg->l1id_check);
97 : };
98 :
99 22 : auto dcs_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
100 0 : m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.dcs_ip, port, bus,
101 0 : cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize,
102 0 : cfg->network.dcs_netio_watermark, cfg->network.dcs_netio_timeout,
103 0 : cfg->max_dcs_chunk_size), cfg->poll_time, false, 0);
104 : };
105 :
106 30 : auto ttc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
107 8 : m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_buffered_publisher(cfg->network.evloop_type, cfg->network.ip, port, bus,
108 4 : cfg->network.ttc_netio_pages, cfg->network.ttc_netio_pagesize,
109 4 : cfg->network.ttc_netio_watermark, cfg->network.ttc_netio_timeout,
110 8 : UINT32_MAX), cfg->poll_time, false, cfg->l1id_check);
111 : };
112 :
113 42 : auto daq_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
114 20 : m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
115 10 : cfg->network.netio_pages, cfg->network.netio_pagesize, cfg->max_daq_chunk_size,
116 20 : buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
117 : };
118 :
119 22 : auto dcs_zc_reader_factory = [&](int udmaid, std::shared_ptr<BUF> buf, Bus &bus, const std::string& ip, int port, const std::vector<Elink> & elinks) {
120 0 : m_readers[udmaid].emplace_back(buf, elinks, network::utility::create_zero_copy_publisher(cfg->network.evloop_type, ip, port, bus,
121 0 : cfg->network.dcs_netio_pages, cfg->network.dcs_netio_pagesize, cfg->max_dcs_chunk_size,
122 0 : buf->dma_get_vaddr(), buf->dma_get_size()), cfg->poll_time, true, cfg->l1id_check);
123 : };
124 :
125 : //for interrupt-driven mode
126 22 : auto irq_listener = [&](int udmaid){
127 0 : LOG_INFO("Running interrupt listener for unique DMA ID %d", udmaid);
128 0 : while(m_running){
129 0 : dma_buffers.at(udmaid)->dma_wait_for_data_irq();
130 0 : dma_buffers.at(udmaid)->dma_increase_irq_counter();
131 0 : for (auto& r : m_readers.at(udmaid)){
132 0 : r.fire_publisher_async_signal();
133 : }
134 : }
135 : };
136 :
137 22 : int tot_elinks{0};
138 : //Open devices, allocate buffers and bus
139 44 : for (auto & dev : devices){
140 :
141 22 : int dev_no = dev->get_device_number();
142 22 : LOG_DBG("Opening PCIe endpoint %u", dev_no);
143 22 : unsigned int lock_mask = dev->make_dma_lock_mask(cfg->resource.dma_ids);
144 22 : int ret = dev->open_device(lock_mask);
145 22 : if(ret != 0 ){
146 0 : LOG_ERR("Cannot open device %d.", dev_no);
147 0 : throw std::runtime_error("Fatal error: cannot open requested device");
148 : }
149 :
150 22 : flx_tohost_format fmt = dev->get_tohost_data_format();
151 22 : switch (fmt) {
152 21 : case flx_tohost_format::TOHOST_SUBCHUNK_TRAILER:
153 21 : LOG_INFO("Device %d uses TOHOST_SUBCHUNK_TRAILER data format", dev_no);
154 : break;
155 1 : case flx_tohost_format::TOHOST_SUBCHUNK_HEADER:
156 1 : LOG_INFO("Device %d uses TOHOST_SUBCHUNK_HEADER data format", dev_no);
157 : break;
158 0 : default:
159 0 : LOG_ERR("Device %d: ToHost data format %d not recognised. Assuming TOHOST_SUBCHUNK_TRAILER", dev_no, fmt);
160 : }
161 :
162 :
163 66 : for(int id : cfg->resource.dma_ids){
164 :
165 22 : int unique_id = cfg->get_unique_dmaid(id, dev_no);
166 22 : auto dma_buf = dma_buffers.at(unique_id);
167 22 : make_bus(id, unique_id);
168 :
169 66 : std::string dma_name = "tohost-d" + std::to_string(dev_no) + "-" + std::to_string(id);
170 22 : LOG_INFO("Allocating DMA buffer %s (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), dev_no, id);
171 22 : dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
172 22 : dma_buf->dma_start_continuous();
173 :
174 22 : if (cfg->poll_time == 0){
175 22 : dma_buf->irq_data_enable();
176 : }
177 :
178 22 : std::vector<Elink> ttc2h_elinks = dma_buf->get_elinks_of_type(elink_type_t::TTC);
179 22 : std::vector<Elink> ic_elinks = dma_buf->get_elinks_of_type(elink_type_t::IC);
180 22 : std::vector<Elink> dcs_elinks = dma_buf->get_elinks_of_type(elink_type_t::DCS);
181 22 : std::vector<Elink> ic_dcs_elinks(ic_elinks.begin(), ic_elinks.end());
182 22 : ic_dcs_elinks.insert(ic_dcs_elinks.end(), dcs_elinks.begin(), dcs_elinks.end());
183 :
184 22 : bool has_daq = not dma_buf->get_elinks_of_type(elink_type_t::DAQ).empty();
185 22 : bool has_ttc = not ttc2h_elinks.empty();
186 22 : bool has_dcs = not ic_dcs_elinks.empty();
187 20 : if ((has_daq and cfg->network.daq_unbuffered)
188 32 : or (has_dcs and cfg->network.dcs_unbuffered)) {
189 10 : dma_buf->set_zero_copy_reader();
190 : }
191 :
192 : //DAQ
193 22 : int thread{0};
194 42 : for (const std::vector<Elink> & elinks :
195 22 : dma_buf->split_elinks_of_type(elink_type_t::DAQ, cfg->daq_threads)) {
196 20 : int port = cfg->network.ports.at(unique_id)+1000*thread;
197 20 : LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, id, thread, port);
198 52 : for( const auto & e : elinks ){
199 60 : LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s, streams %c",
200 : tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid), dev->get_epath(e.lid),
201 : dev->get_elink_type_str(e.type).c_str(), e.has_streams ? 'Y' : 'N');
202 32 : ++tot_elinks;
203 : }
204 20 : if(cfg->network.daq_unbuffered) {
205 10 : LOG_INFO("Enabling zero-copy poublication of DAQ e-links");
206 30 : daq_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.ip, port, elinks);
207 : }
208 : else {
209 30 : daq_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, elinks);
210 : }
211 20 : ++thread;
212 : }
213 :
214 : //TTC2H
215 22 : if ( has_ttc ) {
216 4 : int port = cfg->network.ttc_ports.at(unique_id);
217 4 : LOG_INFO("Device %u, DMA %d: TTC2H elink published on port %d", dev_no, id, port);
218 4 : auto & e = ttc2h_elinks[0];
219 4 : LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
220 : tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
221 : dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
222 8 : ttc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ttc2h_elinks);
223 4 : ++tot_elinks;
224 : }
225 :
226 22 : if ( has_dcs ) {
227 0 : int port = cfg->network.dcs_ports.at(unique_id);
228 0 : LOG_INFO("Device %u, DMA %d: DCS elinks published on port %d", dev_no, id, port);
229 0 : for( auto & e : ic_dcs_elinks ){
230 0 : LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
231 : tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
232 : dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
233 0 : ++tot_elinks;
234 : }
235 :
236 0 : if(cfg->network.dcs_unbuffered) {
237 0 : LOG_INFO("Enabling zero-copy poublication of DCS e-links");
238 0 : dcs_zc_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), cfg->network.dcs_ip, port, ic_dcs_elinks);
239 : }
240 : else {
241 0 : dcs_reader_factory(unique_id, dma_buf, bus_map.at(unique_id), port, ic_dcs_elinks);
242 : }
243 : }
244 :
245 : //run interrupt-listening threads
246 22 : if(cfg->poll_time == 0){
247 0 : irq_listeners.push_back(std::thread(irq_listener, std::ref(unique_id)));
248 : }
249 :
250 : } // DMAs
251 : } // Devices
252 22 : }
253 :
254 :
255 : template <class CFG, class DEV, class BUF>
256 177 : void ToHost<CFG, DEV, BUF>::print_monitoring()
257 : {
258 : //devices
259 177 : std::string ts = Util::ISO8601TimeUTC();
260 354 : for (auto& d : devices) {
261 177 : unsigned int device = d->get_device_number();
262 177 : ToHostDeviceStats s(
263 : device
264 : );
265 177 : m_mon.append_device_stats(ts, m_hostname, s);
266 : //buffers
267 354 : for (auto& dma : dma_buffers) {
268 : //Discard DMA buffers of other devices
269 177 : unsigned int dma_buf_device = cfg->udmaid_to_deviceid(dma.first);
270 177 : if (dma_buf_device != device) {
271 0 : continue;
272 : }
273 177 : unsigned int dmaid = cfg->udmaid_to_dmaid(dma.first);
274 177 : ToHostDmaStats ds(
275 : dmaid,
276 177 : dma.second->dma_get_free_MB(),
277 177 : dma.second->dma_get_irq_counter()
278 : );
279 177 : m_mon.append_dma_stats(ts, m_hostname, device, ds);
280 : //readers
281 177 : if (m_readers.count(dma.first) > 0 ){ //for DMAs with no e-links/readers
282 365 : for (auto& r : m_readers.at(dma.first)) {
283 188 : unsigned int reader_id = r.get_id();
284 188 : ToHostReaderStats rs (
285 : reader_id,
286 188 : static_cast<int>(r.get_elink_type()),
287 : r.get_network_subscriptions(),
288 : r.get_network_resource_counter(), //non-thread safe read op
289 : r.get_network_resource_available_calls() //non-thread safe read op
290 : );
291 188 : m_mon.append_reader_stats(ts, m_hostname, device, dmaid, rs);
292 : //elinks
293 188 : auto elink_mon_info = r.get_reader_elink_stats();
294 489 : for(const auto & e : elink_mon_info) {
295 301 : m_mon.append_elink_stats(ts, m_hostname, device, dmaid, reader_id, e);
296 : }
297 : }
298 : }
299 : }
300 : }
301 : // m_mon.write_message(); //keep commented for tokenized writes
302 177 : }
|