Program Listing for File fromhost_writer.hpp
↰ Return to documentation for file (fromhost_writer.hpp
)
#ifndef TOFLX_WRITER_H_
#define TOFLX_WRITER_H_
#include <cassert>
#include <format>
#include <memory>
#include <numeric>
#include "fromhost_buffer.hpp"
#include "receiver.hpp"
#include "elink.hpp"
#include "log.hpp"
#include "felix/felix_toflx.hpp"
template <class B>
class FromHostWriter {
public:
FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle = false);
FromHostWriter(std::shared_ptr<B> buffer, elink_type_t elinks_type, std::unique_ptr<Receiver> receiver);
FromHostWriter(const FromHostWriter&) = delete;
FromHostWriter& operator=(const FromHostWriter&) = delete;
FromHostWriter(FromHostWriter&&) noexcept = default;
FromHostWriter& operator=(FromHostWriter&&) noexcept = default;
void connection_established(const std::string& s){LOG_INFO("Connection established. Conn. info from receiver: %s", s.c_str());};
void connection_closed(const std::string& s){LOG_INFO("Connection closed. Conn. info from receiver: %s", s.c_str());};
void encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle = false);
std::map<local_elink_t, FromHostElinkStats> get_writer_elink_stats();
unsigned int get_number_of_connections(){return m_receiver->get_number_of_connections();}
uint32_t get_id() const {
return m_id;
}
private:
const uint32_t m_id;
std::shared_ptr<B> m_buffer;
std::unique_ptr<Receiver> m_receiver;
std::map<local_elink_t, FromHostElinkStats> m_elink_stats;
std::map<local_elink_t, FromHostElinkStats> m_elink_stats_prev;
};
template <class B>
FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, const std::vector<Elink> &elinks, std::unique_ptr<Receiver> receiver, bool trickle /*= false*/)
: m_id(0), m_buffer(buffer), m_receiver{std::move(receiver)}
{
m_receiver->declare(elinks);
for (const auto & e : elinks){
m_elink_stats.emplace(std::piecewise_construct, std::forward_as_tuple(e.lid), std::forward_as_tuple(e.fid));
}
m_elink_stats_prev = m_elink_stats;
m_receiver->set_conn_open_callback([this](const std::string& s){connection_established(s);});
m_receiver->set_conn_close_callback([this](const std::string& s){connection_closed(s);});
m_receiver->set_on_msg_callback([this, trickle](const std::vector<ToFlxMessage>& messages){encode_and_write(messages, trickle);});
}
template <class B>
FromHostWriter<B>::FromHostWriter(std::shared_ptr<B> buffer, elink_type_t type, std::unique_ptr<Receiver> receiver)
: FromHostWriter(buffer, buffer->get_elinks_of_type(type), std::move(receiver)) { }
template <class B>
void FromHostWriter<B>::encode_and_write(const std::vector<ToFlxMessage>& messages, bool trickle) {
size_t tot_payload_size = std::accumulate(messages.begin(), messages.end(), size_t{0}, [](size_t acc, const ToFlxMessage& msg){return acc + msg.payload.size();});
if (tot_payload_size > m_buffer->get_size()) {
LOG_ERR(std::format("Dropping message of size {} B, larger than entire buffer of size {} B", tot_payload_size, m_buffer->get_size()).c_str());
return;
}
std::unique_lock<std::mutex> lk(m_buffer->m_buffer_mutex, std::defer_lock);
if (m_buffer->has_multiple_writers()) {
lk.lock();
}
if (trickle) {
m_buffer->set_oneshot_trickle_buffer(m_buffer->get_trickle_config_size());
m_buffer->set_trickle_config_size(tot_payload_size);
}
LOG_DBG(std::format("Received network msg of total length {} B", tot_payload_size).c_str());
for(const ToFlxMessage& msg : messages){
if(msg.status != ToFlxMessage::Status::MessageOk){
LOG_ERR(std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status)).c_str());
++(m_elink_stats[msg.elink].dropped_msg);
return;
}
size_t msg_size = m_buffer->compute_msg_dma_occupancy(msg.payload.size());
while ( msg_size > m_buffer->dma_free_bytes()) {
usleep(10);
}
m_buffer->encode_and_write(msg.elink, msg.payload.data(), msg.payload.size(), trickle);
m_elink_stats[msg.elink].on_processed_msg(msg.payload.size());
}
if (trickle) { m_buffer->dma_start_circular_trickle_buffer(); }
}
template <class B>
std::map<local_elink_t, FromHostElinkStats> FromHostWriter<B>::get_writer_elink_stats()
{
std::map<local_elink_t, FromHostElinkStats> output;
for (auto & [key, value] : m_elink_stats){
output[key] = value.get_increment(m_elink_stats_prev[key]);
}
return output;
}
#endif /* TOFLX_WRITER_H_ */