LCOV - code coverage report
Current view: top level - src/network - netio_zerocopy_publisher.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 86 126 68.3 %
Date: 2025-09-09 12:09:29 Functions: 13 19 68.4 %

          Line data    Source code
       1             : #include <bits/types/struct_iovec.h>
       2             : #include <cassert>
       3             : #include <sstream>
       4             : 
       5             : #include "network/netio_zerocopy_publisher.hpp"
       6             : #include "tohost_monitor.hpp"
       7             : 
       8             : 
       9          10 : NetioZerocopyPublisher::NetioZerocopyPublisher(
      10             :     const std::string &ip, uint32_t port, Bus& bus,
      11             :     unsigned int netio_pn, unsigned int netio_ps, unsigned int max_msg_size,
      12          10 :     uint64_t dma_buffer_vaddr, size_t dma_size)
      13          10 :     : m_ip{ip},
      14          10 :       m_port(port),
      15          10 :       m_bus(bus),
      16          10 :       m_dma_buffer{reinterpret_cast<void*>(dma_buffer_vaddr), dma_size, nullptr, 0},
      17          10 :       m_ct(new CompletionTable()),
      18          10 :       m_socket_attr{netio_pn, netio_ps},
      19          30 :       m_context(new netio_context())
      20             : {
      21          10 :     netio_init(m_context.get());
      22          10 :     init_publish_socket(max_msg_size);
      23          20 :     m_event_loop_thread = std::thread([this, port]{ eventLoop(port); });
      24          10 : }
      25             : 
      26             : 
      27           0 : NetioZerocopyPublisher::NetioZerocopyPublisher(
      28             :     const std::string &ip, uint32_t port, Bus& bus,
      29             :     unsigned int netio_pn, unsigned int netio_ps, unsigned int max_msg_size,
      30           0 :     uint64_t dma_buffer_vaddr, size_t dma_size, NetioEventLoop& evloop)
      31           0 :     : m_ip{ip},
      32           0 :       m_port(port),
      33           0 :       m_bus(bus),
      34           0 :       m_dma_buffer{reinterpret_cast<void*>(dma_buffer_vaddr), dma_size, nullptr, 0},
      35           0 :       m_ct(new CompletionTable()),
      36           0 :       m_socket_attr{netio_pn, netio_ps},
      37           0 :       m_context(evloop.ctx)
      38             : {
      39           0 :     init_publish_socket(max_msg_size);
      40           0 : }
      41             : 
      42             : 
      43          10 : void NetioZerocopyPublisher::init_publish_socket(unsigned int max_msg_size)
      44             : {
      45          10 :     unsigned int max_msg_by_net = m_socket_attr.buffer_size - sizeof(felix_id_t) - sizeof(uint8_t);
      46          10 :     m_max_msg_size = max_msg_size < max_msg_by_net ? max_msg_size : max_msg_by_net;
      47          10 :     m_max_iov_len = NETIO_MAX_IOV_LEN -1;
      48          10 :     memset(&m_socket, 0, sizeof m_socket);
      49          10 :     netio_unbuffered_publish_socket_init(&m_socket, m_context.get(), m_ip.c_str(), m_port, &m_dma_buffer);
      50          10 :     LOG_DBG("Initialised netio-next unbuffered_publish_socket. Port %u PROVIDER: %s", m_port ,m_socket.lsocket.fi->fabric_attr->prov_name);
      51          10 :     m_socket.usr = this;
      52          10 :     m_socket.cb_msg_published = cb_on_msg_published;
      53             : 
      54          10 :     netio_timer_init(&(m_context->evloop), &m_timer);
      55          10 :     netio_signal_init(&(m_context->evloop), &m_signal);
      56          10 : }
      57             : 
      58             : 
      59          20 : NetioZerocopyPublisher::~NetioZerocopyPublisher()
      60             : {
      61          10 :     netio_timer_stop(&m_timer);
      62          10 :     netio_terminate_signal(&(m_context->evloop));
      63          10 :     if ( m_event_loop_thread.get_id() != std::thread::id() ) {
      64          10 :         m_event_loop_thread.join();
      65             :     }
      66          30 : }
      67             : 
      68          10 : void NetioZerocopyPublisher::eventLoop(uint32_t port)
      69             : {
      70          10 :     std::ostringstream out;
      71          10 :     out << "zerocopypub[" << port << "]";
      72          10 :     std::string s = out.str();
      73          10 :     pthread_setname_np(pthread_self(), s.c_str());
      74          10 :     netio_run(&(m_context->evloop));
      75          10 : }
      76             : 
      77             : 
      78          10 : bool NetioZerocopyPublisher::declare(const std::vector<Elink> &elinks)
      79             : {
      80          10 :     return m_bus.publish(elinks, m_ip, m_port, m_socket_attr.num_buffers, m_socket_attr.buffer_size, true, true); //TODO: pass pages
      81             : }
      82             : 
      83             : 
      84   211584273 : Publisher::Result NetioZerocopyPublisher::publish(felix_id_t fid, iovec *iov,
      85             :      uint32_t iovlen, size_t bytes, uint32_t block_addr, std::uint8_t status)
      86             : {
      87   211584273 :     if(iovlen == 0){return Publisher::Result::OK;}
      88             : 
      89             :     // Truncate too big messages.
      90             :     // Thresholds are the max number of IOV entries (sender) and the page size (receiver).
      91             :     // + 1 for status byte
      92   211584273 :     if ( iovlen > m_max_iov_len or bytes > m_max_msg_size ){
      93       13801 :         status = truncate_msg_if_too_large(iov, iovlen, status);
      94       13801 :         *static_cast<uint8_t*>(iov[0].iov_base) = status;
      95             :     }
      96             : 
      97             :     //remove status byte
      98   211584273 :     if(iov[0].iov_len > 0){
      99          10 :         uint8_t* iov_ptr = static_cast<uint8_t*>(iov[0].iov_base);
     100          10 :         iov[0].iov_base = static_cast<void*>(iov_ptr + 1);
     101          10 :         --iov[0].iov_len;
     102             :     }
     103             : 
     104   211584273 :     StreamCache &cache = m_cache[(fid >> 16) & 0xfff].m_streams[fid & 0xff];
     105   211584273 :     unsigned netio_flags = 0;
     106             :     
     107   211584273 :     if (cache.m_again == 0) {
     108   205203907 :         cache.key = block_addr ;
     109   205203907 :         m_ct->push(block_addr);
     110             :     }
     111     6380366 :     else if (cache.m_again == 1) { //AGAIN
     112     6380366 :         cache.key = block_addr;
     113             :     }
     114             :     else { //PARTIAL
     115             :         //don't touch key
     116             :         netio_flags |= NETIO_REENTRY;
     117             :     }
     118             : 
     119   211584273 :     auto r = netio_unbuffered_publishv_usr(&m_socket, fid, iov, iovlen,
     120             :         &cache.key, netio_flags, &cache.m_cache, status, 1);
     121             : 
     122   211584273 :     if(r == NETIO_STATUS_OK || r == NETIO_STATUS_OK_NOSUB) {
     123   205203907 :         cache.m_again = 0;
     124   205203907 :         return Publisher::Result::OK;
     125             :     }
     126             :     else if(r == NETIO_STATUS_AGAIN) {
     127             :         // No data were sent, we need to redo the whole call
     128     6380366 :         cache.m_again = 1;
     129     6380366 :         return Publisher::Result::AGAIN;
     130             :     }
     131             :     else if(r == NETIO_STATUS_PARTIAL) {
     132             :         // Some data were sent, we need to redo the call but set NETIO_REENTRY
     133           0 :         cache.m_again = 2;
     134           0 :         return Publisher::Result::PARTIAL;
     135             :     }
     136             :     else if (r == NETIO_ERROR_MAX_IOV_EXCEEDED) {
     137             :         // Message too large, discarded.
     138           0 :         cache.m_again = 0;
     139           0 :         LOG_WARN("Message too large, IOV count %lu. Discarded.", iovlen);
     140           0 :         m_ct->update(block_addr);
     141           0 :         return Publisher::Result::ERROR_TOO_BIG;
     142             :     }
     143             :     else {
     144             :         //NETIO_STATUS_ERROR message discarded.
     145           0 :         cache.m_again = 0;
     146           0 :         LOG_WARN("Netio error, message discarded");
     147           0 :         m_ct->update(block_addr);
     148           0 :         return Publisher::Result::ERROR;
     149             :     }
     150             : }
     151             : 
     152             : 
     153           0 : Publisher::Result NetioZerocopyPublisher::publish(felix_id_t fid, uint8_t* data, size_t len)
     154             : {
     155           0 :     iovec iov;
     156           0 :     iov.iov_base = static_cast<void*>(data);
     157           0 :     iov.iov_len = len;
     158             :     // TODO: status byte not handled correctly
     159           0 :     return publish(fid, &iov, 1, len, 0, 0);
     160             : }
     161             : 
     162             : 
     163           0 : Publisher::Result NetioZerocopyPublisher::flush(felix_id_t fid)
     164             : {
     165             :     //Nothing to flush in zero-copy mode, harmless
     166           0 :     return Publisher::Result::OK;
     167             : }
     168             : 
     169             : 
     170          10 : void NetioZerocopyPublisher::set_periodic_callback(
     171             :     uint32_t period_us, Callback callback)
     172             : {
     173          10 :     m_read_callback = callback;
     174          10 :     m_signal.data = this;
     175     6380376 :     m_signal.cb = [](void *ptr)
     176             :     {
     177     6380366 :         NetioZerocopyPublisher* self = static_cast<NetioZerocopyPublisher*>(ptr);
     178     6380366 :         if (self->m_read_callback())
     179             :         {
     180     6362696 :             netio_signal_fire(&self->m_signal);
     181             :         }
     182     6380366 :     };
     183             : 
     184          10 :     m_timer.data = this;
     185       79903 :     m_timer.cb = [](void *ptr)
     186             :     {
     187       79893 :         NetioZerocopyPublisher* self = static_cast<NetioZerocopyPublisher*>(ptr);
     188       79893 :         if (self->m_read_callback())
     189             :         {
     190       17670 :             netio_signal_fire(&self->m_signal);
     191             :         }
     192       79893 :     };
     193          10 :     netio_timer_start_us(&m_timer, period_us);
     194          10 : }
     195             : 
     196             : 
     197           0 : void NetioZerocopyPublisher::set_asynch_callback(Callback callback)
     198             : {
     199           0 :     m_read_callback = callback;
     200           0 :     m_signal.data = this;
     201           0 :     m_signal.cb = [](void *ptr)
     202             :     {
     203           0 :         NetioZerocopyPublisher* self = static_cast<NetioZerocopyPublisher*>(ptr);
     204           0 :         if (self->m_read_callback())
     205             :         {
     206           0 :             netio_signal_fire(&self->m_signal);
     207             :         }
     208           0 :     };
     209           0 : }
     210             : 
     211             : 
     212           0 : void NetioZerocopyPublisher::fire_asynch_callback()
     213             : {
     214           0 :     netio_signal_fire(&m_signal);
     215           0 : }
     216             : 
     217             : 
     218   205203907 : void NetioZerocopyPublisher::on_msg_published(uint64_t key)
     219             : {
     220   205203907 :     uint32_t offset = key & 0xffffffff;
     221   205203907 :     m_ct->update(offset);
     222   205203907 : }
     223             : 
     224             : 
     225         103 : uint32_t NetioZerocopyPublisher::get_resource_counter()
     226             : {
     227         103 :     return netio_pubsocket_get_available_co(&m_socket);
     228             : }
     229             : 
     230             : 
     231         103 : uint32_t NetioZerocopyPublisher::get_subscription_number()
     232             : {
     233         103 :     return m_socket.subscription_table.num_subscriptions;
     234             : }
     235             : 

Generated by: LCOV version 1.0