Program Listing for File fromhost_writer.hpp
↰ Return to documentation for file (fromhost_writer.hpp)
#ifndef TOFLX_WRITER_H_
#define TOFLX_WRITER_H_
#include <cassert>
#include <cstdint>
#include <format>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <string>
#include <utility>
#include <vector>
#include <unistd.h>
#include "fromhost_buffer.hpp"
#include "receiver.hpp"
#include "elink.hpp"
#include "ers/ers.h"
#include "felix/felix_toflx.hpp"
// Declares the ERS issue type felix_log::fromhost_writer_issue, which carries a
// single std::string parameter named issue_message. It is the issue type passed
// to ers::error() by FromHostWriter::encode_and_write in this header.
ERS_DECLARE_ISSUE(felix_log, fromhost_writer_issue, issue_message, ((std::string)issue_message))
/**
 * @brief Writes messages delivered by a Receiver into a FromHost buffer.
 *
 * The constructors register three callbacks on the receiver (connection
 * opened, connection closed, messages received). Those callbacks capture
 * `this`, so the receiver holds a pointer to the writer that owns it.
 * For that reason the move operations are user-defined: after taking over
 * the receiver they register the callbacks again so that they point at the
 * new owner instead of the moved-from object.
 *
 * The class is move-only. A moved-from writer has no receiver; calling
 * get_port() or get_number_of_connections() on it is invalid.
 *
 * @tparam BUF Buffer type shared (via std::shared_ptr) with the writer.
 */
template <class BUF>
class FromHostWriter {
public:
    /**
     * @brief Create a writer for an explicit list of e-links.
     * @param buffer    Buffer the messages are written to.
     * @param elinks    E-links for which statistics entries are created.
     * @param receiver  Message source; ownership is transferred to the writer.
     * @param thread_id Identifier returned by get_thread_id().
     */
    FromHostWriter(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, int thread_id = 0);

    /**
     * @brief Create a writer for all e-links of type @p elinks_type, as
     *        reported by buffer->get_elinks_of_type().
     */
    FromHostWriter(std::shared_ptr<BUF> buffer, elink_type_t elinks_type, std::unique_ptr<Receiver> receiver, int thread_id = 0);

    FromHostWriter(const FromHostWriter&) = delete;
    FromHostWriter& operator=(const FromHostWriter&) = delete;

    /**
     * @brief Move constructor; re-registers the receiver callbacks on *this.
     *
     * A defaulted move would leave the callbacks stored in the receiver
     * pointing at @p other, which is typically destroyed right after the move.
     */
    FromHostWriter(FromHostWriter&& other) noexcept
        : m_thread_id{other.m_thread_id},
          m_buffer{std::move(other.m_buffer)},
          m_receiver{std::move(other.m_receiver)},
          m_elink_stats{std::move(other.m_elink_stats)},
          m_elink_stats_prev{std::move(other.m_elink_stats_prev)}
    {
        bind_receiver_callbacks();
    }

    /**
     * @brief Move assignment; re-registers the receiver callbacks on *this.
     *
     * The receiver previously owned by *this is destroyed by the unique_ptr
     * assignment. Self-assignment is a no-op.
     */
    FromHostWriter& operator=(FromHostWriter&& other) noexcept {
        if (this != &other) {
            m_thread_id = other.m_thread_id;
            m_buffer = std::move(other.m_buffer);
            m_receiver = std::move(other.m_receiver);
            m_elink_stats = std::move(other.m_elink_stats);
            m_elink_stats_prev = std::move(other.m_elink_stats_prev);
            bind_receiver_callbacks();
        }
        return *this;
    }

    /// Connection-opened callback: logs the receiver-provided info string.
    void connection_established(const std::string& s){ERS_INFO(std::format("Connection established. Conn. info from receiver: {}", s));}

    /// Connection-closed callback: logs the receiver-provided info string.
    void connection_closed(const std::string& s){ERS_INFO(std::format("Connection closed. Conn. info from receiver: {}", s));}

