Program Listing for File fromhost_writer.hpp

Return to documentation for file (fromhost_writer.hpp)

#ifndef TOFLX_WRITER_H_
#define TOFLX_WRITER_H_

#include <cassert>
#include <format>
#include <memory>
#include <numeric>
#include "fromhost_buffer.hpp"
#include "receiver.hpp"
#include "elink.hpp"
#include "ers/ers.h"

#include "felix/felix_toflx.hpp"

// Declares the ERS issue type felix_log::fromhost_writer_issue that this file
// passes to ers::error(). It carries one std::string attribute, issue_message,
// which is also given as the issue's message argument.
ERS_DECLARE_ISSUE(felix_log, fromhost_writer_issue, issue_message, ((std::string)issue_message))


/**
 * @brief Forwards messages delivered by a Receiver into a FromHost buffer.
 *
 * The writer owns the Receiver and shares ownership of the buffer. It registers
 * callbacks on the receiver that capture `this`; for that reason the move
 * operations re-register the callbacks so that a moved receiver never calls
 * back into the moved-from writer.
 *
 * @tparam BUF Buffer type. The member functions of this class use
 *             get_size(), get_elinks_of_type(), has_multiple_writers(),
 *             m_buffer_mutex, compute_msg_dma_occupancy(), dma_free_bytes()
 *             and encode_and_write() on it.
 */
template <class BUF>
class FromHostWriter {

    public:
        /// Build a writer for an explicit list of e-links.
        FromHostWriter(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, int thread_id = 0);

        /// Build a writer for all e-links of type @p elinks_type reported by @p buffer.
        FromHostWriter(std::shared_ptr<BUF> buffer, elink_type_t elinks_type, std::unique_ptr<Receiver> receiver, int thread_id = 0);

        FromHostWriter(const FromHostWriter&) = delete;
        FromHostWriter& operator=(const FromHostWriter&) = delete;

        /**
         * @brief Move-construct, taking over buffer, receiver and statistics.
         *
         * The receiver's callbacks were bound to @p other; they are bound again
         * to the new object so they do not dangle once @p other goes away.
         */
        FromHostWriter(FromHostWriter&& other) noexcept
            : m_thread_id{other.m_thread_id},
              m_buffer{std::move(other.m_buffer)},
              m_receiver{std::move(other.m_receiver)},
              m_elink_stats{std::move(other.m_elink_stats)},
              m_elink_stats_prev{std::move(other.m_elink_stats_prev)}
        {
            rebind_receiver_callbacks();
        }

        /**
         * @brief Move-assign, taking over buffer, receiver and statistics.
         *
         * As for the move constructor, the callbacks of the acquired receiver
         * are re-registered against this object.
         */
        FromHostWriter& operator=(FromHostWriter&& other) noexcept {
            if (this != &other) {
                m_thread_id = other.m_thread_id;
                m_buffer = std::move(other.m_buffer);
                m_receiver = std::move(other.m_receiver);
                m_elink_stats = std::move(other.m_elink_stats);
                m_elink_stats_prev = std::move(other.m_elink_stats_prev);
                rebind_receiver_callbacks();
            }
            return *this;
        }

        /// Log that a connection was opened; @p s is the receiver's connection info.
        void connection_established(const std::string& s){ERS_INFO(std::format("Connection established. Conn. info from receiver: {}", s));}

        /// Log that a connection was closed; @p s is the receiver's connection info.
        void connection_closed(const std::string& s){ERS_INFO(std::format("Connection closed. Conn. info from receiver: {}", s));}

        /// Encode the given messages and write them to the buffer.
        void encode_and_write(const std::vector<ToFlxMessage>& messages);

        /// Per-e-link statistics, as increments relative to the stored previous values.
        std::map<local_elink_t, FromHostElinkStats> get_writer_elink_stats();

        /// Number of connections as reported by the receiver.
        unsigned int get_number_of_connections() const {return m_receiver->get_number_of_connections();}

        /// Thread identifier given at construction.
        [[nodiscard]] int get_thread_id() const {
            return m_thread_id;
        }

        /// Port as reported by the receiver.
        [[nodiscard]] uint16_t get_port() const {
            return m_receiver->get_port();
        }

    private:
        /// Point the receiver's callbacks at this object (no-op without a receiver).
        void rebind_receiver_callbacks() {
            if (!m_receiver) {
                return;
            }
            m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
            m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
            m_receiver->set_on_msg_callback([this](const std::vector<ToFlxMessage>& messages){encode_and_write(messages);});
        }

        int m_thread_id;
        std::shared_ptr<BUF> m_buffer;
        std::unique_ptr<Receiver> m_receiver;
        std::map<local_elink_t, FromHostElinkStats> m_elink_stats;       // running per-e-link counters
        std::map<local_elink_t, FromHostElinkStats> m_elink_stats_prev;  // reference values passed to get_increment()
};


/**
 * @brief Build a writer for an explicit list of e-links.
 *
 * Creates one statistics entry per e-link and registers the connection and
 * message callbacks on the receiver. The callbacks capture `this`.
 *
 * @param buffer    Buffer the encoded messages are written to.
 * @param elinks    E-links served by this writer.
 * @param receiver  Receiver delivering the messages; must not be null.
 * @param thread_id Identifier returned by get_thread_id().
 */
