Line data Source code
1 : #ifndef FELIX_NETIOBUFFEREDPUBLISHER_H_ 2 : #define FELIX_NETIOBUFFEREDPUBLISHER_H_ 3 : 4 : #include <array> 5 : #include <string> 6 : #include <thread> 7 : 8 : #include "netio/netio.h" 9 : #include "bus.hpp" 10 : #include "publisher.hpp" 11 : #include "netio_evloop.hpp" 12 : 13 : 14 : class NetioBufferedPublisher : public Publisher 15 : { 16 : public: 17 : explicit NetioBufferedPublisher(const std::string &ip, uint32_t port, Bus& bus, 18 : unsigned int netio_pn, unsigned int netio_ps, unsigned int netio_wm, 19 : unsigned int netio_to, unsigned int max_msg_size); 20 : 21 : explicit NetioBufferedPublisher(const std::string &ip, uint32_t port, Bus& bus, 22 : unsigned int netio_pn, unsigned int netio_ps, unsigned int netio_wm, 23 : unsigned int netio_to, unsigned int max_msg_size, NetioEventLoop& evloop); 24 : 25 : ~NetioBufferedPublisher(); 26 : 27 : NetioBufferedPublisher(const NetioBufferedPublisher &) = delete; 28 : NetioBufferedPublisher &operator=(const NetioBufferedPublisher &) = delete; 29 : 30 : bool declare(const std::vector<Elink> &elinks) override; 31 : 32 : Result publish(felix_id_t fid, iovec *iov, uint32_t iovlen, size_t bytes, uint32_t block_addr, std::uint8_t status) override; 33 : 34 : Result publish(felix_id_t fid, uint8_t* data, size_t len) override; 35 : 36 : Result flush(felix_id_t fid) override; 37 : 38 : void set_periodic_callback(uint32_t period_us, Callback callback) override; 39 : 40 : void set_asynch_callback(Callback callback) override; 41 : 42 : void fire_asynch_callback() override; 43 : 44 0 : const CompletionTable* get_completion_table() override {return nullptr;}; 45 : 46 : uint32_t get_resource_counter() override; 47 : 48 : uint32_t get_subscription_number() override; 49 : 50 85 : uint64_t get_resource_available_calls() override {return m_buf_available_calls;}; 51 : 52 : private: 53 : void eventLoop(uint32_t port); 54 : 55 : void on_buffer_available(); 56 212074 : static void cb_on_buffer_available(struct netio_publish_socket* socket) { 57 212074 : static_cast<NetioBufferedPublisher*>(socket->usr)->on_buffer_available(); 58 212074 : } 59 : 60 : void init_publish_socket(unsigned int max_msg_size); 61 : 62 : private: 63 : struct StreamCache 64 : { 65 : StreamCache() 66 : { 67 : netio_subscription_cache_init(&m_cache); 68 : } 69 : int m_again = 0; 70 : netio_subscription_cache m_cache; 71 : }; 72 : 73 : struct LinkCache 74 : { 75 : std::array<StreamCache, 0x100> m_streams; 76 : }; 77 : 78 : const std::string m_ip; 79 : const uint32_t m_port; 80 : Bus& m_bus; 81 : netio_buffered_socket_attr m_socket_attr; 82 : std::shared_ptr<netio_context> m_context; 83 : netio_publish_socket m_socket; 84 : netio_signal m_signal; //interrupt-driven 85 : netio_timer m_timer; //polling readout 86 : Callback m_read_callback; 87 : std::array<LinkCache, 0x1000> m_cache; 88 : std::thread m_event_loop_thread; 89 : uint64_t m_buf_available_calls = 0; 90 : }; 91 : 92 : #endif /* FELIX_NETIOBUFFEREDPUBLISHER_H_ */