Line data Source code
1 : #include "felix-server/BufferedPublisher.hpp"
2 :
3 : #include <algorithm>
4 :
5 : #include <netio3-backend/Netio3Backend.hpp>
6 : #include <netio3/NetioPublisher.hpp>
7 :
8 : #include "felix-server/Exceptions.hpp"
9 : #include "felix-server/Utility.hpp"
10 :
11 11 : felix_server::BufferedPublisher::BufferedPublisher(Settings settings,
12 : std::shared_ptr<netio3::BaseEventLoop> evloop,
13 11 : std::span<const std::uint64_t> tags) :
14 22 : m_settings{std::move(settings)},
15 11 : m_bus_writer{
16 11 : m_settings.bus_path,
17 11 : m_settings.bus_group,
18 : },
19 12 : m_publisher{{
20 11 : .backend_type = m_settings.network_type,
21 11 : .backend_mode = utility::get_network_mode(m_settings.network_type),
22 11 : .buffersize = m_settings.buffer_size,
23 11 : .nbuffers = m_settings.buffer_count,
24 11 : .flush_interval = m_settings.buffer_timeout,
25 : .method = netio3::SendMethod::BUFFERED,
26 : },
27 11 : std::move(evloop),
28 : {
29 11 : m_settings.ip,
30 11 : m_settings.port,
31 : }}
32 : {
33 : try {
34 10 : declare(tags);
35 2 : } catch (const std::exception& e) {
36 2 : throw BusError(e);
37 2 : }
38 8 : m_publisher.set_on_subscription(m_settings.on_sub);
39 8 : m_publisher.set_on_unsubscription(m_settings.on_unsub);
40 16 : }
41 :
42 1 : netio3::NetioPublisherStatus felix_server::BufferedPublisher::publish(std::uint64_t tag,
43 : std::span<const iovec> data,
44 : bool retry,
45 : std::uint8_t user_status)
46 : {
47 1 : return m_publisher.publish(tag, data, retry, user_status);
48 : }
49 :
50 3 : netio3::NetioPublisherStatus felix_server::BufferedPublisher::publish(
51 : std::uint64_t tag,
52 : std::span<const std::uint8_t> data,
53 : bool retry,
54 : std::uint8_t user_status)
55 : {
56 3 : return m_publisher.publish(tag, data, retry, user_status);
57 : }
58 :
59 10 : void felix_server::BufferedPublisher::declare(std::span<const std::uint64_t> tags)
60 : {
61 10 : auto info = std::vector<felixbus::FelixBusInfo>{};
62 10 : info.reserve(tags.size());
63 10 : std::ranges::transform(tags, std::back_inserter(info), [this](const auto& tag) {
64 : return felixbus::FelixBusInfo{
65 : .fid = tag,
66 36 : .ip = m_publisher.get_endpoint().address(),
67 36 : .port = m_publisher.get_endpoint().port(),
68 : .unbuffered = false,
69 : .pubsub = true,
70 18 : .tcp = m_settings.network_type == netio3::NetworkType::ASYNCMSG,
71 : .stream = false,
72 18 : .netio_pages = m_settings.buffer_count,
73 18 : .netio_pagesize = m_settings.buffer_size,
74 36 : };
75 126 : });
76 10 : m_bus_writer.publish(info, m_settings.bus_filename);
77 10 : }
|