LCOV - code coverage report
Current view: top level - src/network - netio_buffered_receiver.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 30 37 81.1 %
Date: 2025-09-09 12:09:29 Functions: 6 7 85.7 %

          Line data    Source code
       1             : #ifndef FELIX_BUFFERED_RECEIVER_H_
       2             : #define FELIX_BUFFERED_RECEIVER_H_
       3             : 
       4             : #include <thread>
       5             : 
       6             : #include "netio/netio.h"
       7             : #include "netio_evloop.hpp"
       8             : #include "receiver.hpp"
       9             : #include "bus.hpp"
      10             : #include "felix/felix_toflx.hpp"
      11             : 
      12             : class NetioBufferedReceiver : public Receiver {
      13             : 
      14             : public:
      15             :     explicit NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
      16             :         unsigned int netio_pn, unsigned int netio_ps);
      17             : 
      18             :     explicit NetioBufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
      19             :         unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop);
      20             : 
      21             :     ~NetioBufferedReceiver();
      22             : 
      23             :     bool declare(const std::vector<Elink> &elinks) override;
      24             : 
      25           5 :     void set_conn_open_callback(OnConnOpen callback) override {m_on_conn_open = callback;};
      26             : 
      27           5 :     void set_conn_close_callback(OnConnClose callback) override {m_on_conn_close = callback;};
      28             : 
      29           5 :     void set_on_msg_callback(OnMsg callback) override {m_on_msg = callback;};
      30             : 
      31           0 :     int get_number_of_connections() override {return -1;} //Unsupported: need entries of netio_socket_list linked list.
      32             : 
      33             : private:
      34             :     void eventLoop(uint32_t port);
      35             :     void init_listen_socket();
      36             : 
      37           5 :     static void cb_on_connection_established(struct netio_buffered_recv_socket* socket) {
      38           5 :         std::string ep = std::to_string(socket->recv_socket.eqfd); //Would like to have an IP here
      39           5 :         NetioBufferedReceiver* object = static_cast<NetioBufferedReceiver*>(socket->lsocket->usr);
      40           5 :         if (object->m_on_conn_open) {
      41           5 :             object->m_on_conn_open(ep);
      42             :         }
      43           5 :     }
      44             : 
      45           5 :     static void cb_on_connection_close(struct netio_buffered_recv_socket* socket) {
      46           5 :         std::string ep = std::to_string(socket->recv_socket.eqfd); //Would like to have an IP here
      47           5 :         NetioBufferedReceiver* object = static_cast<NetioBufferedReceiver*>(socket->lsocket->usr);
      48           5 :         if (object->m_on_conn_close) {
      49           5 :             object->m_on_conn_close(ep);
      50             :         }
      51           5 :     }
      52             : 
      53       10004 :     static void cb_buffered_msg_received(struct netio_buffered_recv_socket* socket, void* data, size_t len) {
      54       10004 :         uint8_t* dataptr = static_cast<uint8_t*>(data);
      55       10004 :         std::vector<ToFlxMessage> messages{};
      56             : 
      57       10004 :         size_t pos = 0;
      58       20008 :         while (pos < len) {
      59       10004 :             if (len - pos < sizeof(ToFlxHeader)) {
      60           0 :                 messages.push_back({.status = ToFlxMessage::Status::HeaderNotDecoded});
      61           0 :                 break;
      62             :             }
      63             : 
      64       10004 :             ToFlxHeader hdr;
      65       10004 :             std::memcpy(&hdr, dataptr + pos, sizeof(ToFlxHeader));
      66       10004 :             pos += sizeof(ToFlxHeader);
      67             : 
      68       10004 :             if (hdr.length == 0) {
      69           0 :                 messages.push_back({.status = ToFlxMessage::Status::InvalidMsgLength});
      70           0 :                 break;
      71             :             }
      72             : 
      73       10004 :             if (len - pos < hdr.length) {
      74           0 :                 messages.push_back({.status = ToFlxMessage::Status::MessageNotDecoded});
      75           0 :                 break;
      76             :             }
      77             :             
      78       10004 :             messages.emplace_back(static_cast<uint32_t>(hdr.elink), std::span<uint8_t>(dataptr + pos, hdr.length));
      79       10004 :             pos += hdr.length;
      80             :         }
      81             : 
      82       10004 :         static_cast<NetioBufferedReceiver*>(socket->lsocket->usr)->m_on_msg(messages);
      83       10004 :     }
      84             : 
      85             : private:
      86             :     const std::string m_ip;
      87             :     const uint32_t m_port;
      88             :     Bus& m_bus;    
      89             :     netio_buffered_socket_attr m_socket_attr;
      90             :     std::shared_ptr<netio_context> m_context;
      91             :     netio_buffered_listen_socket m_socket;
      92             : 
      93             :     OnConnOpen m_on_conn_open;
      94             :     OnConnClose m_on_conn_close;
      95             :     OnMsg m_on_msg;
      96             :     std::thread m_event_loop_thread;
      97             : };
      98             : 
      99             : #endif /* FELIX_BUFFERTED_RECEIVER_H_ */

Generated by: LCOV version 1.0