Program Listing for File trickle_manager.cpp
↰ Return to documentation for file (trickle/trickle_manager.cpp)
#include "trickle_manager.hpp"
#include <cstdint>
#include <felix/felix_fid.h>
#include <functional>
#include "elink.hpp"
namespace trickle {
/// Constructs the manager, taking ownership of the receiver and sharing the
/// buffer, then registers this object's handlers on the receiver for
/// connection-open, connection-close and incoming-message events.
///
/// Each registered callback captures `this`.
///
/// @param buffer   Buffer the manager operates on (shared ownership).
/// @param receiver Receiver whose callbacks are wired to this manager.
template <class Buffer>
TrickleManager<Buffer>::TrickleManager(std::shared_ptr<Buffer> buffer, std::unique_ptr<Receiver> receiver)
    : m_buffer{std::move(buffer)}, m_receiver{std::move(receiver)}
{
    auto on_open = [this](const std::string& arg) { connection_established(arg); };
    auto on_close = [this](const std::string& arg) { connection_closed(arg); };
    auto on_messages = [this](const std::vector<ToFlxMessage>& batch) { message_received(batch); };

    // Registration order is kept: open, close, then message handler.
    m_receiver->set_conn_open_callback(on_open);
    m_receiver->set_conn_close_callback(on_close);
    m_receiver->set_on_msg_callback(on_messages);
}
/// Entry point for a batch of messages delivered by the receiver.
///
/// The whole batch is rejected when the summed payload sizes exceed the buffer
/// size. Otherwise the messages are processed in order:
///  - a message whose status is not MessageOk is reported and processing
///    stops, so that message and every later one in the batch are skipped;
///  - an exception thrown while processing a message is reported through
///    ers::error and the loop continues with the next message.
///
/// @param messages Batch of messages to process.
template <class Buffer>
void TrickleManager<Buffer>::message_received(const std::vector<ToFlxMessage>& messages) {
    // The summed payload size is only an upper bound on what ends up in the
    // buffer: each payload is an encoded command message that carries some
    // overhead besides the data that is actually written.
    const size_t tot_payload_size = std::accumulate(
        messages.begin(), messages.end(), size_t{0},
        [](size_t acc, const ToFlxMessage& msg) { return acc + msg.payload.size(); });

    const auto buffer_size = m_buffer->get_size();
    if (tot_payload_size > buffer_size) {
        ers::error(felix_log::trickle_manager_issue(std::format("Dropping message of size {} bytes, larger than entire buffer of size {} bytes", tot_payload_size, buffer_size)));
        return;
    }

    for (const ToFlxMessage& msg : messages) {
        if (msg.status != ToFlxMessage::Status::MessageOk) {
            ers::error(felix_log::trickle_manager_issue(std::format("Message status not OK: {}. Dropping message.", ToFlxMessage::statusToString(msg.status))));
            // Leaves the function: the remaining messages of this batch are not processed.
            return;
        }
        try {
            process_message(msg);
        } catch (const std::exception& ex) {
            // Report and carry on with the next message of the batch.
            ers::error(felix_log::trickle_manager_issue(ex.what()));
        }
    }
}
/// Decodes one message payload as CBOR and dispatches it on its "command" field.
///
/// Recognised commands: "send_config", "start_trickle", "stop_trickle",
/// "select_bcids" and "throttle".
///
/// @param message Message whose payload holds the CBOR-encoded command.
/// @throws felix_log::trickle_manager_issue if the payload is not valid CBOR,
///         if the "command" field is absent, or if the command is unknown.
template <class Buffer>
void TrickleManager<Buffer>::process_message(const ToFlxMessage& message) {
    using json = nlohmann::json;

    // Immediately-invoked lambda so the decoded object can be const while the
    // parse error is still translated into the project's issue type.
    const auto decode = [&message] {
        try {
            return json::from_cbor(message.payload);
        } catch (const json::parse_error& e) {
            throw felix_log::trickle_manager_issue(std::format("Invalid CBOR format: {}", e.what()));
        }
    };
    const auto command_obj = decode();

    if (not command_obj.contains("command")) {
        throw felix_log::trickle_manager_issue("Message missing required 'command' field");
    }

    const std::string command = command_obj["command"];
    ERS_DEBUG(2, std::format("Received JSON command: {}", command));

    if (command == "send_config") {
        handle_send_config(command_obj);
        return;
    }
    if (command == "start_trickle") {
        start_trickle();
        return;
    }
    if (command == "stop_trickle") {
        stop_trickle();
        return;
    }
    if (command == "select_bcids") {
        select_bcids(command_obj);
        return;
    }
    if (command == "throttle") {
        throttle(command_obj);
        return;
    }

    // None of the known commands matched.
    throw felix_log::trickle_manager_issue(std::format("Unknown command: {}", command));
}
/// Handles the "send_config" command: writes every payload item into the
/// buffer, starting at offset 0, and finishes with add_padding().
///
/// Expected layout of @p command:
///   "payload": array of objects, each with
///     "fid"  : unsigned integer (decimal form of the FID)
///     "data" : CBOR binary value
///
/// If the buffer reports that it cannot be written (can_write_trickle() is
/// false), an error is logged and the function returns without throwing. This
/// is checked once up front and again before each item is written; when the
/// per-item check fails, earlier items have already been written and no
/// padding is added.
///
/// @param command Decoded command object.
/// @throws felix_log::trickle_manager_issue if "payload" is missing or not an
///         array, if an item lacks "fid" or "data", if "fid" is not an
///         unsigned integer, or if "data" is not binary. Items preceding the
///         offending one have already been written at that point.
template <class Buffer>
void TrickleManager<Buffer>::handle_send_config(const nlohmann::json& command) {
    if (not m_buffer->can_write_trickle()) {
        ers::error(felix_log::trickle_manager_issue("You cannot send a configuration while trickling. Run stop_trickle() first."));
        return;
    }
    if (not command.contains("payload") || not command["payload"].is_array()) {
        throw felix_log::trickle_manager_issue("send_config command requires 'payload' field as an array");
    }

    // Start writing from the beginning of the buffer.
    m_buffer->dma_set_write_offset(0);

    for (const auto& item : command["payload"]) {
        if (not item.contains("fid") || not item.contains("data")) {
            throw felix_log::trickle_manager_issue("In the payload there must be 'fid' and 'data' fields");
        }

        // Only unsigned integers are accepted: a negative or floating-point
        // value would otherwise be silently wrapped/truncated by the
        // conversion to uint64_t and address the wrong e-link.
        const auto& fid_field = item["fid"];
        if (not fid_field.is_number_unsigned()) {
            throw felix_log::trickle_manager_issue(std::format("FID is not an unsigned integer: {}. Use the decimal form of FID", fid_field.dump()));
        }
        const uint64_t fid = fid_field.get<uint64_t>();

        const auto& data_field = item["data"];
        if (not data_field.is_binary()) {
            throw felix_log::trickle_manager_issue("data field must be binary CBOR");
        }
        // Reference the bytes held by the JSON value instead of copying them;
        // `command` outlives this loop.
        const auto& data = data_field.get_binary();

        ERS_DEBUG(3, std::format("Writing {} bytes to fid {:#x}", data.size(), fid));

        // Re-check before every write, not only once at the top.
        if (not m_buffer->can_write_trickle()) {
            ers::error(felix_log::trickle_manager_issue("Firmware might be reading in DMA buffer, cannot write trickle configuration."));
            return;
        }
        m_buffer->encode_and_write(get_elink(fid), data.data(), data.size());
    }

    add_padding();
}
/// Handles the "start_trickle" command.
///
/// When the buffer reports can_write_trickle(), logs the start and calls
/// dma_start_circular_trickle_buffer() on it. Otherwise only an error is
/// logged; nothing is thrown.
template <class Buffer>
void TrickleManager<Buffer>::start_trickle() {
    if (m_buffer->can_write_trickle()) {
        ERS_INFO("Starting trickle configuration");
        m_buffer->dma_start_circular_trickle_buffer();
        return;
    }

    // Buffer is not writable: report it and leave the buffer untouched.
    ers::error(felix_log::trickle_manager_issue("Already trickling, first run stop_trickle()"));
}
/// Handles the "stop_trickle" command: logs the request and calls
/// set_oneshot_trickle_buffer() on the buffer.
///
/// NOTE(review): presumably switching to one-shot mode is what ends circular
/// trickling; confirm against the Buffer implementation.
template <class Buffer>
void TrickleManager<Buffer>::stop_trickle()
{
    ERS_INFO("Stopping trickle configuration");
    m_buffer->set_oneshot_trickle_buffer();
}
/// Handles the "select_bcids" command: stores the requested BCID bounds in
/// m_first_bcid and m_last_bcid and logs that the feature is not yet supported.
///
/// @param command Decoded command object; must contain "first_bcid" and
///                "last_bcid".
/// @throws felix_log::trickle_manager_issue if either field is absent.
template <class Buffer>
void TrickleManager<Buffer>::select_bcids(const nlohmann::json &command) {
    const bool has_both_bounds = command.contains("first_bcid") && command.contains("last_bcid");
    if (not has_both_bounds) {
        throw felix_log::trickle_manager_issue("select_bcids command missing first_bcid or last_bcid fields");
    }

    m_first_bcid = command["first_bcid"];
    m_last_bcid = command["last_bcid"];

    // The values are only recorded here; this function does nothing else with them.
    ERS_INFO("The select_bcids() function is not yet supported");
}
/// Handles the "throttle" command: stores the requested factor in
/// m_throttle_factor and logs that the feature is not yet supported.
///
/// @param command Decoded command object; must contain "throttle_factor".
/// @throws felix_log::trickle_manager_issue if "throttle_factor" is absent.
template <class Buffer>
void TrickleManager<Buffer>::throttle(const nlohmann::json &command) {
    const bool has_factor = command.contains("throttle_factor");
    if (not has_factor) {
        throw felix_log::trickle_manager_issue("throttle command missing throttle_factor field");
    }

    m_throttle_factor = command["throttle_factor"];

    // The value is only recorded here; this function does nothing else with it.
    ERS_INFO("The throttle() function is not yet supported");
}
/// Forwards to the buffer's trickle_padding().
///
/// Called by handle_send_config() after all payload items have been written.
template <class Buffer>
void TrickleManager<Buffer>::add_padding()
{
    m_buffer->trickle_padding();
}
} // namespace trickle
// Explicit instantiations for the specific template combinations used.
// The member functions above are defined in this source file, so these
// instantiations are what make TrickleManager<FlxFromHostBuffer> and
// TrickleManager<FileFromHostBuffer> available to code that only sees the
// header. NOTE(review): add a line here when a new Buffer type is introduced,
// assuming the header does not also provide the definitions.
namespace trickle {
template class TrickleManager<FlxFromHostBuffer>;
template class TrickleManager<FileFromHostBuffer>;
}