template <class BUF>
FromHostWriter<BUF>::FromHostWriter(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, int thread_id)
    : m_thread_id{thread_id}, m_buffer{std::move(buffer)}, m_receiver{std::move(receiver)}
{
    // The receiver is dereferenced right below; fail loudly in debug builds
    // instead of running into a null dereference.
    assert(m_receiver && "FromHostWriter requires a non-null receiver");

    // Keyed by local e-link id; e.fid is forwarded to the FromHostElinkStats
    // constructor. try_emplace leaves an already existing entry untouched.
    for (const auto & e : elinks){
        m_elink_stats.try_emplace(e.lid, e.fid);
    }

    m_elink_stats_prev = m_elink_stats;
    m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
    m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
    m_receiver->set_on_msg_callback([this](const std::vector<ToFlxMessage>& messages){encode_and_write(messages);});
}


/**
 * @brief Build a writer for every e-link of the given type known to the buffer.
 *
 * Delegates to the constructor taking an explicit e-link list, using the list
 * returned by buffer->get_elinks_of_type(type).
 *
 * @param buffer    Buffer the encoded messages are written to; also queried for the e-links.
 * @param type      E-link type to select.
 * @param receiver  Receiver delivering the messages.
 * @param thread_id Identifier returned by get_thread_id().
 */
template <class BUF>
FromHostWriter<BUF>::FromHostWriter(std::shared_ptr<BUF> buffer, elink_type_t type, std::unique_ptr<Receiver> receiver, int thread_id)
    : FromHostWriter{
          buffer,  // copied, not moved: still dereferenced by the next argument
          buffer->get_elinks_of_type(type),
          std::move(receiver),
          thread_id}
{
}


/**
 * @brief Encode a batch of received messages and write them to the buffer.
 *
 * Processing steps:
 *  - the whole batch is dropped if the sum of its payload sizes exceeds the
 *    buffer size;
 *  - the buffer mutex is taken only if the buffer reports multiple writers;
 *  - messages are handled in order. A message whose status is not MessageOk is
 *    logged and counted as dropped, and the rest of the batch is not processed.
 *    A message whose DMA occupancy exceeds the buffer size is logged, counted
 *    as dropped and skipped. Any other message waits (polling every 10 us)
 *    until the buffer has enough free bytes, is written, and is recorded in the
 *    per-e-link statistics.
 *
 * @param messages Messages delivered by the receiver.
 */
template <class BUF>
void FromHostWriter<BUF>::encode_and_write(const std::vector<ToFlxMessage>& messages) {

    const auto buffer_size = m_buffer->get_size();
    const size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0}, [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});

    if (tot_payload_size > buffer_size) {
        ers::error(felix_log::fromhost_writer_issue(ERS_HERE, std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, buffer_size)));
        return;
    }

    // Lock only when several writers share the buffer; the lock is released on return.
    std::unique_lock<std::mutex> lk(m_buffer->m_buffer_mutex, std::defer_lock);
    if (m_buffer->has_multiple_writers()) {
        lk.lock();
    }

    ERS_DEBUG(1, std::format("Received network msg of total length {} B", tot_payload_size).c_str());

    for(const ToFlxMessage& msg : messages){
        if(msg.status != ToFlxMessage::Status::MessageOk){
            ers::error(felix_log::fromhost_writer_issue(ERS_HERE, std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status))));
            ++(m_elink_stats[msg.elink].dropped_msg);
            // Stops here: the remaining messages of the batch are not written.
            return;
        }

        const size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());

        // The check at the top compares raw payload bytes, while the wait below
        // uses the DMA occupancy. Without this guard a message whose occupancy
        // exceeds the whole buffer would make the wait loop spin forever.
        // NOTE(review): assumes dma_free_bytes() never exceeds get_size() - confirm.
        if (msg_size > buffer_size) {
            ers::error(felix_log::fromhost_writer_issue(ERS_HERE, std::format("Dropping message with DMA occupancy of {} B, larger than entire buffer of size {} B", msg_size, buffer_size)));
            ++(m_elink_stats[msg.elink].dropped_msg);
            continue;
        }

        // Poll until enough space is free in the buffer.
        while ( msg_size > m_buffer->dma_free_bytes()) {
            usleep(10);
        }
        m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size());
        m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
    }

}


/**
 * @brief Collect per-e-link statistics increments.
 *
 * For every entry of the running statistics, calls get_increment() with the
 * matching entry of the previous-statistics map (created on demand if absent)
 * and stores the result under the same e-link key.
 *
 * @return Map from local e-link id to the value returned by get_increment().
 */
template <class BUF>
std::map<local_elink_t, FromHostElinkStats> FromHostWriter<BUF>::get_writer_elink_stats()
{
    std::map<local_elink_t, FromHostElinkStats> increments;
    for (auto it = m_elink_stats.begin(); it != m_elink_stats.end(); ++it) {
        const local_elink_t& elink = it->first;
        FromHostElinkStats& previous = m_elink_stats_prev[elink];
        increments[elink] = it->second.get_increment(previous);
    }
    return increments;
}


#endif /* TOFLX_WRITER_H_ */