.. _program_listing_file_fromhost_writer.hpp:

Program Listing for File fromhost_writer.hpp
============================================

|exhale_lsh| :ref:`Return to documentation for file ` (``fromhost_writer.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // NOTE(review): this listing appears to have lost every angle-bracketed
   // fragment (system header names, template parameter lists, container
   // element types). The code below is reproduced as found; consult the real
   // fromhost_writer.hpp for the missing pieces.
   #ifndef TOFLX_WRITER_H_
   #define TOFLX_WRITER_H_

   #include
   #include
   #include
   #include
   #include "fromhost_buffer.hpp"
   #include "receiver.hpp"
   #include "elink.hpp"
   #include "log.hpp"
   #include "felix/felix_toflx.hpp"

   // Takes the messages delivered by a receiver and writes them into a
   // from-host buffer, keeping per-elink statistics. The receiver is wired to
   // this object through callbacks registered in the constructor.
   template class FromHostWriter {
       public:
           // Builds a writer for an explicit list of elinks. `trickle` is
           // forwarded to encode_and_write() for every batch delivered by
           // the receiver.
           FromHostWriter(std::shared_ptr buffer, const std::vector &elinks, std::unique_ptr receiver, bool trickle = false);

           // Builds a writer for all elinks of the given type, as reported by
           // the buffer.
           FromHostWriter(std::shared_ptr buffer, elink_type_t elinks_type, std::unique_ptr receiver);

           // Not copyable.
           FromHostWriter(const FromHostWriter&) = delete;
           FromHostWriter& operator=(const FromHostWriter&) = delete;

           // NOTE(review): the constructor registers callbacks that capture
           // `this`. If the receiver keeps those callbacks, they would still
           // refer to the moved-from object after a move - confirm that
           // writers are not moved once constructed.
           FromHostWriter(FromHostWriter&&) noexcept = default;
           // NOTE(review): m_id is a const data member, so this defaulted move
           // assignment is defined as deleted by the language.
           FromHostWriter& operator=(FromHostWriter&&) noexcept = default;

           // Logs the connection info string passed by the receiver when a
           // connection is opened.
           void connection_established(const std::string& s){LOG_INFO("Connection established. Conn. info from receiver: %s", s.c_str());};

           // Logs the connection info string passed by the receiver when a
           // connection is closed.
           void connection_closed(const std::string& s){LOG_INFO("Connection closed. Conn. info from receiver: %s", s.c_str());};

           // Encodes a batch of messages into the buffer and updates the
           // per-elink statistics. See the definition below for details.
           void encode_and_write(const std::vector& messages, bool trickle = false);

           // Returns, per elink, the result of get_increment() against the
           // entry stored in m_elink_stats_prev.
           std::map get_writer_elink_stats();

           // Forwards to the receiver's connection count.
           unsigned int get_number_of_connections(){return m_receiver->get_number_of_connections();}

           // Returns the writer id (always 0 with the constructors in this file).
           uint32_t get_id() const { return m_id; }

       private:
           const uint32_t m_id;            // set to 0 by the constructor
           std::shared_ptr m_buffer;       // destination buffer for encoded messages
           std::unique_ptr m_receiver;     // source of messages, owned by this writer
           std::map m_elink_stats;         // current statistics, keyed by e.lid
           std::map m_elink_stats_prev;    // copy of the statistics taken at construction
   };

   // Declares the elinks on the receiver, creates one statistics entry per
   // elink (key e.lid, value constructed from e.fid), snapshots those entries
   // into m_elink_stats_prev and registers the connection and message
   // callbacks on the receiver.
   template FromHostWriter::FromHostWriter(std::shared_ptr buffer, const std::vector &elinks, std::unique_ptr receiver, bool trickle /*= false*/)
       : m_id(0), m_buffer(buffer), m_receiver{std::move(receiver)}
   {
       m_receiver->declare(elinks);
       for (const auto & e : elinks){
           m_elink_stats.emplace(std::piecewise_construct, std::forward_as_tuple(e.lid), std::forward_as_tuple(e.fid));
       }
       m_elink_stats_prev = m_elink_stats;
       // The callbacks capture `this`; `trickle` is captured by value, so the
       // mode chosen at construction applies to every received batch.
       m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
       m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
       m_receiver->set_on_msg_callback([this, trickle](const std::vector& messages){encode_and_write(messages, trickle);});
   }

   // Delegates to the elink-list constructor using the elinks returned by
   // buffer->get_elinks_of_type(type); `trickle` keeps its default (false).
   template FromHostWriter::FromHostWriter(std::shared_ptr buffer, elink_type_t type, std::unique_ptr receiver)
       : FromHostWriter(buffer, buffer->get_elinks_of_type(type), std::move(receiver)) { }

   // Writes a batch of messages into the buffer.
   //
   // messages: the batch to write; each entry provides status, elink and payload.
   // trickle:  when true, the trickle configuration calls on the buffer are
   //           made before and after the write loop and the flag is passed
   //           on to the buffer's encode_and_write().
   //
   // The whole batch is dropped if the sum of the payload sizes exceeds the
   // buffer size. Processing also stops at the first message whose status is
   // not MessageOk.
   template void FromHostWriter::encode_and_write(const std::vector& messages, bool trickle)
   {
       // Sum of the payload sizes of all messages in the batch.
       size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0},
           [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});

       if (tot_payload_size > m_buffer->get_size()) {
           // NOTE(review): LOG_INFO above is called printf-style ("%s"). If
           // LOG_ERR/LOG_DBG are printf-style as well, the formatted text is
           // being used as the format string here - confirm.
           LOG_ERR(std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, m_buffer->get_size()).c_str());
           return;
       }

       // The buffer mutex is only taken when the buffer reports multiple
       // writers; otherwise the lock object stays unlocked.
       std::unique_lock lk(m_buffer->m_buffer_mutex, std::defer_lock);
       if (m_buffer->has_multiple_writers()) {
           lk.lock();
       }

       if (trickle) {
           // Passes the currently stored trickle config size to
           // set_oneshot_trickle_buffer(), then stores this batch's total
           // size as the new trickle config size.
           m_buffer->set_oneshot_trickle_buffer(m_buffer->get_trickle_config_size());
           m_buffer->set_trickle_config_size(tot_payload_size);
       }

       LOG_DBG(std::format("Received network msg of total length {} B", tot_payload_size).c_str());
       for(const ToFlxMessage& msg : messages){
           if(msg.status != ToFlxMessage::Status::MessageOk){
               LOG_ERR(std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status)).c_str());
               ++(m_elink_stats[msg.elink].dropped_msg);
               // NOTE(review): this returns from the function, so the
               // remaining messages of the batch are not written and, in
               // trickle mode, dma_start_circular_trickle_buffer() is not
               // called - confirm this is intended rather than `continue`.
               return;
           }
           size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());
           // Poll with a usleep(10) pause until the buffer reports enough free
           // bytes. If the lock above was taken, it is held while polling.
           while ( msg_size > m_buffer->dma_free_bytes()) {
               usleep(10);
           }
           m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size(), trickle);
           // operator[] creates a statistics entry if msg.elink has none yet.
           m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
       }

       if (trickle) {
           m_buffer->dma_start_circular_trickle_buffer();
       }
   }

   // Builds a map with one entry per key of m_elink_stats, holding the value
   // returned by get_increment() when given the entry of m_elink_stats_prev
   // with the same key.
   template std::map FromHostWriter::get_writer_elink_stats()
   {
       std::map output;
       for (auto & [key, value] : m_elink_stats){
           // m_elink_stats_prev[key] inserts an entry if the key is not
           // present there yet.
           output[key] = value.get_increment(m_elink_stats_prev[key]);
       }
       return output;
   }

   #endif /* TOFLX_WRITER_H_ */