LCOV - code coverage report
Current view: top level - src/network - netio_unbuffered_receiver.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 38 0.0 %
Date: 2025-09-09 12:09:29 Functions: 0 7 0.0 %

          Line data    Source code
       1             : #ifndef FELIX_UNBUFFERED_RECEIVER_H_
       2             : #define FELIX_UNBUFFERED_RECEIVER_H_
       3             : 
       4             : #include <thread>
       5             : 
       6             : #include "netio/netio.h"
       7             : #include "netio_evloop.hpp"
       8             : #include "receiver.hpp"
       9             : #include "bus.hpp"
      10             : #include "felix/felix_toflx.hpp"
      11             : 
      12             : class NetioUnbufferedReceiver : public Receiver { // Receiver whose static callbacks turn netio receive-socket events into OnConnOpen/OnConnClose/OnMsg calls.
      13             : 
      14             : public:
      15             :     explicit NetioUnbufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
      16             :         unsigned int netio_pn, unsigned int netio_ps); // Overload without an external event loop; defined outside this header.
      17             : 
      18             :     explicit NetioUnbufferedReceiver(const std::string &ip, uint32_t port, Bus& bus,
      19             :         unsigned int netio_pn, unsigned int netio_ps, NetioEventLoop& evloop); // Overload that additionally receives a caller-provided NetioEventLoop.
      20             : 
      21             :     ~NetioUnbufferedReceiver(); // Defined out of line; its cleanup behaviour is not visible in this header.
      22             : 
      23             :     bool declare(const std::vector<Elink> &elinks) override; // Overrides the Receiver interface; implemented outside this header.
      24             : // Stores the callback that cb_on_connection_established invokes.
      25           0 :     void set_conn_open_callback(OnConnOpen callback) override {m_on_conn_open = callback;};
      26             : // Stores the callback that cb_on_connection_close invokes.
      27           0 :     void set_conn_close_callback(OnConnClose callback) override {m_on_conn_close = callback;};
      28             : // Stores the callback that cb_msg_received invokes with each decoded batch of messages.
      29           0 :     void set_on_msg_callback(OnMsg callback) override {m_on_msg = callback;};
      30             : 
      31           0 :     int get_number_of_connections() override {return -1;} // Unsupported (always returns -1): counting connections would need the entries of the netio_socket_list linked list.
      32             : 
      33             : private:
      34             :     void eventLoop(uint32_t port); // Declared here, defined elsewhere.
      35             :     void init_listen_socket(); // Declared here, defined elsewhere; presumably registers the static callbacks below - confirm.
      36             : // Connection-opened handler: forwards an endpoint label to m_on_conn_open when that callback is set.
      37           0 :     static void cb_on_connection_established(struct netio_recv_socket* socket) {
      38           0 :         std::string ep = std::to_string(socket->eqfd); // Endpoint label is the socket's eqfd number; an IP address would be preferable here.
      39           0 :         NetioUnbufferedReceiver* object = static_cast<NetioUnbufferedReceiver*>(socket->lsocket->usr); // Assumes lsocket->usr holds the owning receiver (set outside this header) - confirm.
      40           0 :         if (object->m_on_conn_open) { // Skip silently when no callback has been installed.
      41           0 :             object->m_on_conn_open(ep);
      42             :         }
      43           0 :     }
      44             : // Connection-closed handler: forwards an endpoint label to m_on_conn_close when that callback is set.
      45           0 :     static void cb_on_connection_close(struct netio_recv_socket* socket) {
      46           0 :         std::string ep = std::to_string(socket->eqfd); // Endpoint label is the socket's eqfd number; an IP address would be preferable here.
      47           0 :         NetioUnbufferedReceiver* object = static_cast<NetioUnbufferedReceiver*>(socket->lsocket->usr); // Assumes lsocket->usr holds the owning receiver (set outside this header) - confirm.
      48           0 :         if (object->m_on_conn_close) { // Skip silently when no callback has been installed.
      49           0 :             object->m_on_conn_close(ep);
      50             :         }
      51           0 :     }
      52             : // Message handler: splits the received bytes into ToFlxMessage entries, passes them to m_on_msg, then re-posts the buffer.
      53           0 :     static void cb_msg_received(struct netio_recv_socket* socket, struct netio_buffer* buf, void* data, size_t len) {
      54           0 :         uint8_t* dataptr = static_cast<uint8_t*>(buf->data); // NOTE(review): parsing reads buf->data and the `data` argument is unused - confirm both refer to the same bytes.
      55           0 :         std::vector<ToFlxMessage> messages{};
      56           0 :         size_t pos = 0; // Byte offset of the next unparsed header within the buffer.
      57           0 :         while (pos < len) { // Each iteration consumes one header plus its payload.
      58           0 :             if (len - pos < sizeof(ToFlxHeader)) { // Fewer bytes remain than one header needs: record the error and stop.
      59           0 :                 messages.push_back({.status = ToFlxMessage::Status::HeaderNotDecoded});
      60           0 :                 break;
      61             :             }
      62             : 
      63           0 :             ToFlxHeader hdr;
      64           0 :             std::memcpy(&hdr, dataptr + pos, sizeof(ToFlxHeader)); // Header is copied out of the byte stream instead of being accessed through a cast pointer.
      65           0 :             pos += sizeof(ToFlxHeader);
      66             : 
      67           0 :             if (hdr.length == 0) { // A zero-length payload is reported as invalid and ends parsing of this buffer.
      68           0 :                 messages.push_back({.status = ToFlxMessage::Status::InvalidMsgLength});
      69           0 :                 break;
      70             :             }
      71             : 
      72           0 :             if (len - pos < hdr.length) { // Declared payload extends past the received bytes: record the error and stop.
      73           0 :                 messages.push_back({.status = ToFlxMessage::Status::MessageNotDecoded});
      74           0 :                 break;
      75             :             }
      76             : 
      77           0 :             messages.emplace_back(static_cast<uint32_t>(hdr.elink), std::span<uint8_t>(dataptr + pos, hdr.length)); // The span views bytes inside the buffer; no payload copy is made here.
      78           0 :             pos += hdr.length;
      79             :         }
      80             : 
      81           0 :         static_cast<NetioUnbufferedReceiver*>(socket->lsocket->usr)->m_on_msg(messages); // NOTE(review): unlike the connection callbacks, m_on_msg is invoked without checking that it is set.
      82           0 :         netio_post_recv(socket, buf); // Posted only after m_on_msg returns; presumably hands the buffer back to netio for reuse - confirm.
      83           0 :     }
      84             : 
      85             : private:
      86             :     const std::string m_ip;
      87             :     const uint32_t m_port;
      88             :     Bus& m_bus;
      89             :     netio_unbuffered_socket_attr m_socket_attr;
      90             :     std::shared_ptr<netio_context> m_context;
      91             :     netio_listen_socket m_socket;
      92             : // Callbacks installed through the set_*_callback methods above.
      93             :     OnConnOpen m_on_conn_open;
      94             :     OnConnClose m_on_conn_close;
      95             :     OnMsg m_on_msg;
      96             :     std::thread m_event_loop_thread; // NOTE(review): presumably runs eventLoop() - confirm in the implementation file.
      97             : };
      98             : 
      99             : #endif /* FELIX_UNBUFFERED_RECEIVER_H_ */

Generated by: LCOV version 1.0