Line data Source code
1 : #include "network/netio_buffered_receiver.hpp"
2 : #include "log.hpp"
3 :
4 5 : NetioBufferedReceiver::NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
5 5 : unsigned int netio_pn, unsigned int netio_ps)
6 5 : : m_ip{ip},
7 5 : m_port(port),
8 5 : m_bus(bus),
9 5 : m_socket_attr{netio_pn, netio_ps, 0, 0},
10 10 : m_context(new netio_context())
11 : {
12 5 : netio_init(m_context.get());
13 5 : init_listen_socket();
14 10 : m_event_loop_thread = std::thread([this, port]{ eventLoop(port); });
15 5 : }
16 :
17 :
18 0 : NetioBufferedReceiver::NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
19 0 : unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop)
20 0 : : m_ip{ip},
21 0 : m_port(port),
22 0 : m_bus(bus),
23 0 : m_socket_attr{netio_pn, netio_ps, 0, 0},
24 0 : m_context(evloop.ctx)
25 : {
26 0 : init_listen_socket();
27 0 : }
28 :
29 :
30 5 : void NetioBufferedReceiver::init_listen_socket()
31 : {
32 5 : memset(&m_socket, 0, sizeof m_socket);
33 5 : netio_buffered_listen_socket_init(&m_socket, m_context.get(), &m_socket_attr);
34 5 : m_socket.usr = this;
35 5 : m_socket.cb_connection_established = cb_on_connection_established;
36 5 : m_socket.cb_connection_closed = cb_on_connection_close;
37 5 : m_socket.cb_msg_received = cb_buffered_msg_received;
38 5 : netio_buffered_listen(&m_socket, netio_hostname(m_ip.c_str()), m_port);
39 5 : LOG_DBG("Initialised netio-next buffered_listen_socket. Port %u PROVIDER: %s", m_port, m_socket.listen_socket.fi->fabric_attr->prov_name);
40 5 : }
41 :
42 :
43 15 : NetioBufferedReceiver::~NetioBufferedReceiver()
44 : {
45 5 : netio_terminate_signal(&(m_context->evloop));
46 5 : if ( m_event_loop_thread.get_id() != std::thread::id() ) {
47 5 : m_event_loop_thread.join();
48 : }
49 15 : }
50 :
51 :
52 5 : bool NetioBufferedReceiver::declare(const std::vector<Elink> &elinks)
53 : {
54 5 : bool pubsub = false;
55 5 : bool unbuffered = false;
56 5 : uint32_t n_pages = m_socket_attr.num_pages;
57 5 : uint32_t sz_page = m_socket_attr.pagesize;
58 5 : return m_bus.publish(elinks, m_ip, m_port, n_pages, sz_page, pubsub, unbuffered);
59 : }
60 :
61 :
62 5 : void NetioBufferedReceiver::eventLoop(uint32_t port)
63 : {
64 5 : std::ostringstream out;
65 5 : out << "rec[" << port << "]";
66 5 : std::string s = out.str();
67 5 : pthread_setname_np(pthread_self(), s.c_str());
68 :
69 5 : netio_run(&(m_context->evloop));
70 5 : }
|