Program Listing for File trickle_manager.cpp

Return to documentation for file (trickle/trickle_manager.cpp)

#include "trickle_manager.hpp"

#include <cstdint>
#include <felix/felix_fid.h>
#include <functional>

#include "elink.hpp"


namespace trickle {

// Constructs the manager from a buffer and a receiver.
//
// Both pointers are moved into the corresponding members. Three callbacks are then
// registered on the receiver; each one simply forwards to a member function of this
// object (connection_established, connection_closed, message_received).
//
// The callbacks capture `this`, so they must not be invoked after this object is
// destroyed. NOTE(review): the receiver is held by this object, which presumably
// guarantees that - confirm the class is neither copied nor moved after construction.
template <class Buffer>
TrickleManager<Buffer>::TrickleManager(std::shared_ptr<Buffer> buffer, std::unique_ptr<Receiver> receiver)
    : m_buffer{std::move(buffer)},
      m_receiver{std::move(receiver)}
{
    auto on_open = [this](const std::string& peer) {
        connection_established(peer);
    };
    auto on_close = [this](const std::string& peer) {
        connection_closed(peer);
    };
    auto on_messages = [this](const std::vector<ToFlxMessage>& batch) {
        message_received(batch);
    };

    m_receiver->set_conn_open_callback(std::move(on_open));
    m_receiver->set_conn_close_callback(std::move(on_close));
    m_receiver->set_on_msg_callback(std::move(on_messages));
}


// Handles a batch of messages delivered by the receiver.
//
// - If the summed payload size of the batch exceeds the buffer size, an error is
//   logged and the whole batch is dropped.
// - Otherwise the messages are processed in order. The first message whose status is
//   not MessageOk is logged and processing stops there: that message and every message
//   after it in the batch are discarded.
// - A std::exception thrown while processing one message is logged and does not stop
//   the remaining messages from being processed.
template <class Buffer>
void TrickleManager<Buffer>::message_received(const std::vector<ToFlxMessage>& messages) {
    // The payload size is only an upper bound on what ends up in the buffer: the payload
    // is an encoded command message that carries some overhead around the actual data.
    // NOTE(review): the limit is applied to the sum over the batch, not per message -
    // confirm this is the intended granularity.
    const size_t tot_payload_size = std::accumulate(
        messages.begin(), messages.end(), size_t{0},
        [](size_t acc, const ToFlxMessage& msg) { return acc + msg.payload.size(); });

    const auto buffer_size = m_buffer->get_size();
    if (tot_payload_size > buffer_size) {
        ers::error(felix_log::trickle_manager_issue(std::format("Dropping message of size {} bytes, larger than entire buffer of size {} bytes", tot_payload_size, buffer_size)));
        return;
    }

    for(const ToFlxMessage& msg : messages){
        if(msg.status != ToFlxMessage::Status::MessageOk){
            ers::error(felix_log::trickle_manager_issue(std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status))));
            // Deliberately a return, not a continue: the rest of the batch is abandoned.
            return;
        }

        try {
            process_message(msg);
        } catch (const std::exception& ex) {
            // A failing command is reported but must not prevent later commands from running.
            ers::error(felix_log::trickle_manager_issue(ex.what()));
        }
    }
}


