Line data Source code
1 : #include <cassert>
2 : #include <sstream>
3 :
4 : #include "network/netio_buffered_publisher.hpp"
5 :
6 :
7 14 : NetioBufferedPublisher::NetioBufferedPublisher(
8 : const std::string &ip, uint32_t port,
9 : Bus& bus, unsigned int netio_pn, unsigned int netio_ps,
10 14 : unsigned int netio_wm, unsigned int netio_to, unsigned int max_msg_size)
11 14 : : m_ip{ip},
12 14 : m_port(port),
13 14 : m_bus(bus),
14 14 : m_socket_attr{netio_pn, netio_ps, netio_wm, netio_to},
15 28 : m_context(new netio_context())
16 : {
17 14 : netio_init(m_context.get());
18 14 : init_publish_socket(max_msg_size);
19 28 : m_event_loop_thread = std::thread([this, port]{ eventLoop(port); });
20 14 : }
21 :
22 :
23 0 : NetioBufferedPublisher::NetioBufferedPublisher(
24 : const std::string &ip, uint32_t port,
25 : Bus& bus, unsigned int netio_pn, unsigned int netio_ps, unsigned int netio_wm,
26 0 : unsigned int netio_to, unsigned int max_msg_size, NetioEventLoop& evloop)
27 0 : : m_ip{ip},
28 0 : m_port(port),
29 0 : m_bus(bus),
30 0 : m_socket_attr{netio_pn, netio_ps, netio_wm, netio_to},
31 0 : m_context(evloop.ctx)
32 : {
33 0 : init_publish_socket(max_msg_size);
34 0 : }
35 :
36 :
37 14 : void NetioBufferedPublisher::init_publish_socket(unsigned int max_msg_size)
38 : {
39 14 : unsigned int max_msg_by_net = m_socket_attr.pagesize - sizeof(felix_id_t) - sizeof(uint32_t); //netio buffer structure: fid | size | msg
40 14 : m_max_msg_size = max_msg_size < max_msg_by_net ? max_msg_size : max_msg_by_net;
41 14 : m_max_iov_len = 0; //No limit, all copied in one IOV in netio-next
42 :
43 14 : memset(&m_socket, 0, sizeof m_socket);
44 14 : netio_publish_socket_init(&m_socket, m_context.get(), m_ip.c_str(), m_port, &m_socket_attr);
45 14 : LOG_DBG("Initialised netio-next buffered_publish_socket. Port %u PROVIDER: %s", m_port, m_socket.lsocket.fi->fabric_attr->prov_name);
46 14 : netio_timer_init(&(m_context->evloop), &m_timer);
47 14 : netio_signal_init(&(m_context->evloop), &m_signal);
48 :
49 14 : m_socket.usr = this;
50 14 : m_socket.cb_buffer_available = cb_on_buffer_available;
51 14 : }
52 :
53 :
54 42 : NetioBufferedPublisher::~NetioBufferedPublisher()
55 : {
56 14 : netio_timer_stop(&m_timer);
57 14 : netio_terminate_signal(&(m_context->evloop));
58 14 : if ( m_event_loop_thread.get_id() != std::thread::id() ) {
59 14 : m_event_loop_thread.join();
60 : }
61 42 : }
62 :
63 :
64 14 : void NetioBufferedPublisher::eventLoop(uint32_t port)
65 : {
66 14 : std::ostringstream out;
67 14 : out << "pub[" << port << "]";
68 14 : std::string s = out.str();
69 14 : pthread_setname_np(pthread_self(), s.c_str());
70 :
71 14 : netio_run(&(m_context->evloop));
72 14 : }
73 :
74 :
75 14 : bool NetioBufferedPublisher::declare(const std::vector<Elink> &elinks)
76 : {
77 14 : return m_bus.publish(elinks, m_ip, m_port, m_socket_attr.num_pages, m_socket_attr.pagesize);
78 : }
79 :
80 :
81 602661329 : Publisher::Result NetioBufferedPublisher::publish(
82 : felix_id_t fid, iovec *iov, uint32_t iovlen, size_t bytes, uint32_t block_addr, std::uint8_t status)
83 : {
84 602661329 : StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
85 :
86 : // Truncate too big messages.
87 : // The threshold is the netio page size, used also by the receiver.
88 : // + 1 for status byte
89 602661329 : if (bytes >= m_max_msg_size){
90 17822 : status = truncate_msg_if_too_large(iov, iovlen, status);
91 17822 : *static_cast<uint8_t*>(iov[0].iov_base) = status;
92 : }
93 :
94 602661329 : auto r = netio_buffered_publishv(&m_socket, fid, iov, iovlen,
95 : cache.m_again, &cache.m_cache);
96 602682820 : cache.m_again = (r == NETIO_STATUS_AGAIN);
97 602682820 : return (Publisher::Result)r;
98 : }
99 :
100 :
101 12464650 : Publisher::Result NetioBufferedPublisher::flush(felix_id_t fid)
102 : {
103 12464650 : StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
104 12464650 : netio_buffered_publish_flush(&m_socket, fid, &cache.m_cache);
105 12464650 : return Publisher::Result::OK;
106 : }
107 :
108 :
109 0 : Publisher::Result NetioBufferedPublisher::publish(felix_id_t fid, uint8_t* data, size_t len)
110 : {
111 0 : StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
112 0 : auto r = netio_buffered_publish(&m_socket, fid, static_cast<void*>(data), len, cache.m_again, &cache.m_cache);
113 0 : cache.m_again = (r == NETIO_STATUS_AGAIN);
114 0 : return (Publisher::Result)r;
115 : }
116 :
117 :
118 14 : void NetioBufferedPublisher::set_periodic_callback(
119 : uint32_t period_us, Callback callback)
120 : {
121 14 : m_read_callback = callback;
122 14 : m_signal.data = this;
123 430502 : m_signal.cb = [](void *ptr)
124 : {
125 430488 : NetioBufferedPublisher* self = static_cast<NetioBufferedPublisher*>(ptr);
126 430488 : if (self->m_read_callback())
127 : {
128 214533 : netio_signal_fire(&self->m_signal);
129 : }
130 430488 : };
131 :
132 14 : m_timer.data = this;
133 50107 : m_timer.cb = [](void *ptr)
134 : {
135 50093 : NetioBufferedPublisher* self = static_cast<NetioBufferedPublisher*>(ptr);
136 50093 : if (self->m_read_callback())
137 : {
138 3881 : netio_signal_fire(&self->m_signal);
139 : }
140 50093 : };
141 14 : netio_timer_start_us(&m_timer, period_us);
142 14 : }
143 :
144 :
145 212074 : void NetioBufferedPublisher::on_buffer_available()
146 : {
147 212074 : ++m_buf_available_calls;
148 212074 : netio_signal_fire(&m_signal);
149 212074 : }
150 :
151 :
152 0 : void NetioBufferedPublisher::set_asynch_callback(Callback callback)
153 : {
154 0 : m_read_callback = callback;
155 0 : m_signal.data = this;
156 0 : m_signal.cb = [](void *ptr)
157 : {
158 0 : NetioBufferedPublisher* self = static_cast<NetioBufferedPublisher*>(ptr);
159 0 : if (self->m_read_callback())
160 : {
161 0 : netio_signal_fire(&self->m_signal);
162 : }
163 0 : };
164 0 : }
165 :
166 :
167 0 : void NetioBufferedPublisher::fire_asynch_callback()
168 : {
169 0 : netio_signal_fire(&m_signal);
170 0 : }
171 :
172 85 : uint32_t NetioBufferedPublisher::get_subscription_number()
173 : {
174 85 : return m_socket.subscription_table.num_subscriptions;
175 : }
176 :
177 :
178 85 : uint32_t NetioBufferedPublisher::get_resource_counter()
179 : {
180 85 : return netio_pubsocket_get_minimum_pages(&m_socket);
181 : }
|