Line data Source code
1 : #include <list>
2 : #include <map>
3 : #include <csignal>
4 : #include "config/config.hpp"
5 : #include "fromhost_writer.hpp"
6 : #include "network/netio_buffered_receiver.hpp"
7 : #include "network/netio_unbuffered_receiver.hpp"
8 : #include "log.hpp"
9 : #include "bus.hpp"
10 : #include "util.hpp"
11 : #include "utility.hpp"
12 :
13 :
14 : /**
15 : * Class at the core of felix-toflx and felix-file2flx.
16 : * Templeted over Config, Device and ToHostBuffer classes.
17 : * */
18 : template <class C, class D, class B>
19 : class FromHost {
20 :
21 : public:
22 5 : explicit FromHost(std::unique_ptr<C> config) :
23 10 : cfg(std::move(config)), m_hostname(Util::get_full_hostname()), m_mon(cfg->stats.monitoring_fifo){ };
24 :
25 : /**
26 : * @brief allocate all resources and start thread(s).
27 : */
28 : void start();
29 :
30 : /**
31 : * @brief stop transfers, join all created threads.
32 : */
33 : void stop();
34 :
35 : /**
36 : * @brief print monitoring information in FIFO.
37 : */
38 : void print_monitoring();
39 :
40 : public:
41 : std::unique_ptr<C> cfg;
42 : std::list<std::shared_ptr<D>> devices;
43 : std::map<int, std::shared_ptr<B>> dma_buffers; //use device number as id
44 :
45 : private:
46 : std::string m_hostname;
47 : FromHostMonitor m_mon;
48 : std::map<int, Bus> m_bus_map;
49 : std::map<int, std::list<FromHostWriter<B>>> m_writers;
50 : };
51 :
52 :
53 : template <class C, class D, class B>
54 5 : void FromHost<C, D, B>::start()
55 : {
56 5 : log_init(cfg->verbose);
57 5 : LOG_INFO("Number of PCIe endpoints to serve %d", cfg->get_number_devices());
58 :
59 : //Open devices, allocate buffers and bus
60 15 : auto netiobuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
61 10 : m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_buffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
62 5 : cfg->network.netio_pages, cfg->network.netio_pagesize), cfg->trickle);
63 : };
64 :
65 5 : auto netiounbuf_writer_factory = [&](int dev_no, std::shared_ptr<B> buf, Bus &bus, int port, const std::vector<Elink> & elinks) {
66 0 : m_writers[dev_no].emplace_back(buf, elinks, network::utility::create_unbuffered_receiver(cfg->network.evloop_type, cfg->network.ip, port, bus,
67 0 : cfg->network.netio_pages, cfg->network.netio_pagesize));
68 : };
69 :
70 5 : int tot_elinks{0};
71 5 : int thread{0};
72 :
73 15 : for (auto & dev : devices){
74 :
75 : //Get DMA ID
76 5 : int ret = dev->open_device(0); // Open and close device with lock_mask{0} just to recover DMA ID
77 5 : int dev_no = dev->get_device_number();
78 5 : if(ret != 0 ){
79 0 : LOG_ERR("Cannot open device %d.", dev_no);
80 0 : continue;
81 : };
82 5 : int dmaid = (cfg->trickle) ? dev->get_trickle_dmaid() : dev->get_fromhost_dmaid();
83 5 : dev->close_device(); // Open and close device with lock_mask{0} just to recover DMA ID
84 :
85 5 : LOG_INFO("Opening PCIe endpoint %u", dev_no);
86 5 : unsigned int lock_mask = dev->make_dma_lock_mask({dmaid});
87 5 : ret = dev->open_device(lock_mask);
88 5 : if(ret != 0 ){
89 0 : LOG_ERR("Cannot open device %d.", dev_no);
90 0 : continue;
91 : };
92 :
93 : //Initialize bus and DMA buffers
94 10 : auto dma_buf = dma_buffers.at(dev_no);
95 10 : dma_buf->set_dmaid(dmaid);
96 :
97 5 : auto data_format = dev->get_fromhost_data_format();
98 5 : dma_buf->set_encoder_data_format(data_format);
99 :
100 10 : m_bus_map.emplace(std::piecewise_construct,
101 : std::forward_as_tuple(dev_no),
102 5 : std::forward_as_tuple(cfg->network.bus_dir, cfg->network.bus_groupname, dmaid, cfg->network.verbose_bus)
103 : );
104 :
105 15 : std::string dma_name = "fromhost-d" + std::to_string(dev_no) + "-" + std::to_string(dmaid);
106 5 : LOG_INFO("Allocating DMA buffer %s of size %u MB (PCIe endpoint %u, DMAID %d)", dma_name.c_str(), cfg->resource.cmem_buffersize/1024/1024, dev_no, dmaid);
107 5 : dma_buf->allocate_buffer(cfg->resource.cmem_buffersize, dma_name, cfg->vmem, cfg->resource.free_previous_cmem);
108 :
109 5 : dma_buf->dma_start_continuous();
110 :
111 5 : std::vector<Elink> elinks = dma_buf->get_elinks();
112 5 : int port = cfg->network.ports.at(dev_no) + 1000*thread;
113 :
114 5 : LOG_INFO("PCIe endpoint %u, DMA %d: DAQ thread %d publishing on port %d", dev_no, dmaid, thread, port);
115 10 : for( const auto & e : elinks ) {
116 5 : LOG_INFO("E-link %d:\t0x%02X\t0x%016lx\t(channel %d egroup %d epath %d), type %s",
117 : tot_elinks, e.lid, e.fid, dev->get_channel(e.lid), dev->get_egroup(e.lid),
118 : dev->get_epath(e.lid), dev->get_elink_type_str(e.type).c_str());
119 5 : ++tot_elinks;
120 : }
121 :
122 5 : if(cfg->unbuffered) {
123 0 : LOG_INFO("Requesting unbuffered communication to sender");
124 0 : netiounbuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
125 : }
126 : else {
127 15 : netiobuf_writer_factory(dev_no, dma_buf, m_bus_map.at(dev_no), port, elinks);
128 : }
129 5 : ++thread;
130 : }
131 :
132 5 : }
133 :
134 :
135 : template <class C, class D, class B>
136 5 : void FromHost<C, D, B>::stop()
137 : {
138 10 : for (auto& dma : dma_buffers) {
139 5 : dma.second->stop();
140 : }
141 5 : }
142 :
143 :
144 : template <class C, class D, class B>
145 21 : void FromHost<C, D, B>::print_monitoring()
146 : {
147 : //devices
148 21 : std::string ts = Util::ISO8601TimeUTC();
149 42 : for (auto& d : devices) {
150 21 : unsigned int device = d->get_device_number();
151 21 : FromHostDeviceStats s(
152 : device
153 : );
154 21 : m_mon.append_device_stats(ts, m_hostname, s);
155 21 : auto dma = dma_buffers.at(device);
156 21 : unsigned int dmaid = cfg->udmaid_to_dmaid(dma->get_dmaid());
157 21 : m_mon.append_dma_stats(ts, m_hostname, device, dma->get_monitoring_data());
158 : //readers
159 42 : for (auto& w : m_writers.at(device)) {
160 21 : unsigned int writer_id = w.get_id();
161 21 : FromHostWriterStats ws(
162 : writer_id,
163 : 0, //type TODO: RDMA, TCP?
164 : 0 //netio-next socket list doesn't provide a counter
165 : );
166 21 : m_mon.append_writer_stats(ts, m_hostname, device, dmaid, ws);
167 : //elinks
168 63 : for(const auto & e : w.get_writer_elink_stats()) {
169 21 : m_mon.append_elink_stats(ts, m_hostname, device, dmaid, writer_id, e.second);
170 : }
171 : }
172 : }
173 : //m_mon.write_message(); //keep commented for tokenized writes
174 21 : }
|