Line data Source code
1 : #include "network/netio_unbuffered_receiver.hpp" 2 : #include "log.hpp" 3 : 4 0 : NetioUnbufferedReceiver::NetioUnbufferedReceiver(const std::string &ip, uint32_t port, Bus& bus, 5 0 : unsigned int netio_pn, unsigned int netio_ps) 6 0 : : m_ip{ip}, 7 0 : m_port(port), 8 0 : m_bus(bus), 9 0 : m_socket_attr{netio_pn, netio_ps}, 10 0 : m_context(new netio_context()) 11 : { 12 0 : netio_init(m_context.get()); 13 0 : init_listen_socket(); 14 0 : m_event_loop_thread = std::thread([this, port]{ eventLoop(port); }); 15 0 : } 16 : 17 : 18 0 : NetioUnbufferedReceiver::NetioUnbufferedReceiver(const std::string &ip, uint32_t port, Bus& bus, 19 0 : unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop) 20 0 : : m_ip{ip}, 21 0 : m_port(port), 22 0 : m_bus(bus), 23 0 : m_socket_attr{netio_pn, netio_ps}, 24 0 : m_context(evloop.ctx) 25 : { 26 0 : init_listen_socket(); 27 0 : } 28 : 29 : 30 0 : void NetioUnbufferedReceiver::init_listen_socket() 31 : { 32 0 : memset(&m_socket, 0, sizeof m_socket); 33 0 : netio_init_listen_socket(&m_socket, m_context.get(), &m_socket_attr); 34 0 : m_socket.usr = this; 35 0 : m_socket.cb_connection_established = cb_on_connection_established; 36 0 : m_socket.cb_connection_closed = cb_on_connection_close; 37 0 : m_socket.cb_msg_received = cb_msg_received; 38 0 : netio_listen(&m_socket, netio_hostname(m_ip.c_str()), m_port); 39 0 : LOG_DBG("Initialised netio-next listen_socket. Port %u PROVIDER: %s", m_port, m_socket.fi->fabric_attr->prov_name); 40 0 : } 41 : 42 : 43 0 : NetioUnbufferedReceiver::~NetioUnbufferedReceiver() 44 : { 45 0 : netio_terminate_signal(&(m_context->evloop)); 46 0 : if ( m_event_loop_thread.get_id() != std::thread::id() ) { 47 0 : m_event_loop_thread.join(); 48 : } 49 0 : } 50 : 51 : 52 0 : bool NetioUnbufferedReceiver::declare(const std::vector<Elink> &elinks) 53 : { 54 0 : bool pubsub = false; 55 0 : bool unbuffered = true; 56 0 : uint32_t n_pages = m_socket_attr.num_buffers; 57 0 : uint32_t sz_page = m_socket_attr.buffer_size; 58 0 : return m_bus.publish(elinks, m_ip, m_port, n_pages, sz_page, pubsub, unbuffered); 59 : } 60 : 61 : 62 0 : void NetioUnbufferedReceiver::eventLoop(uint32_t port) 63 : { 64 0 : std::ostringstream out; 65 0 : out << "rec[" << port << "]"; 66 0 : std::string s = out.str(); 67 0 : pthread_setname_np(pthread_self(), s.c_str()); 68 : 69 0 : netio_run(&(m_context->evloop)); 70 0 : }