LCOV - code coverage report
Current view: top level - src/network - netio_buffered_publisher.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 74 100 74.0 %
Date: 2025-09-09 12:09:29 Functions: 14 19 73.7 %

          Line data    Source code
       1             : #include <cassert>
       2             : #include <sstream>
       3             : 
       4             : #include "network/netio_buffered_publisher.hpp"
       5             : 
       6             : 
       7          14 : NetioBufferedPublisher::NetioBufferedPublisher(
       8             :     const std::string &ip, uint32_t port,
       9             :     Bus& bus, unsigned int netio_pn, unsigned int netio_ps,
      10          14 :     unsigned int netio_wm, unsigned int netio_to, unsigned int max_msg_size)
      11          14 :     : m_ip{ip},
      12          14 :       m_port(port),
      13          14 :       m_bus(bus),
      14          14 :       m_socket_attr{netio_pn, netio_ps, netio_wm, netio_to},
      15          28 :       m_context(new netio_context())
      16             : {
      17          14 :     netio_init(m_context.get());
      18          14 :     init_publish_socket(max_msg_size);
      19          28 :     m_event_loop_thread = std::thread([this, port]{ eventLoop(port); });
      20          14 : }
      21             : 
      22             : 
      23           0 : NetioBufferedPublisher::NetioBufferedPublisher(
      24             :     const std::string &ip, uint32_t port,
      25             :     Bus& bus, unsigned int netio_pn, unsigned int netio_ps, unsigned int netio_wm,
      26           0 :     unsigned int netio_to, unsigned int max_msg_size, NetioEventLoop& evloop)
      27           0 :     : m_ip{ip},
      28           0 :       m_port(port),
      29           0 :       m_bus(bus),
      30           0 :       m_socket_attr{netio_pn, netio_ps, netio_wm, netio_to},
      31           0 :       m_context(evloop.ctx)
      32             : {
      33           0 :     init_publish_socket(max_msg_size);
      34           0 : }
      35             : 
      36             : 
      37          14 : void NetioBufferedPublisher::init_publish_socket(unsigned int max_msg_size)
      38             : {
      39          14 :     unsigned int max_msg_by_net = m_socket_attr.pagesize - sizeof(felix_id_t) - sizeof(uint32_t); //netio buffer structure: fid | size | msg
      40          14 :     m_max_msg_size = max_msg_size < max_msg_by_net ? max_msg_size : max_msg_by_net;
      41          14 :     m_max_iov_len = 0; //No limit, all copied in one IOV in netio-next
      42             : 
      43          14 :     memset(&m_socket, 0, sizeof m_socket);
      44          14 :     netio_publish_socket_init(&m_socket, m_context.get(), m_ip.c_str(), m_port, &m_socket_attr);
      45          14 :     LOG_DBG("Initialised netio-next buffered_publish_socket. Port %u PROVIDER: %s", m_port, m_socket.lsocket.fi->fabric_attr->prov_name);
      46          14 :     netio_timer_init(&(m_context->evloop), &m_timer);
      47          14 :     netio_signal_init(&(m_context->evloop), &m_signal);
      48             : 
      49          14 :     m_socket.usr = this;
      50          14 :     m_socket.cb_buffer_available = cb_on_buffer_available;
      51          14 : }
      52             : 
      53             : 
      54          42 : NetioBufferedPublisher::~NetioBufferedPublisher()
      55             : {
      56          14 :     netio_timer_stop(&m_timer);
      57          14 :     netio_terminate_signal(&(m_context->evloop));
      58          14 :     if ( m_event_loop_thread.get_id() != std::thread::id() ) {
      59          14 :         m_event_loop_thread.join();
      60             :     }
      61          42 : }
      62             : 
      63             : 
      64          14 : void NetioBufferedPublisher::eventLoop(uint32_t port)
      65             : {
      66          14 :     std::ostringstream out;
      67          14 :     out << "pub[" << port << "]";
      68          14 :     std::string s = out.str();
      69          14 :     pthread_setname_np(pthread_self(), s.c_str());
      70             : 
      71          14 :     netio_run(&(m_context->evloop));
      72          14 : }
      73             : 
      74             : 
      75          14 : bool NetioBufferedPublisher::declare(const std::vector<Elink> &elinks)
      76             : {
      77          14 :     return m_bus.publish(elinks, m_ip, m_port, m_socket_attr.num_pages, m_socket_attr.pagesize);
      78             : }
      79             : 
      80             : 
      81   602661329 : Publisher::Result NetioBufferedPublisher::publish(
      82             :     felix_id_t fid, iovec *iov, uint32_t iovlen, size_t bytes, uint32_t block_addr, std::uint8_t status)
      83             : {
      84   602661329 :     StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
      85             : 
      86             :     // Truncate too big messages. 
      87             :     // The threshold is the netio page size, used also by the receiver.
      88             :     // + 1 for status byte
      89   602661329 :     if (bytes >= m_max_msg_size){
      90       17822 :         status = truncate_msg_if_too_large(iov, iovlen, status);
      91       17822 :         *static_cast<uint8_t*>(iov[0].iov_base) = status;
      92             :     }
      93             : 
      94   602661329 :     auto r = netio_buffered_publishv(&m_socket, fid, iov, iovlen,
      95             :                                      cache.m_again, &cache.m_cache);
      96   602682820 :     cache.m_again = (r == NETIO_STATUS_AGAIN);
      97   602682820 :     return (Publisher::Result)r;
      98             : }
      99             : 
     100             : 
     101    12464650 : Publisher::Result NetioBufferedPublisher::flush(felix_id_t fid)
     102             : {
     103    12464650 :     StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
     104    12464650 :     netio_buffered_publish_flush(&m_socket, fid, &cache.m_cache);
     105    12464650 :     return Publisher::Result::OK;
     106             : }
     107             : 
     108             : 
     109           0 : Publisher::Result NetioBufferedPublisher::publish(felix_id_t fid, uint8_t* data, size_t len)
     110             : {
     111           0 :     StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
     112           0 :     auto r = netio_buffered_publish(&m_socket, fid, static_cast<void*>(data), len, cache.m_again, &cache.m_cache);
     113           0 :     cache.m_again = (r == NETIO_STATUS_AGAIN);
     114           0 :     return (Publisher::Result)r;
     115             : }
     116             : 
     117             : 
     118          14 : void NetioBufferedPublisher::set_periodic_callback(
     119             :     uint32_t period_us, Callback callback)
     120             : {
     121          14 :     m_read_callback = callback;
     122          14 :     m_signal.data = this;
     123      430502 :     m_signal.cb = [](void *ptr)
     124             :     {
     125      430488 :         NetioBufferedPublisher* self = static_cast<NetioBufferedPublisher*>(ptr);
     126      430488 :         if (self->m_read_callback())
     127             :         {
     128      214533 :             netio_signal_fire(&self->m_signal);
     129             :         }
     130      430488 :     };
     131             : 
     132          14 :     m_timer.data = this;
     133       50107 :     m_timer.cb = [](void *ptr)
     134             :     {
     135       50093 :         NetioBufferedPublisher* self = static_cast<NetioBufferedPublisher*>(ptr);
     136       50093 :         if (self->m_read_callback())
     137             :         {
     138        3881 :             netio_signal_fire(&self->m_signal);
     139             :         }
     140       50093 :     };
     141          14 :     netio_timer_start_us(&m_timer, period_us);
     142          14 : }
     143             : 
     144             : 
     145      212074 : void NetioBufferedPublisher::on_buffer_available()
     146             : {
     147      212074 :     ++m_buf_available_calls;
     148      212074 :     netio_signal_fire(&m_signal);
     149      212074 : }
     150             : 
     151             : 
     152           0 : void NetioBufferedPublisher::set_asynch_callback(Callback callback)
     153             : {
     154           0 :     m_read_callback = callback;
     155           0 :     m_signal.data = this;
     156           0 :     m_signal.cb = [](void *ptr)
     157             :     {
     158           0 :         NetioBufferedPublisher* self = static_cast<NetioBufferedPublisher*>(ptr);
     159           0 :         if (self->m_read_callback())
     160             :         {
     161           0 :             netio_signal_fire(&self->m_signal);
     162             :         }
     163           0 :     };
     164           0 : }
     165             : 
     166             : 
     167           0 : void NetioBufferedPublisher::fire_asynch_callback()
     168             : {
     169           0 :     netio_signal_fire(&m_signal);
     170           0 : }
     171             : 
     172          85 : uint32_t NetioBufferedPublisher::get_subscription_number()
     173             : {
     174          85 :     return m_socket.subscription_table.num_subscriptions;
     175             : }
     176             : 
     177             : 
     178          85 : uint32_t NetioBufferedPublisher::get_resource_counter()
     179             : {
     180          85 :     return netio_pubsocket_get_minimum_pages(&m_socket);
     181             : }

Generated by: LCOV version 1.0