Line data Source code
1 : #ifndef FELIX_NETIOZEROCOPYPUBLISHER_H_ 2 : #define FELIX_NETIOZEROCOPYPUBLISHER_H_ 3 : 4 : #include <array> 5 : #include <string> 6 : #include <thread> 7 : 8 : #include "netio/netio.h" 9 : #include "netio_evloop.hpp" 10 : #include "bus.hpp" 11 : #include "publisher.hpp" 12 : #include "completion_table.hpp" 13 : 14 : 15 : class NetioZerocopyPublisher : public Publisher 16 : { 17 : public: 18 : explicit NetioZerocopyPublisher(const std::string &ip, uint32_t port, Bus& bus, 19 : unsigned int netio_pn, unsigned int netio_ps, unsigned int max_msg_size, 20 : uint64_t dma_buffer_vaddr, size_t dma_size); 21 : 22 : explicit NetioZerocopyPublisher(const std::string &ip, uint32_t port, Bus& bus, 23 : unsigned int netio_pn, unsigned int netio_ps, unsigned int max_msg_size, 24 : uint64_t dma_buffer_vaddr, size_t dma_size, NetioEventLoop& evloop); 25 : 26 : ~NetioZerocopyPublisher(); 27 : 28 : NetioZerocopyPublisher(const NetioZerocopyPublisher &) = delete; 29 : NetioZerocopyPublisher &operator=(const NetioZerocopyPublisher &) = delete; 30 : 31 : bool declare(const std::vector<Elink> &elinks) override; 32 : 33 : Result publish(felix_id_t fid, iovec *iov, uint32_t iovlen, size_t bytes, uint32_t block_addr, std::uint8_t status) override; 34 : 35 : Result publish(felix_id_t fid, uint8_t* data, size_t len) override; 36 : 37 : Result flush(felix_id_t fid) override; 38 : 39 : void set_periodic_callback(uint32_t period_us, Callback callback) override; 40 : 41 : void set_asynch_callback(Callback callback) override; 42 : 43 : void fire_asynch_callback() override; 44 : 45 12878486 : const CompletionTable* get_completion_table() override {return m_ct.get();}; 46 : 47 : uint32_t get_resource_counter() override; 48 : 49 : uint32_t get_subscription_number() override; 50 : 51 103 : uint64_t get_resource_available_calls() override {return 0;}; 52 : 53 : private: 54 : void eventLoop(uint32_t port); 55 : void init_publish_socket(unsigned int max_msg_size); 56 : 57 : void on_msg_published(uint64_t); 58 : 59 205203907 : static void cb_on_msg_published(struct netio_unbuffered_publish_socket* socket, uint64_t key) { 60 205203907 : static_cast<NetioZerocopyPublisher*>(socket->usr)->on_msg_published(key); 61 205203907 : } 62 : 63 : 64 : private: 65 : struct StreamCache 66 : { 67 : StreamCache() 68 : { 69 : netio_subscription_cache_init(&m_cache); 70 : } 71 : int m_again = 0; 72 : uint64_t key = 0; 73 : netio_subscription_cache m_cache; 74 : }; 75 : 76 : struct LinkCache 77 : { 78 : std::array<StreamCache, 0x100> m_streams; 79 : }; 80 : 81 : const std::string m_ip; 82 : const uint32_t m_port; 83 : Bus& m_bus; 84 : netio_buffer m_dma_buffer; 85 : std::unique_ptr<CompletionTable> m_ct; 86 : netio_unbuffered_socket_attr m_socket_attr; 87 : std::shared_ptr<netio_context> m_context; 88 : netio_unbuffered_publish_socket m_socket; 89 : netio_signal m_signal; //interrupt-driven 90 : netio_timer m_timer; //polling readout 91 : Callback m_read_callback; 92 : std::array<LinkCache, 0x1000> m_cache; 93 : std::thread m_event_loop_thread; 94 : }; 95 : 96 : #endif /* FELIX_NETIOZEROCOPYPUBLISHER_H_ */