Program Listing for File fromhost_writer.hpp

Return to documentation for file (fromhost_writer.hpp)

#ifndef TOFLX_WRITER_H_
#define TOFLX_WRITER_H_

#include <cassert>
#include <format>
#include <memory>
#include <numeric>
#include "fromhost_buffer.hpp"
#include "receiver.hpp"
#include "elink.hpp"
#include "log.hpp"
#include "felix/felix_toflx.hpp"


template <class B>
class FromHostWriter {

    public:
        FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle = false);

        FromHostWriter(std::shared_ptr<B> buffer, elink_type_t elinks_type, std::unique_ptr<Receiver> receiver);

        FromHostWriter(const FromHostWriter&) = delete;
        FromHostWriter& operator=(const FromHostWriter&) = delete;

        FromHostWriter(FromHostWriter&&) noexcept = default;
        FromHostWriter& operator=(FromHostWriter&&) noexcept = default;

        void connection_established(const std::string& s){LOG_INFO("Connection established. Conn. info from receiver: %s", s.c_str());};

        void connection_closed(const std::string& s){LOG_INFO("Connection closed. Conn. info from receiver: %s", s.c_str());};

        void encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle = false);

        std::map<local_elink_t, FromHostElinkStats> get_writer_elink_stats();

        unsigned int get_number_of_connections(){return m_receiver->get_number_of_connections();}

        uint32_t get_id() const {
            return m_id;
        }

    private:
        const uint32_t m_id;
        std::shared_ptr<B> m_buffer;
        std::unique_ptr<Receiver> m_receiver;
        std::map<local_elink_t, FromHostElinkStats> m_elink_stats;
        std::map<local_elink_t, FromHostElinkStats> m_elink_stats_prev;
};


template <class B>
FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle /*= false*/)
    : m_id(0), m_buffer(buffer), m_receiver{std::move(receiver)}
{
    m_receiver->declare(elinks);

    for (const auto & e : elinks){
        m_elink_stats.emplace(std::piecewise_construct, std::forward_as_tuple(e.lid), std::forward_as_tuple(e.fid));
    }

    m_elink_stats_prev = m_elink_stats;
    m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
    m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
    m_receiver->set_on_msg_callback([this, trickle](const std::vector<ToFlxMessage>& messages){encode_and_write(messages, trickle);});
}


template <class B>
FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, elink_type_t type, std::unique_ptr<Receiver> receiver)
    : FromHostWriter(buffer, buffer->get_elinks_of_type(type), std::move(receiver)) { }


template <class B>
void FromHostWriter<B>::encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle) {

    size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0}, [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});

    if (tot_payload_size > m_buffer->get_size()) {
        LOG_ERR(std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, m_buffer->get_size()).c_str());
        return;
    }

    std::unique_lock<std::mutex> lk(m_buffer->m_buffer_mutex, std::defer_lock);
    if (m_buffer->has_multiple_writers()) {
        lk.lock();
    }

    if (trickle) {
        m_buffer->set_oneshot_trickle_buffer(m_buffer->get_trickle_config_size());
        m_buffer->set_trickle_config_size(tot_payload_size);
    }

    LOG_DBG(std::format("Received network msg of total length {} B", tot_payload_size).c_str());

    for(const ToFlxMessage& msg : messages){
        if(msg.status != ToFlxMessage::Status::MessageOk){
            LOG_ERR(std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status)).c_str());
            ++(m_elink_stats[msg.elink].dropped_msg);
            return;
        }

        size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());
        while ( msg_size > m_buffer->dma_free_bytes()) {
            usleep(10);
        }
        m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size(), trickle);
        m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
    }

    if (trickle) { m_buffer->dma_start_circular_trickle_buffer(); }
}


template <class B>
std::map<local_elink_t, FromHostElinkStats> FromHostWriter<B>::get_writer_elink_stats()
{
    std::map<local_elink_t, FromHostElinkStats> output;
    for (auto & [key, value] : m_elink_stats){
        output[key] = value.get_increment(m_elink_stats_prev[key]);
    }
    return output;
}


#endif /* TOFLX_WRITER_H_ */