    /// Message callback: encodes the given messages and writes them to the buffer.
    void encode_and_write(const std::vector<ToFlxMessage>& messages);

    /// Per-e-link result of FromHostElinkStats::get_increment() against the stored previous values.
    std::map<local_elink_t, FromHostElinkStats> get_writer_elink_stats();

    /// Number of connections as reported by the receiver.
    unsigned int get_number_of_connections(){return m_receiver->get_number_of_connections();}

    /// Identifier passed at construction.
    [[nodiscard]] int get_thread_id() const {
        return m_thread_id;
    }

    /// Port as reported by the receiver.
    [[nodiscard]] uint16_t get_port() const {
        return m_receiver->get_port();
    }

private:
    /**
     * @brief Point the receiver's callbacks at this object.
     *
     * Does nothing when there is no receiver (moved-from source).
     * The lambdas capture only `this`.
     */
    void bind_receiver_callbacks() {
        if (!m_receiver) {
            return;
        }
        m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
        m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
        m_receiver->set_on_msg_callback([this](const std::vector<ToFlxMessage>& messages){encode_and_write(messages);});
    }

    int m_thread_id;
    std::shared_ptr<BUF> m_buffer;
    std::unique_ptr<Receiver> m_receiver;
    std::map<local_elink_t, FromHostElinkStats> m_elink_stats;      ///< Running per-e-link counters.
    std::map<local_elink_t, FromHostElinkStats> m_elink_stats_prev; ///< Reference values passed to get_increment().
};
/**
 * @brief Create a writer for an explicit list of e-links.
 *
 * Creates one statistics entry per e-link (keyed by `lid`, constructed from
 * `fid`), copies that map as the initial "previous" snapshot, and registers
 * the connection and message callbacks on the receiver.
 *
 * The callbacks capture `this`: the receiver will call back into this exact
 * object for as long as the callbacks stay registered.
 *
 * @param buffer    Buffer the messages are written to.
 * @param elinks    E-links for which statistics entries are created. An e-link
 *                  whose `lid` is already present is ignored (try_emplace).
 * @param receiver  Message source; must not be null. Ownership is taken.
 * @param thread_id Identifier stored and returned by get_thread_id().
 */
template <class BUF>
FromHostWriter<BUF>::FromHostWriter(std::shared_ptr<BUF> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, int thread_id)
    // buffer is a by-value sink parameter: move it to avoid an extra atomic
    // reference-count increment/decrement.
    : m_thread_id{thread_id}, m_buffer{std::move(buffer)}, m_receiver{std::move(receiver)}
{
    // The receiver is dereferenced below; make the precondition explicit.
    assert(m_receiver && "FromHostWriter requires a non-null Receiver");

    for (const auto& e : elinks) {
        m_elink_stats.try_emplace(e.lid, e.fid);
    }
    m_elink_stats_prev = m_elink_stats;

    m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
    m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
    m_receiver->set_on_msg_callback([this](const std::vector<ToFlxMessage>& messages){encode_and_write(messages);});
}
/**
 * @brief Create a writer for every e-link of a given type.
 *
 * Delegates to the e-link-list constructor, passing the result of
 * buffer->get_elinks_of_type(type) as the list of e-links.
 *
 * @param buffer    Buffer the messages are written to. It is dereferenced
 *                  here, so it must not be null.
 * @param type      E-link type forwarded to get_elinks_of_type().
 * @param receiver  Message source; ownership is forwarded to the delegated constructor.
 * @param thread_id Identifier stored and returned by get_thread_id().
 *
 * NOTE: `buffer` is passed as a copy in the first argument while the second
 * argument dereferences it; do not replace the first argument with
 * std::move(buffer).
 */
