Line data Source code
1 : #include "felix-server/ZeroCopyPublisher.hpp"
2 :
3 : #include <algorithm>
4 :
5 : #include <netio3-backend/Netio3Backend.hpp>
6 : #include <netio3/NetioPublisher.hpp>
7 :
8 : #include "felix-server/Exceptions.hpp"
9 : #include "felix-server/Utility.hpp"
10 :
11 11 : felix_server::ZeroCopyPublisher::ZeroCopyPublisher(Settings settings,
12 : std::shared_ptr<netio3::BaseEventLoop> evloop,
13 11 : std::span<const std::uint64_t> tags) :
14 22 : m_settings{std::move(settings)},
15 11 : m_bus_writer{
16 11 : m_settings.bus_path,
17 11 : m_settings.bus_group,
18 : },
19 12 : m_publisher{{
20 11 : .backend_type = m_settings.network_type,
21 11 : .backend_mode = utility::get_network_mode(m_settings.network_type),
22 11 : .buffersize = m_settings.buffer_size,
23 11 : .mr_start = m_settings.mr_start,
24 : .method = netio3::SendMethod::ZERO_COPY,
25 : },
26 11 : std::move(evloop),
27 : {
28 11 : m_settings.ip,
29 11 : m_settings.port,
30 : }}
31 : {
32 : try {
33 10 : declare(tags);
34 2 : } catch (const std::exception& e) {
35 2 : throw BusError(e);
36 2 : }
37 8 : m_publisher.set_on_subscription(m_settings.on_sub);
38 8 : m_publisher.set_on_unsubscription(m_settings.on_unsub);
39 8 : m_publisher.set_on_publish_completed(m_settings.on_publish_completed);
40 16 : }
41 :
42 1 : netio3::NetioPublisherStatus felix_server::ZeroCopyPublisher::publish(std::uint64_t tag,
43 : std::span<const iovec> data,
44 : bool retry,
45 : std::uint8_t user_status,
46 : std::uint64_t key)
47 : {
48 1 : return m_publisher.publish(tag, data, retry, user_status, key);
49 : }
50 :
51 3 : netio3::NetioPublisherStatus felix_server::ZeroCopyPublisher::publish(std::uint64_t tag,
52 : std::span<std::uint8_t> data,
53 : bool retry,
54 : std::uint8_t user_status,
55 : std::uint64_t key)
56 : {
57 3 : return m_publisher.publish(tag, data, retry, user_status, key);
58 : }
59 :
60 10 : void felix_server::ZeroCopyPublisher::declare(std::span<const std::uint64_t> tags)
61 : {
62 10 : auto info = std::vector<felixbus::FelixBusInfo>{};
63 10 : info.reserve(tags.size());
64 10 : std::ranges::transform(tags, std::back_inserter(info), [this](const auto& tag) {
65 : return felixbus::FelixBusInfo{
66 : .fid = tag,
67 34 : .ip = m_publisher.get_endpoint().address(),
68 34 : .port = m_publisher.get_endpoint().port(),
69 : .unbuffered = false,
70 : .pubsub = true,
71 17 : .tcp = m_settings.network_type == netio3::NetworkType::ASYNCMSG,
72 : .stream = false,
73 17 : .netio_pages = m_settings.num_buffers_receiver,
74 17 : .netio_pagesize = m_settings.buffer_size,
75 34 : };
76 119 : });
77 10 : m_bus_writer.publish(info, m_settings.bus_filename);
78 10 : }
|