// Decodes one message payload as CBOR and dispatches on its "command" field.
//
// Recognised commands: "send_config", "start_trickle", "stop_trickle", "select_bcids"
// and "throttle". For send_config, select_bcids and throttle the decoded object is
// passed on to the handler.
//
// Throws felix_log::trickle_manager_issue when
//   - the payload is not valid CBOR,
//   - the "command" field is missing or is not a string,
//   - the command name is not one of the recognised commands.
// Exceptions thrown by the individual handlers propagate to the caller.
template <class Buffer>
void TrickleManager<Buffer>::process_message(const ToFlxMessage& message) {
    using json = nlohmann::json;

    // Immediately-invoked lambda so that the decoded object can be const while the
    // parse error is translated into the project's issue type.
    const auto command_obj = std::invoke([&message] {
        try {
            return json::from_cbor(message.payload);
        } catch (const json::parse_error& e) {
            throw felix_log::trickle_manager_issue(std::format("Invalid CBOR format: {}", e.what()));
        }
    });

    // contains() is false for anything that is not an object, so this also rejects
    // payloads whose top-level value is an array, a number, etc.
    if (not command_obj.contains("command")) {
        throw felix_log::trickle_manager_issue("Message missing required 'command' field");
    }

    // Check the type explicitly: converting a non-string value would otherwise surface
    // as a generic JSON type error that does not name the offending field.
    const auto& command_field = command_obj["command"];
    if (not command_field.is_string()) {
        throw felix_log::trickle_manager_issue(std::format("'command' field must be a string, got {}", command_field.type_name()));
    }

    const std::string command = command_field.get<std::string>();
    ERS_DEBUG(2, std::format("Received JSON command: {}", command));

    if (command == "send_config") {
        handle_send_config(command_obj);

    } else if (command == "start_trickle") {
        start_trickle();

    } else if (command == "stop_trickle") {
        stop_trickle();

    } else if (command == "select_bcids") {
        select_bcids(command_obj);

    } else if (command == "throttle"){
        throttle(command_obj);

    } else {
        throw felix_log::trickle_manager_issue(std::format("Unknown command: {}", command));
    }
}


// Handles the "send_config" command: writes a list of (fid, data) entries to the
// buffer starting at write offset 0, then appends padding.
//
// Expected shape of `command`:
//   { "payload": [ { "fid": <unsigned integer>, "data": <CBOR binary> }, ... ] }
//
// The whole payload is validated before the buffer is touched, so a malformed entry
// anywhere in the list leaves the buffer exactly as it was.
//
// If the buffer reports that it cannot be written (before starting, or between two
// entries), an error is logged and the function returns without throwing; in the
// latter case the entries already written stay in the buffer and no padding is added.
//
// Throws felix_log::trickle_manager_issue when
//   - "payload" is missing or is not an array,
//   - an entry lacks "fid" or "data",
//   - "fid" is not an unsigned integer,
//   - "data" is not a binary value.
template <class Buffer>
void TrickleManager<Buffer>::handle_send_config(const nlohmann::json& command) {
    if(not m_buffer->can_write_trickle()){
        ers::error(felix_log::trickle_manager_issue("You cannot send a configuration while trickling. Run stop_trickle() first."));
        return;
    }

    if (not command.contains("payload") || not command["payload"].is_array()) {
        throw felix_log::trickle_manager_issue("send_config command requires 'payload' field as an array");
    }
    const auto& payload = command["payload"];

    // One validated entry. `data` points into `command`, which outlives this function
    // call, so no copy of the binary blob is needed.
    struct ConfigEntry {
        uint64_t fid;
        const std::vector<uint8_t>* data;
    };

    // Phase 1: validate every entry without modifying the buffer.
    std::vector<ConfigEntry> entries;
    entries.reserve(payload.size());

    for (const auto& item : payload) {
        if (not item.contains("fid") || not item.contains("data")) {
            throw felix_log::trickle_manager_issue("In the payload there must be 'fid' and 'data' fields");
        }

        // Only unsigned integers are accepted: a negative or floating-point value would
        // otherwise be converted to uint64_t silently and address the wrong link.
        const auto& fid_field = item["fid"];
        if (not fid_field.is_number_unsigned()) {
            throw felix_log::trickle_manager_issue(std::format("FID is not an unsigned integer: {}. Use the decimal form of FID", fid_field.dump()));
        }

        const auto& data_field = item["data"];
        if (not data_field.is_binary()) {
            throw felix_log::trickle_manager_issue("data field must be binary CBOR");
        }

        entries.push_back(ConfigEntry{fid_field.get<uint64_t>(), &data_field.get_binary()});
    }

    // Phase 2: write. Start from the beginning of the buffer.
    m_buffer->dma_set_write_offset(0);

    for (const ConfigEntry& entry : entries) {
        ERS_DEBUG(3, std::format("Writing {} bytes to fid {:#x}", entry.data->size(), entry.fid));

        // Re-checked before every write, as the writable state is queried from the buffer each time.
        if(not m_buffer->can_write_trickle()){
            ers::error(felix_log::trickle_manager_issue("Firmware might be reading in DMA buffer, cannot write trickle configuration."));
            return;
        }

        m_buffer->encode_and_write(get_elink(entry.fid), entry.data->data(), entry.data->size());
    }
    add_padding();
}


