LCOV - code coverage report
Current view: top level - src/network - netio_buffered_receiver.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 39 48 81.2 %
Date: 2025-09-09 12:09:29 Functions: 6 7 85.7 %

          Line data    Source code
       1             : #include "network/netio_buffered_receiver.hpp"
       2             : #include "log.hpp"
       3             : 
       4           5 : NetioBufferedReceiver::NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
       5           5 :     unsigned int netio_pn, unsigned int netio_ps)
       6           5 :     : m_ip{ip},
       7           5 :       m_port(port),
       8           5 :       m_bus(bus),
       9           5 :       m_socket_attr{netio_pn, netio_ps, 0, 0},
      10          10 :       m_context(new netio_context())
      11             : {
      12           5 :     netio_init(m_context.get());
      13           5 :     init_listen_socket();
      14          10 :     m_event_loop_thread = std::thread([this, port]{ eventLoop(port); });
      15           5 : }
      16             : 
      17             : 
      18           0 : NetioBufferedReceiver::NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
      19           0 :     unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop)
      20           0 :     : m_ip{ip},
      21           0 :       m_port(port),
      22           0 :       m_bus(bus),
      23           0 :       m_socket_attr{netio_pn, netio_ps, 0, 0},
      24           0 :       m_context(evloop.ctx)
      25             : {
      26           0 :     init_listen_socket();
      27           0 : }
      28             : 
      29             : 
      30           5 : void NetioBufferedReceiver::init_listen_socket()
      31             : {
      32           5 :     memset(&m_socket, 0, sizeof m_socket);
      33           5 :     netio_buffered_listen_socket_init(&m_socket, m_context.get(), &m_socket_attr);
      34           5 :     m_socket.usr = this;
      35           5 :     m_socket.cb_connection_established = cb_on_connection_established;
      36           5 :     m_socket.cb_connection_closed = cb_on_connection_close;
      37           5 :     m_socket.cb_msg_received = cb_buffered_msg_received;
      38           5 :     netio_buffered_listen(&m_socket, netio_hostname(m_ip.c_str()), m_port);
      39           5 :     LOG_DBG("Initialised netio-next buffered_listen_socket. Port %u PROVIDER: %s", m_port, m_socket.listen_socket.fi->fabric_attr->prov_name);
      40           5 : }
      41             : 
      42             : 
      43          15 : NetioBufferedReceiver::~NetioBufferedReceiver()
      44             : {
      45           5 :     netio_terminate_signal(&(m_context->evloop));
      46           5 :     if ( m_event_loop_thread.get_id() != std::thread::id() ) {
      47           5 :         m_event_loop_thread.join();
      48             :     }
      49          15 : }
      50             : 
      51             : 
      52           5 : bool NetioBufferedReceiver::declare(const std::vector<Elink> &elinks)
      53             : {
      54           5 :     bool pubsub = false;
      55           5 :     bool unbuffered = false;
      56           5 :     uint32_t n_pages = m_socket_attr.num_pages;
      57           5 :     uint32_t sz_page = m_socket_attr.pagesize;
      58           5 :     return m_bus.publish(elinks, m_ip, m_port, n_pages, sz_page, pubsub, unbuffered);
      59             : }
      60             : 
      61             : 
      62           5 : void NetioBufferedReceiver::eventLoop(uint32_t port)
      63             : {
      64           5 :     std::ostringstream out;
      65           5 :     out << "rec[" << port << "]";
      66           5 :     std::string s = out.str();
      67           5 :     pthread_setname_np(pthread_self(), s.c_str());
      68             : 
      69           5 :     netio_run(&(m_context->evloop));
      70           5 : }

Generated by: LCOV version 1.0