Line data Source code
1 : #include <bits/types/struct_iovec.h>
2 : #include <cassert>
3 : #include <sstream>
4 :
5 : #include "network/netio_zerocopy_publisher.hpp"
6 : #include "tohost_monitor.hpp"
7 :
8 :
9 10 : NetioZerocopyPublisher::NetioZerocopyPublisher(
10 : const std::string &ip, uint32_t port, Bus& bus,
11 : unsigned int netio_pn, unsigned int netio_ps, unsigned int max_msg_size,
12 10 : uint64_t dma_buffer_vaddr, size_t dma_size)
13 10 : : m_ip{ip},
14 10 : m_port(port),
15 10 : m_bus(bus),
16 10 : m_dma_buffer{reinterpret_cast<void*>(dma_buffer_vaddr), dma_size, nullptr, 0},
17 10 : m_ct(new CompletionTable()),
18 10 : m_socket_attr{netio_pn, netio_ps},
19 30 : m_context(new netio_context())
20 : {
21 10 : netio_init(m_context.get());
22 10 : init_publish_socket(max_msg_size);
23 20 : m_event_loop_thread = std::thread([this, port]{ eventLoop(port); });
24 10 : }
25 :
26 :
27 0 : NetioZerocopyPublisher::NetioZerocopyPublisher(
28 : const std::string &ip, uint32_t port, Bus& bus,
29 : unsigned int netio_pn, unsigned int netio_ps, unsigned int max_msg_size,
30 0 : uint64_t dma_buffer_vaddr, size_t dma_size, NetioEventLoop& evloop)
31 0 : : m_ip{ip},
32 0 : m_port(port),
33 0 : m_bus(bus),
34 0 : m_dma_buffer{reinterpret_cast<void*>(dma_buffer_vaddr), dma_size, nullptr, 0},
35 0 : m_ct(new CompletionTable()),
36 0 : m_socket_attr{netio_pn, netio_ps},
37 0 : m_context(evloop.ctx)
38 : {
39 0 : init_publish_socket(max_msg_size);
40 0 : }
41 :
42 :
43 10 : void NetioZerocopyPublisher::init_publish_socket(unsigned int max_msg_size)
44 : {
45 10 : unsigned int max_msg_by_net = m_socket_attr.buffer_size - sizeof(felix_id_t) - sizeof(uint8_t);
46 10 : m_max_msg_size = max_msg_size < max_msg_by_net ? max_msg_size : max_msg_by_net;
47 10 : m_max_iov_len = NETIO_MAX_IOV_LEN -1;
48 10 : memset(&m_socket, 0, sizeof m_socket);
49 10 : netio_unbuffered_publish_socket_init(&m_socket, m_context.get(), m_ip.c_str(), m_port, &m_dma_buffer);
50 10 : LOG_DBG("Initialised netio-next unbuffered_publish_socket. Port %u PROVIDER: %s", m_port ,m_socket.lsocket.fi->fabric_attr->prov_name);
51 10 : m_socket.usr = this;
52 10 : m_socket.cb_msg_published = cb_on_msg_published;
53 :
54 10 : netio_timer_init(&(m_context->evloop), &m_timer);
55 10 : netio_signal_init(&(m_context->evloop), &m_signal);
56 10 : }
57 :
58 :
59 20 : NetioZerocopyPublisher::~NetioZerocopyPublisher()
60 : {
61 10 : netio_timer_stop(&m_timer);
62 10 : netio_terminate_signal(&(m_context->evloop));
63 10 : if ( m_event_loop_thread.get_id() != std::thread::id() ) {
64 10 : m_event_loop_thread.join();
65 : }
66 30 : }
67 :
68 10 : void NetioZerocopyPublisher::eventLoop(uint32_t port)
69 : {
70 10 : std::ostringstream out;
71 10 : out << "zerocopypub[" << port << "]";
72 10 : std::string s = out.str();
73 10 : pthread_setname_np(pthread_self(), s.c_str());
74 10 : netio_run(&(m_context->evloop));
75 10 : }
76 :
77 :
78 10 : bool NetioZerocopyPublisher::declare(const std::vector<Elink> &elinks)
79 : {
80 10 : return m_bus.publish(elinks, m_ip, m_port, m_socket_attr.num_buffers, m_socket_attr.buffer_size, true, true); //TODO: pass pages
81 : }
82 :
83 :
84 211584273 : Publisher::Result NetioZerocopyPublisher::publish(felix_id_t fid, iovec *iov,
85 : uint32_t iovlen, size_t bytes, uint32_t block_addr, std::uint8_t status)
86 : {
87 211584273 : if(iovlen == 0){return Publisher::Result::OK;}
88 :
89 : // Truncate too big messages.
90 : // Thresholds are the max number of IOV entries (sender) and the page size (receiver).
91 : // + 1 for status byte
92 211584273 : if ( iovlen > m_max_iov_len or bytes > m_max_msg_size ){
93 13801 : status = truncate_msg_if_too_large(iov, iovlen, status);
94 13801 : *static_cast<uint8_t*>(iov[0].iov_base) = status;
95 : }
96 :
97 : //remove status byte
98 211584273 : if(iov[0].iov_len > 0){
99 10 : uint8_t* iov_ptr = static_cast<uint8_t*>(iov[0].iov_base);
100 10 : iov[0].iov_base = static_cast<void*>(iov_ptr + 1);
101 10 : --iov[0].iov_len;
102 : }
103 :
104 211584273 : StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
105 211584273 : unsigned netio_flags = 0;
106 :
107 211584273 : if (cache.m_again == 0) {
108 205203907 : cache.key = block_addr ;
109 205203907 : m_ct->push(block_addr);
110 : }
111 6380366 : else if (cache.m_again == 1) { //AGAIN
112 6380366 : cache.key = block_addr;
113 : }
114 : else { //PARTIAL
115 : //don't touch key
116 : netio_flags |= NETIO_REENTRY;
117 : }
118 :
119 211584273 : auto r = netio_unbuffered_publishv_usr(&m_socket, fid, iov, iovlen,
120 : &cache.key, netio_flags, &cache.m_cache, status, 1);
121 :
122 211584273 : if(r == NETIO_STATUS_OK || r == NETIO_STATUS_OK_NOSUB) {
123 205203907 : cache.m_again = 0;
124 205203907 : return Publisher::Result::OK;
125 : }
126 : else if(r == NETIO_STATUS_AGAIN) {
127 : // No data were sent, we need to redo the whole call
128 6380366 : cache.m_again = 1;
129 6380366 : return Publisher::Result::AGAIN;
130 : }
131 : else if(r == NETIO_STATUS_PARTIAL) {
132 : // Some data were sent, we need to redo the call but set NETIO_REENTRY
133 0 : cache.m_again = 2;
134 0 : return Publisher::Result::PARTIAL;
135 : }
136 : else if (r == NETIO_ERROR_MAX_IOV_EXCEEDED) {
137 : // Message too large, discarded.
138 0 : cache.m_again = 0;
139 0 : LOG_WARN("Message too large, IOV count %lu. Discarded.", iovlen);
140 0 : m_ct->update(block_addr);
141 0 : return Publisher::Result::ERROR_TOO_BIG;
142 : }
143 : else {
144 : //NETIO_STATUS_ERROR message discarded.
145 0 : cache.m_again = 0;
146 0 : LOG_WARN("Netio error, message discarded");
147 0 : m_ct->update(block_addr);
148 0 : return Publisher::Result::ERROR;
149 : }
150 : }
151 :
152 :
153 0 : Publisher::Result NetioZerocopyPublisher::publish(felix_id_t fid, uint8_t* data, size_t len)
154 : {
155 0 : iovec iov;
156 0 : iov.iov_base = static_cast<void*>(data);
157 0 : iov.iov_len = len;
158 : // TODO: status byte not handled correctly
159 0 : return publish(fid, &iov, 1, len, 0, 0);
160 : }
161 :
162 :
163 0 : Publisher::Result NetioZerocopyPublisher::flush(felix_id_t fid)
164 : {
165 : //Nothing to flush in zero-copy mode, harmless
166 0 : return Publisher::Result::OK;
167 : }
168 :
169 :
170 10 : void NetioZerocopyPublisher::set_periodic_callback(
171 : uint32_t period_us, Callback callback)
172 : {
173 10 : m_read_callback = callback;
174 10 : m_signal.data = this;
175 6380376 : m_signal.cb = [](void *ptr)
176 : {
177 6380366 : NetioZerocopyPublisher* self = static_cast<NetioZerocopyPublisher*>(ptr);
178 6380366 : if (self->m_read_callback())
179 : {
180 6362696 : netio_signal_fire(&self->m_signal);
181 : }
182 6380366 : };
183 :
184 10 : m_timer.data = this;
185 79903 : m_timer.cb = [](void *ptr)
186 : {
187 79893 : NetioZerocopyPublisher* self = static_cast<NetioZerocopyPublisher*>(ptr);
188 79893 : if (self->m_read_callback())
189 : {
190 17670 : netio_signal_fire(&self->m_signal);
191 : }
192 79893 : };
193 10 : netio_timer_start_us(&m_timer, period_us);
194 10 : }
195 :
196 :
197 0 : void NetioZerocopyPublisher::set_asynch_callback(Callback callback)
198 : {
199 0 : m_read_callback = callback;
200 0 : m_signal.data = this;
201 0 : m_signal.cb = [](void *ptr)
202 : {
203 0 : NetioZerocopyPublisher* self = static_cast<NetioZerocopyPublisher*>(ptr);
204 0 : if (self->m_read_callback())
205 : {
206 0 : netio_signal_fire(&self->m_signal);
207 : }
208 0 : };
209 0 : }
210 :
211 :
212 0 : void NetioZerocopyPublisher::fire_asynch_callback()
213 : {
214 0 : netio_signal_fire(&m_signal);
215 0 : }
216 :
217 :
218 205203907 : void NetioZerocopyPublisher::on_msg_published(uint64_t key)
219 : {
220 205203907 : uint32_t offset = key & 0xffffffff;
221 205203907 : m_ct->update(offset);
222 205203907 : }
223 :
224 :
225 103 : uint32_t NetioZerocopyPublisher::get_resource_counter()
226 : {
227 103 : return netio_pubsocket_get_available_co(&m_socket);
228 : }
229 :
230 :
231 103 : uint32_t NetioZerocopyPublisher::get_subscription_number()
232 : {
233 103 : return m_socket.subscription_table.num_subscriptions;
234 : }
235 :
|