LCOV - code coverage report
Current view: top level - src - BufferedPublisher.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 39 39 100.0 %
Date: 2025-11-29 00:25:29 Functions: 5 5 100.0 %

          Line data    Source code
       1             : #include "felix-server/BufferedPublisher.hpp"
       2             : 
       3             : #include <algorithm>
       4             : 
       5             : #include <netio3-backend/Netio3Backend.hpp>
       6             : #include <netio3/NetioPublisher.hpp>
       7             : 
       8             : #include "felix-server/Exceptions.hpp"
       9             : #include "felix-server/Utility.hpp"
      10             : 
      /// @brief Construct the publisher and announce the given tags on the bus.
      ///
      /// Stores the settings, builds the bus writer from `bus_path`/`bus_group`,
      /// builds the netio3 publisher with SendMethod::BUFFERED from the network and
      /// buffer settings, calls declare() for @p tags and finally installs the
      /// subscription/unsubscription callbacks taken from the settings.
      ///
      /// @param settings Configuration; moved into m_settings.
      /// @param evloop   Event loop; handed to m_publisher via std::move.
      /// @param tags     Tags forwarded to declare().
      /// @throws BusError if declare() throws any std::exception. Exceptions thrown
      ///         by the member initializers are outside the try block and propagate
      ///         unchanged.
      11          11 : felix_server::BufferedPublisher::BufferedPublisher(Settings settings,
      12             :                                                    std::shared_ptr<netio3::BaseEventLoop> evloop,
      13          11 :                                                    std::span<const std::uint64_t> tags) :
                         // Every initializer below reads m_settings, not the moved-from parameter.
                         // NOTE(review): this relies on m_settings being declared before
                         // m_bus_writer and m_publisher in the class - confirm in the header.
      14          22 :   m_settings{std::move(settings)},
      15          11 :   m_bus_writer{
      16          11 :     m_settings.bus_path,
      17          11 :     m_settings.bus_group,
      18             :   },
      19          12 :   m_publisher{{
      20          11 :                 .backend_type = m_settings.network_type,
      21          11 :                 .backend_mode = utility::get_network_mode(m_settings.network_type),
      22          11 :                 .buffersize = m_settings.buffer_size,
      23          11 :                 .nbuffers = m_settings.buffer_count,
      24          11 :                 .flush_interval = m_settings.buffer_timeout,
      25             :                 .method = netio3::SendMethod::BUFFERED,
      26             :               },
      27          11 :               std::move(evloop),
                                     // NOTE(review): presumably the local endpoint of the publisher -
                                     // confirm against the NetioPublisher constructor.
      28             :               {
      29          11 :                 m_settings.ip,
      30          11 :                 m_settings.port,
      31             :               }}
      32             : {
                         // Any std::exception escaping declare() is rethrown as BusError.
      33             :   try {
      34          10 :     declare(tags);
      35           2 :   } catch (const std::exception& e) {
      36           2 :     throw BusError(e);
      37           2 :   }
                         // NOTE(review): the callbacks are installed only after declare();
                         // confirm that no subscription can arrive before this point.
      38           8 :   m_publisher.set_on_subscription(m_settings.on_sub);
      39           8 :   m_publisher.set_on_unsubscription(m_settings.on_unsub);
      40          16 : }
      41             : 
      /// @brief Publish scatter/gather data for a tag through the netio3 publisher.
      ///
      /// Thin forwarder: all arguments are passed unchanged to m_publisher.publish().
      ///
      /// @param tag         Tag to publish on.
      /// @param data        Span of iovec entries describing the payload.
      /// @param retry       Forwarded as-is; semantics are defined by the netio3 publisher.
      /// @param user_status Forwarded as-is; semantics are defined by the netio3 publisher.
      /// @return The status returned by m_publisher.publish().
      42           1 : netio3::NetioPublisherStatus felix_server::BufferedPublisher::publish(std::uint64_t tag,
      43             :                                                                       std::span<const iovec> data,
      44             :                                                                       bool retry,
      45             :                                                                       std::uint8_t user_status)
      46             : {
      47           1 :   return m_publisher.publish(tag, data, retry, user_status);
      48             : }
      49             : 
      /// @brief Publish a contiguous byte buffer for a tag through the netio3 publisher.
      ///
      /// Thin forwarder: all arguments are passed unchanged to m_publisher.publish().
      ///
      /// @param tag         Tag to publish on.
      /// @param data        Span of bytes holding the payload.
      /// @param retry       Forwarded as-is; semantics are defined by the netio3 publisher.
      /// @param user_status Forwarded as-is; semantics are defined by the netio3 publisher.
      /// @return The status returned by m_publisher.publish().
      50           3 : netio3::NetioPublisherStatus felix_server::BufferedPublisher::publish(
      51             :   std::uint64_t tag,
      52             :   std::span<const std::uint8_t> data,
      53             :   bool retry,
      54             :   std::uint8_t user_status)
      55             : {
      56           3 :   return m_publisher.publish(tag, data, retry, user_status);
      57             : }
      58             : 
      /// @brief Publish one felixbus::FelixBusInfo entry per tag through the bus writer.
      ///
      /// Each entry carries the tag as `fid`, the address and port reported by
      /// m_publisher.get_endpoint(), and the buffer count/size from the settings.
      /// The collected entries are passed to m_bus_writer.publish() together with
      /// m_settings.bus_filename. Exceptions are not handled here.
      ///
      /// @param tags Tags to announce; each one becomes the `fid` of one entry.
      59          10 : void felix_server::BufferedPublisher::declare(std::span<const std::uint64_t> tags)
      60             : {
      61          10 :   auto info = std::vector<felixbus::FelixBusInfo>{};
                         // One entry is appended per tag, so reserve the final size up front.
      62          10 :   info.reserve(tags.size());
      63          10 :   std::ranges::transform(tags, std::back_inserter(info), [this](const auto& tag) {
      64             :     return felixbus::FelixBusInfo{
      65             :       .fid = tag,
                             // Address and port come from the publisher, not from m_settings.
                             // NOTE(review): get_endpoint() is called twice per tag and looks
                             // loop-invariant - confirm, then consider fetching it once.
      66          36 :       .ip = m_publisher.get_endpoint().address(),
      67          36 :       .port = m_publisher.get_endpoint().port(),
      68             :       .unbuffered = false,
      69             :       .pubsub = true,
                             // True only when the configured network type is ASYNCMSG.
      70          18 :       .tcp = m_settings.network_type == netio3::NetworkType::ASYNCMSG,
      71             :       .stream = false,
      72          18 :       .netio_pages = m_settings.buffer_count,
      73          18 :       .netio_pagesize = m_settings.buffer_size,
      74          36 :     };
      75         126 :   });
      76          10 :   m_bus_writer.publish(info, m_settings.bus_filename);
      77          10 : }

Generated by: LCOV version 1.0