LCOV - code coverage report
Current view: top level - src - ZeroCopyPublisher.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 39 39 100.0 %
Date: 2025-11-29 00:25:29 Functions: 5 5 100.0 %

          Line data    Source code
       1             : #include "felix-server/ZeroCopyPublisher.hpp"
       2             : 
       3             : #include <algorithm>
       4             : 
       5             : #include <netio3-backend/Netio3Backend.hpp>
       6             : #include <netio3/NetioPublisher.hpp>
       7             : 
       8             : #include "felix-server/Exceptions.hpp"
       9             : #include "felix-server/Utility.hpp"
      10             : 
      11          11 : felix_server::ZeroCopyPublisher::ZeroCopyPublisher(Settings settings,
      12             :                                                    std::shared_ptr<netio3::BaseEventLoop> evloop,
      13          11 :                                                    std::span<const std::uint64_t> tags) :
      14          22 :   m_settings{std::move(settings)},
      15          11 :   m_bus_writer{
      16          11 :     m_settings.bus_path,
      17          11 :     m_settings.bus_group,
      18             :   },
      19          12 :   m_publisher{{
      20          11 :                 .backend_type = m_settings.network_type,
      21          11 :                 .backend_mode = utility::get_network_mode(m_settings.network_type),
      22          11 :                 .buffersize = m_settings.buffer_size,
      23          11 :                 .mr_start = m_settings.mr_start,
      24             :                 .method = netio3::SendMethod::ZERO_COPY,
      25             :               },
      26          11 :               std::move(evloop),
      27             :               {
      28          11 :                 m_settings.ip,
      29          11 :                 m_settings.port,
      30             :               }}
      31             : {
      32             :   try {
      33          10 :     declare(tags);
      34           2 :   } catch (const std::exception& e) {
      35           2 :     throw BusError(e);
      36           2 :   }
      37           8 :   m_publisher.set_on_subscription(m_settings.on_sub);
      38           8 :   m_publisher.set_on_unsubscription(m_settings.on_unsub);
      39           8 :   m_publisher.set_on_publish_completed(m_settings.on_publish_completed);
      40          16 : }
      41             : 
      42           1 : netio3::NetioPublisherStatus felix_server::ZeroCopyPublisher::publish(std::uint64_t tag,
      43             :                                                                       std::span<const iovec> data,
      44             :                                                                       bool retry,
      45             :                                                                       std::uint8_t user_status,
      46             :                                                                       std::uint64_t key)
      47             : {
      48           1 :   return m_publisher.publish(tag, data, retry, user_status, key);
      49             : }
      50             : 
      51           3 : netio3::NetioPublisherStatus felix_server::ZeroCopyPublisher::publish(std::uint64_t tag,
      52             :                                                                       std::span<std::uint8_t> data,
      53             :                                                                       bool retry,
      54             :                                                                       std::uint8_t user_status,
      55             :                                                                       std::uint64_t key)
      56             : {
      57           3 :   return m_publisher.publish(tag, data, retry, user_status, key);
      58             : }
      59             : 
      60          10 : void felix_server::ZeroCopyPublisher::declare(std::span<const std::uint64_t> tags)
      61             : {
      62          10 :   auto info = std::vector<felixbus::FelixBusInfo>{};
      63          10 :   info.reserve(tags.size());
      64          10 :   std::ranges::transform(tags, std::back_inserter(info), [this](const auto& tag) {
      65             :     return felixbus::FelixBusInfo{
      66             :       .fid = tag,
      67          34 :       .ip = m_publisher.get_endpoint().address(),
      68          34 :       .port = m_publisher.get_endpoint().port(),
      69             :       .unbuffered = false,
      70             :       .pubsub = true,
      71          17 :       .tcp = m_settings.network_type == netio3::NetworkType::ASYNCMSG,
      72             :       .stream = false,
      73          17 :       .netio_pages = m_settings.num_buffers_receiver,
      74          17 :       .netio_pagesize = m_settings.buffer_size,
      75          34 :     };
      76         119 :   });
      77          10 :   m_bus_writer.publish(info, m_settings.bus_filename);
      78          10 : }

Generated by: LCOV version 1.0