LCOV - code coverage report
Current view: top level - src - fromhost_writer.hpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 36 44 81.8 %
Date: 2025-09-09 12:09:29 Functions: 5 5 100.0 %

          Line data    Source code
       1             : #ifndef TOFLX_WRITER_H_
       2             : #define TOFLX_WRITER_H_
       3             : 
       4             : #include <cassert>
       5             : #include <format>
       6             : #include <memory>
       7             : #include <numeric>
       8             : #include "fromhost_buffer.hpp"
       9             : #include "receiver.hpp"
      10             : #include "elink.hpp"
      11             : #include "log.hpp"
      12             : #include "felix/felix_toflx.hpp"
      13             : 
      14             : 
      15             : /**
      16             :  * FromHostWriter owns a network Receiver that runs the (event loop) thread.
      17             :  * Messages are received from the network, encoded, and written into the
      18             :  * FromHost buffer. The FromHost can be shared by more than one FromHostWriter.
      19             :  * FromHostWriter is templated to support FlxFromHostBuffer and FileFromHostBuffer.
      20             :  * */
      21             : template <class B>
      22             : class FromHostWriter {
      23             : 
      24             :     public:
      25             :         /**
      26             :          * @brief FromHostWriter contructor.
      27             :          * @param buffer shared pointer to Fromhost DMA buffer.
      28             :          * @param elinks vector of enabled e-links.
      29             :          * @param receiver pointer to network receiver.
      30             :          */
      31             :         FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle = false);
      32             : 
      33             :         /**
      34             :          * @brief encode and write an incoming network message.
      35             :          * @param buffer shared pointer to Fromhost DMA buffer.
      36             :          * @param elinks_type type of e-links to be enabled.
      37             :          * @param receiver pointer to network receiver.
      38             :          */
      39             :         FromHostWriter(std::shared_ptr<B> buffer, elink_type_t elinks_type, std::unique_ptr<Receiver> receiver);
      40             : 
      41             :         FromHostWriter(const FromHostWriter&) = delete;
      42             :         FromHostWriter& operator=(const FromHostWriter&) = delete;
      43             : 
      44             :         FromHostWriter(FromHostWriter&&) noexcept = default;
      45             :         FromHostWriter& operator=(FromHostWriter&&) noexcept = default;
      46             : 
      47             :         /**
      48             :          * @brief on-connection-established callback.
      49             :          */
      50           5 :         void connection_established(const std::string& s){LOG_INFO("Connection established. Conn. info from receiver: %s", s.c_str());};
      51             : 
      52             :         /**
      53             :          * @brief on-connection-closed callback.
      54             :          */
      55           5 :         void connection_closed(const std::string& s){LOG_INFO("Connection closed. Conn. info from receiver: %s", s.c_str());};
      56             : 
      57             :         /**
      58             :          * @brief encode and write an incoming network message.
      59             :          * @param messages vector of ToFlxMessage structs representing incoming network messages.
      60             :          */
      61             :         void encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle = false);
      62             : 
      63             :         /**
      64             :          * @return map of e-link / e-link monitoring information.
      65             :          */
      66             :         std::map<local_elink_t, FromHostElinkStats> get_writer_elink_stats();
      67             : 
      68             :         /**
      69             :          * @return number of open connections.
      70             :          */
      71             :         unsigned int get_number_of_connections(){return m_receiver->get_number_of_connections();}
      72             : 
      73             :         /**
      74             :          * @return id of the writer
      75             :          */
      76          21 :         uint32_t get_id() const {
      77          21 :             return m_id;
      78             :         }
      79             : 
      80             :     private:
      81             :         const uint32_t m_id;
      82             :         std::shared_ptr<B> m_buffer;
      83             :         std::unique_ptr<Receiver> m_receiver;
      84             :         std::map<local_elink_t, FromHostElinkStats> m_elink_stats;
      85             :         std::map<local_elink_t, FromHostElinkStats> m_elink_stats_prev;
      86             : };
      87             : 
      88             : 
      89             : template <class B>
      90           5 : FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle /*= false*/)
      91          10 :     : m_id(0), m_buffer(buffer), m_receiver{std::move(receiver)}
      92             : {
      93           5 :     m_receiver->declare(elinks);
      94             : 
      95          10 :     for (const auto & e : elinks){
      96           5 :         m_elink_stats.emplace(std::piecewise_construct, std::forward_as_tuple(e.lid), std::forward_as_tuple(e.fid));
      97             :     }
      98             : 
      99           5 :     m_elink_stats_prev = m_elink_stats;
     100          10 :     m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
     101          10 :     m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
     102       10009 :     m_receiver->set_on_msg_callback([this, trickle](const std::vector<ToFlxMessage>& messages){encode_and_write(messages, trickle);});
     103           5 : }
     104             : 
     105             : 
     106             : template <class B>
     107             : FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, elink_type_t type, std::unique_ptr<Receiver> receiver)
     108             :     : FromHostWriter(buffer, buffer->get_elinks_of_type(type), std::move(receiver)) { }
     109             : 
     110             : 
     111             : template <class B>
     112       10004 : void FromHostWriter<B>::encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle) {
     113             : 
     114       20008 :     size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0}, [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});
     115             : 
     116       10004 :     if (tot_payload_size > m_buffer->get_size()) {
     117           0 :         LOG_ERR(std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, m_buffer->get_size()).c_str());
     118           0 :         return;
     119             :     }
     120             : 
     121       10004 :     std::unique_lock<std::mutex> lk(m_buffer->m_buffer_mutex, std::defer_lock);
     122       10004 :     if (m_buffer->has_multiple_writers()) {
     123           0 :         lk.lock();
     124             :     }
     125             : 
     126       10004 :     if (trickle) {
     127           2 :         m_buffer->set_oneshot_trickle_buffer(m_buffer->get_trickle_config_size());
     128           2 :         m_buffer->set_trickle_config_size(tot_payload_size);
     129             :     }
     130             : 
     131       10004 :     LOG_DBG(std::format("Received network msg of total length {} B", tot_payload_size).c_str());
     132             : 
     133       20008 :     for(const ToFlxMessage& msg : messages){
     134       10004 :         if(msg.status != ToFlxMessage::Status::MessageOk){
     135           0 :             LOG_ERR(std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status)).c_str());
     136           0 :             ++(m_elink_stats[msg.elink].dropped_msg);
     137           0 :             return;
     138             :         }
     139             : 
     140       10004 :         size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());
     141       10004 :         while ( msg_size > m_buffer->dma_free_bytes()) {
     142           0 :             usleep(10);
     143             :         }
     144       10004 :         m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size(), trickle);
     145       10004 :         m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
     146             :     }
     147             : 
     148       10004 :     if (trickle) { m_buffer->dma_start_circular_trickle_buffer(); }
     149       10004 : }
     150             : 
     151             : 
     152             : template <class B>
     153          21 : std::map<local_elink_t, FromHostElinkStats> FromHostWriter<B>::get_writer_elink_stats()
     154             : {
     155          21 :     std::map<local_elink_t, FromHostElinkStats> output;
     156          42 :     for (auto & [key, value] : m_elink_stats){
     157          21 :         output[key] = value.get_increment(m_elink_stats_prev[key]);
     158             :     }
     159          21 :     return output;
     160           0 : }
     161             : 
     162             : 
     163             : #endif /* TOFLX_WRITER_H_ */

Generated by: LCOV version 1.0