Line data Source code
#ifndef TOFLX_WRITER_H_
#define TOFLX_WRITER_H_

#include <unistd.h>  // usleep

#include <cassert>
#include <cstdint>
#include <format>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
#include "fromhost_buffer.hpp"
#include "receiver.hpp"
#include "elink.hpp"
#include "log.hpp"
#include "felix/felix_toflx.hpp"


/**
 * FromHostWriter owns a network Receiver that runs the (event loop) thread.
 * Messages are received from the network, encoded, and written into the
 * FromHost buffer. The FromHost buffer can be shared by more than one FromHostWriter.
 * FromHostWriter is templated to support FlxFromHostBuffer and FileFromHostBuffer.
 *
 * The callbacks registered on the Receiver capture `this`; the move
 * constructor therefore re-registers them so that they never refer to a
 * moved-from writer.
 */
template <class B>
class FromHostWriter {

    public:
        /**
         * @brief FromHostWriter constructor.
         * @param buffer shared pointer to FromHost DMA buffer.
         * @param elinks vector of enabled e-links.
         * @param receiver pointer to network receiver (must not be null).
         * @param trickle flag forwarded to encode_and_write() for every message
         *        batch delivered by the receiver.
         */
        FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle = false);

        /**
         * @brief FromHostWriter constructor enabling all e-links of a given type.
         * @param buffer shared pointer to FromHost DMA buffer.
         * @param elinks_type type of e-links to be enabled.
         * @param receiver pointer to network receiver (must not be null).
         */
        FromHostWriter(std::shared_ptr<B> buffer, elink_type_t elinks_type, std::unique_ptr<Receiver> receiver);

        FromHostWriter(const FromHostWriter&) = delete;
        FromHostWriter& operator=(const FromHostWriter&) = delete;

        /**
         * @brief move constructor.
         *
         * Takes over buffer, receiver and statistics of @p other and registers
         * the receiver callbacks again so that they point to the new object.
         * NOTE(review): the re-registration is not synchronised with the
         * receiver's event loop; moving a writer while messages are being
         * delivered is presumably still unsafe - confirm against Receiver.
         */
        FromHostWriter(FromHostWriter&& other) noexcept;

        // Move assignment cannot be generated because m_id is const; the
        // previously defaulted operator was implicitly deleted. Spell it out.
        FromHostWriter& operator=(FromHostWriter&&) = delete;

        /**
         * @brief on-connection-established callback.
         * @param s connection information provided by the receiver.
         */
        void connection_established(const std::string& s){LOG_INFO("Connection established. Conn. info from receiver: %s", s.c_str());}

        /**
         * @brief on-connection-closed callback.
         * @param s connection information provided by the receiver.
         */
        void connection_closed(const std::string& s){LOG_INFO("Connection closed. Conn. info from receiver: %s", s.c_str());}

        /**
         * @brief encode and write an incoming network message.
         * @param messages vector of ToFlxMessage structs representing incoming network messages.
         * @param trickle flag forwarded to the buffer's encode_and_write().
         */
        void encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle = false);

        /**
         * @return map of e-link / e-link monitoring information.
         */
        std::map<local_elink_t, FromHostElinkStats> get_writer_elink_stats();

        /**
         * @return number of open connections.
         */
        unsigned int get_number_of_connections(){return m_receiver->get_number_of_connections();}

        /**
         * @return id of the writer
         */
        uint32_t get_id() const {
            return m_id;
        }

    private:
        /**
         * @brief (re-)register the connection and message callbacks on m_receiver.
         */
        void register_callbacks();

        const uint32_t m_id;
        std::shared_ptr<B> m_buffer;
        std::unique_ptr<Receiver> m_receiver;
        std::map<local_elink_t, FromHostElinkStats> m_elink_stats;
        std::map<local_elink_t, FromHostElinkStats> m_elink_stats_prev;
        bool m_trickle;  // kept as a member so that callbacks can be rebuilt after a move
};


template <class B>
FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle /*= false*/)
    : m_id(0), m_buffer(std::move(buffer)), m_receiver{std::move(receiver)}, m_trickle(trickle)
{
    assert(m_receiver != nullptr && "FromHostWriter requires a receiver");
    m_receiver->declare(elinks);

    // One statistics entry per declared e-link, keyed by local id and
    // constructed from the e-link's fid.
    for (const auto & e : elinks){
        m_elink_stats.emplace(std::piecewise_construct, std::forward_as_tuple(e.lid), std::forward_as_tuple(e.fid));
    }

    m_elink_stats_prev = m_elink_stats;
    register_callbacks();
}


