Line data Source code
1 : #ifndef FELIX_BUFFERED_RECEIVER_H_ 2 : #define FELIX_BUFFERED_RECEIVER_H_ 3 : 4 : #include <thread> 5 : 6 : #include "netio/netio.h" 7 : #include "netio_evloop.hpp" 8 : #include "receiver.hpp" 9 : #include "bus.hpp" 10 : #include "felix/felix_toflx.hpp" 11 : 12 : class NetioBufferedReceiver : public Receiver { 13 : 14 : public: 15 : explicit NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus, 16 : unsigned int netio_pn, unsigned int netio_ps); 17 : 18 : explicit NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus, 19 : unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop); 20 : 21 : ~NetioBufferedReceiver(); 22 : 23 : bool declare(const std::vector<Elink> &elinks) override; 24 : 25 5 : void set_conn_open_callback(OnConnOpen callback) override {m_on_conn_open = callback;}; 26 : 27 5 : void set_conn_close_callback(OnConnClose callback) override {m_on_conn_close = callback;}; 28 : 29 5 : void set_on_msg_callback(OnMsg callback) override {m_on_msg = callback;}; 30 : 31 0 : int get_number_of_connections() override {return -1;} //Unsupported: need entries of netio_socket_list linked list. 32 : 33 : private: 34 : void eventLoop(uint32_t port); 35 : void init_listen_socket(); 36 : 37 5 : static void cb_on_connection_established(struct netio_buffered_recv_socket* socket) { 38 5 : std::string ep = std::to_string(socket->recv_socket.eqfd); //Would like to have an IP here 39 5 : NetioBufferedReceiver* object = static_cast<NetioBufferedReceiver*>(socket->lsocket->usr); 40 5 : if (object->m_on_conn_open) { 41 5 : object->m_on_conn_open(ep); 42 : } 43 5 : } 44 : 45 5 : static void cb_on_connection_close(struct netio_buffered_recv_socket* socket) { 46 5 : std::string ep = std::to_string(socket->recv_socket.eqfd); //Would like to have an IP here 47 5 : NetioBufferedReceiver* object = static_cast<NetioBufferedReceiver*>(socket->lsocket->usr); 48 5 : if (object->m_on_conn_close) { 49 5 : object->m_on_conn_close(ep); 50 : } 51 5 : } 52 : 53 10004 : static void cb_buffered_msg_received(struct netio_buffered_recv_socket* socket, void* data, size_t len) { 54 10004 : uint8_t* dataptr = static_cast<uint8_t*>(data); 55 10004 : std::vector<ToFlxMessage> messages{}; 56 : 57 10004 : size_t pos = 0; 58 20008 : while (pos < len) { 59 10004 : if (len - pos < sizeof(ToFlxHeader)) { 60 0 : messages.push_back({.status = ToFlxMessage::Status::HeaderNotDecoded}); 61 0 : break; 62 : } 63 : 64 10004 : ToFlxHeader hdr; 65 10004 : std::memcpy(&hdr, dataptr + pos, sizeof(ToFlxHeader)); 66 10004 : pos += sizeof(ToFlxHeader); 67 : 68 10004 : if (hdr.length == 0) { 69 0 : messages.push_back({.status = ToFlxMessage::Status::InvalidMsgLength}); 70 0 : break; 71 : } 72 : 73 10004 : if (len - pos < hdr.length) { 74 0 : messages.push_back({.status = ToFlxMessage::Status::MessageNotDecoded}); 75 0 : break; 76 : } 77 : 78 10004 : messages.emplace_back(static_cast<uint32_t>(hdr.elink), std::span<uint8_t>(dataptr + pos, hdr.length)); 79 10004 : pos += hdr.length; 80 : } 81 : 82 10004 : static_cast<NetioBufferedReceiver*>(socket->lsocket->usr)->m_on_msg(messages); 83 10004 : } 84 : 85 : private: 86 : const std::string m_ip; 87 : const uint32_t m_port; 88 : Bus& m_bus; 89 : netio_buffered_socket_attr m_socket_attr; 90 : std::shared_ptr<netio_context> m_context; 91 : netio_buffered_listen_socket m_socket; 92 : 93 : OnConnOpen m_on_conn_open; 94 : OnConnClose m_on_conn_close; 95 : OnMsg m_on_msg; 96 : std::thread m_event_loop_thread; 97 : }; 98 : 99 : #endif /* FELIX_BUFFERTED_RECEIVER_H_ */