Line data Source code
1 : #ifndef FELIX_UNBUFFERED_RECEIVER_H_ 2 : #define FELIX_UNBUFFERED_RECEIVER_H_ 3 : 4 : #include <thread> 5 : 6 : #include "netio/netio.h" 7 : #include "netio_evloop.hpp" 8 : #include "receiver.hpp" 9 : #include "bus.hpp" 10 : #include "felix/felix_toflx.hpp" 11 : 12 : class NetioUnbufferedReceiver : public Receiver { 13 : 14 : public: 15 : explicit NetioUnbufferedReceiver(const std::string &ip, uint32_t port, Bus& bus, 16 : unsigned int netio_pn, unsigned int netio_ps); 17 : 18 : explicit NetioUnbufferedReceiver(const std::string &ip, uint32_t port, Bus& bus, 19 : unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop); 20 : 21 : ~NetioUnbufferedReceiver(); 22 : 23 : bool declare(const std::vector<Elink> &elinks) override; 24 : 25 0 : void set_conn_open_callback(OnConnOpen callback) override {m_on_conn_open = callback;}; 26 : 27 0 : void set_conn_close_callback(OnConnClose callback) override {m_on_conn_close = callback;}; 28 : 29 0 : void set_on_msg_callback(OnMsg callback) override {m_on_msg = callback;}; 30 : 31 0 : int get_number_of_connections() override {return -1;} //Unsupported: need entries of netio_socket_list linked list. 32 : 33 : private: 34 : void eventLoop(uint32_t port); 35 : void init_listen_socket(); 36 : 37 0 : static void cb_on_connection_established(struct netio_recv_socket* socket) { 38 0 : std::string ep = std::to_string(socket->eqfd); //Would like to have an IP here 39 0 : NetioUnbufferedReceiver* object = static_cast<NetioUnbufferedReceiver*>(socket->lsocket->usr); 40 0 : if (object->m_on_conn_open) { 41 0 : object->m_on_conn_open(ep); 42 : } 43 0 : } 44 : 45 0 : static void cb_on_connection_close(struct netio_recv_socket* socket) { 46 0 : std::string ep = std::to_string(socket->eqfd); //Would like to have an IP here 47 0 : NetioUnbufferedReceiver* object = static_cast<NetioUnbufferedReceiver*>(socket->lsocket->usr); 48 0 : if (object->m_on_conn_close) { 49 0 : object->m_on_conn_close(ep); 50 : } 51 0 : } 52 : 53 0 : static void cb_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len) { 54 0 : uint8_t* dataptr = static_cast<uint8_t*>(buf->data); 55 0 : std::vector<ToFlxMessage> messages{}; 56 0 : size_t pos = 0; 57 0 : while (pos < len) { 58 0 : if (len - pos < sizeof(ToFlxHeader)) { 59 0 : messages.push_back({.status = ToFlxMessage::Status::HeaderNotDecoded}); 60 0 : break; 61 : } 62 : 63 0 : ToFlxHeader hdr; 64 0 : std::memcpy(&hdr, dataptr + pos, sizeof(ToFlxHeader)); 65 0 : pos += sizeof(ToFlxHeader); 66 : 67 0 : if (hdr.length == 0) { 68 0 : messages.push_back({.status = ToFlxMessage::Status::InvalidMsgLength}); 69 0 : break; 70 : } 71 : 72 0 : if (len - pos < hdr.length) { 73 0 : messages.push_back({.status = ToFlxMessage::Status::MessageNotDecoded}); 74 0 : break; 75 : } 76 : 77 0 : messages.emplace_back(static_cast<uint32_t>(hdr.elink), std::span<uint8_t>(dataptr + pos, hdr.length)); 78 0 : pos += hdr.length; 79 : } 80 : 81 0 : static_cast<NetioUnbufferedReceiver*>(socket->lsocket->usr)->m_on_msg(messages); 82 0 : netio_post_recv(socket, buf); 83 0 : } 84 : 85 : private: 86 : const std::string m_ip; 87 : const uint32_t m_port; 88 : Bus& m_bus; 89 : netio_unbuffered_socket_attr m_socket_attr; 90 : std::shared_ptr<netio_context> m_context; 91 : netio_listen_socket m_socket; 92 : 93 : OnConnOpen m_on_conn_open; 94 : OnConnClose m_on_conn_close; 95 : OnMsg m_on_msg; 96 : std::thread m_event_loop_thread; 97 : }; 98 : 99 : #endif /* FELIX_UNBUFFERED_RECEIVER_H_ */