template <class BUF>
FromHostWriter<BUF>::FromHostWriter(std::shared_ptr<BUF> buffer, elink_type_t type, std::unique_ptr<Receiver> receiver, int thread_id)
: FromHostWriter{buffer, buffer->get_elinks_of_type(type), std::move(receiver), thread_id} { }
/**
 * @brief Encode a batch of received messages and write them to the buffer.
 *
 * Steps:
 *  1. If the summed payload size exceeds the buffer size, the whole batch is
 *     dropped with an error.
 *  2. The buffer mutex is taken only when the buffer reports multiple writers.
 *  3. Each message is written in order. Before writing, the function polls
 *     (10 us sleeps) until the buffer reports enough free DMA bytes for the
 *     encoded message.
 *
 * A message whose status is not MessageOk is counted as dropped for its
 * e-link and terminates processing of the batch.
 *
 * A message whose encoded size exceeds the whole buffer is counted as dropped
 * and skipped. Without this check the polling loop could never terminate for
 * such a message, and would do so while holding the buffer mutex.
 *
 * Statistics are updated through operator[], so an e-link id that was not
 * registered at construction gets a default-constructed entry.
 *
 * @param messages Batch of messages delivered by the receiver.
 */
template <class BUF>
void FromHostWriter<BUF>::encode_and_write(const std::vector<ToFlxMessage>& messages) {
    const size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0},
        [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});

    const auto buffer_size = m_buffer->get_size();
    if (tot_payload_size > buffer_size) {
        ers::error(felix_log::fromhost_writer_issue(ERS_HERE, std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, buffer_size)));
        return;
    }

    // Lock only when several writers share the buffer.
    std::unique_lock<std::mutex> lk(m_buffer->m_buffer_mutex, std::defer_lock);
    if (m_buffer->has_multiple_writers()) {
        lk.lock();
    }

    ERS_DEBUG(1, std::format("Received network msg of total length {} B", tot_payload_size));

    for (const ToFlxMessage& msg : messages) {
        if (msg.status != ToFlxMessage::Status::MessageOk) {
            ers::error(felix_log::fromhost_writer_issue(ERS_HERE, std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status))));
            ++(m_elink_stats[msg.elink].dropped_msg);
            // NOTE(review): this abandons the remaining messages of the batch
            // without counting them as dropped - confirm that this is intended
            // (as opposed to skipping only the faulty message).
            return;
        }

        const size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());

        // The up-front check uses raw payload sizes; the encoded size can be
        // larger. A message that cannot fit even in an empty buffer would make
        // the wait below spin forever, so drop it instead.
        if (msg_size > buffer_size) {
            ers::error(felix_log::fromhost_writer_issue(ERS_HERE, std::format("Dropping message of encoded size {} B, larger than entire buffer of size {} B", msg_size, buffer_size)));
            ++(m_elink_stats[msg.elink].dropped_msg);
            continue;
        }

        // Wait until enough space has been freed in the DMA buffer.
        while (msg_size > m_buffer->dma_free_bytes()) {
            usleep(10);
        }

        m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size());
        m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
    }
}
/**
 * @brief Build a map of per-e-link statistics increments.
 *
 * For every entry of the running statistics map, calls
 * FromHostElinkStats::get_increment() with the entry of the same key in the
 * "previous" map and stores the result under that key.
 *
 * The "previous" map is accessed through operator[], so a key missing there
 * is default-inserted. Whether get_increment() also updates the entry it is
 * given is not visible from this header.
 *
 * @return Map from local e-link id to the value returned by get_increment().
 */
template <class BUF>
std::map<local_elink_t, FromHostElinkStats> FromHostWriter<BUF>::get_writer_elink_stats()
{
    std::map<local_elink_t, FromHostElinkStats> increments;
    for (auto& [elink, current] : m_elink_stats) {
        auto& previous = m_elink_stats_prev[elink];
        increments[elink] = current.get_increment(previous);
    }
    return increments;
}
#endif /* TOFLX_WRITER_H_ */