template <class B>
FromHostWriter<B>::FromHostWriter(FromHostWriter&& other) noexcept
    : m_id(other.m_id),
      m_buffer(std::move(other.m_buffer)),
      m_receiver(std::move(other.m_receiver)),
      m_elink_stats(std::move(other.m_elink_stats)),
      m_elink_stats_prev(std::move(other.m_elink_stats_prev)),
      m_trickle(other.m_trickle)
{
    // The callbacks installed by `other` captured its address; replace them
    // so the receiver calls into this object from now on.
    if (m_receiver) {
        register_callbacks();
    }
}


template <class B>
void FromHostWriter<B>::register_callbacks()
{
    m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
    m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
    m_receiver->set_on_msg_callback([this](const std::vector<ToFlxMessage>& messages){encode_and_write(messages, m_trickle);});
}


/**
 * Convenience constructor: enables every e-link of the given type reported
 * by the buffer and delegates to the main constructor.
 */
template <class B>
FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, elink_type_t type, std::unique_ptr<Receiver> receiver)
    // `buffer` is copied, not moved: it is also dereferenced in the same
    // argument list and the evaluation order of arguments is unspecified.
    : FromHostWriter(buffer, buffer->get_elinks_of_type(type), std::move(receiver)) { }


/**
 * Encode a batch of network messages and write them into the FromHost buffer.
 *
 * The batch is rejected as a whole if the sum of its payload sizes exceeds
 * the buffer size. The buffer mutex is taken only if the buffer reports
 * multiple writers. Per-e-link statistics are updated for every message.
 *
 * @param messages batch of messages delivered by the receiver.
 * @param trickle  when true the trickle sizes are updated before writing, the
 *                 flag is forwarded to the buffer, and the circular trickle
 *                 DMA is started after the whole batch has been written.
 */
template <class B>
void FromHostWriter<B>::encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle) {

    size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0}, [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});

    // The log macros are printf-style: pass the pre-formatted text through
    // "%s" so that it is never interpreted as a format string itself.
    if (tot_payload_size > m_buffer->get_size()) {
        LOG_ERR("%s", std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, m_buffer->get_size()).c_str());
        return;
    }

    // Lock only when the buffer is shared between writers.
    std::unique_lock<std::mutex> lk(m_buffer->m_buffer_mutex, std::defer_lock);
    if (m_buffer->has_multiple_writers()) {
        lk.lock();
    }

    if (trickle) {
        // Hand the current trickle configuration size to the one-shot buffer
        // before replacing it with the size of the incoming batch.
        m_buffer->set_oneshot_trickle_buffer(m_buffer->get_trickle_config_size());
        m_buffer->set_trickle_config_size(tot_payload_size);
    }

    LOG_DBG("%s", std::format("Received network msg of total length {} B", tot_payload_size).c_str());

    for(const ToFlxMessage& msg : messages){
        if(msg.status != ToFlxMessage::Status::MessageOk){
            LOG_ERR("%s", std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status)).c_str());
            ++(m_elink_stats[msg.elink].dropped_msg);
            // NOTE(review): this abandons the rest of the batch (only one
            // drop is counted) and, in trickle mode, skips the DMA start
            // below - confirm this is intended rather than `continue`.
            return;
        }

        size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());
        // Poll until the DMA buffer has room for the encoded message.
        // NOTE(review): the wait is unbounded and happens with the buffer
        // mutex held when the buffer has multiple writers.
        while ( msg_size > m_buffer->dma_free_bytes()) {
            usleep(10);
        }
        m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size(), trickle);
        m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
    }

    if (trickle) { m_buffer->dma_start_circular_trickle_buffer(); }
}


/**
 * Build the per-e-link monitoring snapshot.
 *
 * For every e-link in m_elink_stats the result holds get_increment() of the
 * current statistics evaluated against the matching m_elink_stats_prev entry
 * (created on demand by operator[] if missing).
 *
 * @return map of local e-link id to the statistics returned by get_increment().
 */
template <class B>
std::map<local_elink_t, FromHostElinkStats> FromHostWriter<B>::get_writer_elink_stats()
{
    std::map<local_elink_t, FromHostElinkStats> output;
    for (auto & [key, value] : m_elink_stats){
        output[key] = value.get_increment(m_elink_stats_prev[key]);
    }
    return output;
}


#endif /* TOFLX_WRITER_H_ */