// Handles the "start_trickle" command.
//
// When the buffer reports that it can be written, an info message is logged and
// dma_start_circular_trickle_buffer() is called on it. Otherwise an error is logged
// and nothing else happens.
template <class Buffer>
void TrickleManager<Buffer>::start_trickle() {
    if (m_buffer->can_write_trickle()) {
        ERS_INFO("Starting trickle configuration");
        m_buffer->dma_start_circular_trickle_buffer();
        return;
    }

    // Not writable: treated as "trickling is already in progress".
    ers::error(felix_log::trickle_manager_issue("Already trickling, first run stop_trickle()"));
}


// Handles the "stop_trickle" command.
//
// Logs an info message and calls set_oneshot_trickle_buffer() on the buffer. The call
// is unconditional: no check is made whether trickling is currently active.
template <class Buffer>
void TrickleManager<Buffer>::stop_trickle() {
    ERS_INFO("Stopping trickle configuration");

    auto& trickle_buffer = *m_buffer;
    trickle_buffer.set_oneshot_trickle_buffer();
}


// Handles the "select_bcids" command.
//
// Requires both "first_bcid" and "last_bcid" in `command`; throws
// felix_log::trickle_manager_issue if either is missing. The two values are stored in
// m_first_bcid and m_last_bcid, after which an info message states that the feature
// is not yet supported - the stored values are not used anywhere in this function.
template <class Buffer>
void TrickleManager<Buffer>::select_bcids(const nlohmann::json &command) {
    const bool has_both_bounds = command.contains("first_bcid") && command.contains("last_bcid");
    if (!has_both_bounds) {
        throw felix_log::trickle_manager_issue("select_bcids command missing first_bcid or last_bcid fields");
    }

    // Conversion to the member types is done by the JSON library on assignment.
    m_first_bcid = command.at("first_bcid");
    m_last_bcid = command.at("last_bcid");

    ERS_INFO("The select_bcids() function is not yet supported");
}

// Handles the "throttle" command.
//
// Requires "throttle_factor" in `command`; throws felix_log::trickle_manager_issue if
// it is missing. The value is stored in m_throttle_factor, after which an info message
// states that the feature is not yet supported.
template <class Buffer>
void TrickleManager<Buffer>::throttle(const nlohmann::json &command) {
    const bool has_factor = command.contains("throttle_factor");
    if (!has_factor) {
        throw felix_log::trickle_manager_issue("throttle command missing throttle_factor field");
    }

    // Conversion to the member type is done by the JSON library on assignment.
    m_throttle_factor = command.at("throttle_factor");

    ERS_INFO("The throttle() function is not yet supported");
}

// Forwards to trickle_padding() on the buffer. Called at the end of a successful
// send_config, after all entries have been written.
template <class Buffer>
void TrickleManager<Buffer>::add_padding() {
    auto& trickle_buffer = *m_buffer;
    trickle_buffer.trickle_padding();
}

} // namespace trickle

// Explicit instantiations for the specific template combinations used.
//
// The member functions of TrickleManager are defined in this translation unit rather
// than in the header, so the instantiations listed here are the ones this file emits.
// NOTE(review): presumably any additional Buffer type needs its own line here,
// otherwise its members would be undefined at link time - confirm against the header.
namespace trickle {
    template class TrickleManager<FlxFromHostBuffer>;
    template class TrickleManager<FileFromHostBuffer>;
}