Program Listing for File register_device_controller.hpp

Return to documentation for file (register_device_controller.hpp)

#ifndef DEVICE_REGISTER_CONTROLLER_H_
#define DEVICE_REGISTER_CONTROLLER_H_

#include <cstdint>
#include <memory>
#include <stdint.h>
#include <string>
#include <format>
#include <variant>

#include "device.hpp"
#include "elink.hpp"
#include "bus.hpp"
#include "ers/ers.h"
#include "fromhost_message.hpp"

#include "regmap_manager.hpp"
#include "register_cmd_parser.hpp"
#include "register_device_interface.hpp"
#include "felix/felix_client_thread.hpp"
#include "config/config_register.hpp"
#include "publisher.hpp"
#include "receiver.hpp"

#include "network/netio3_evloop.hpp"
#include "network/utility.hpp"

#include "felix/felix_toflx.hpp"


ERS_DECLARE_ISSUE(felix_log, register_device_controller_issue, issue_message, ((const std::string&)issue_message))


template <class DEV>
class RegisterDeviceController
{

    public:

        RegisterDeviceController(
            Netio3EventLoop&       evtloop,
            RegmapManager &        regmap,
            std::shared_ptr<DEV>   opened_device,
            std::shared_ptr<DEV>   opened_primary_device,
            const ConfigRegister & config,
            unsigned int           d_idx
        );

        //Cmd callbacks
        void cmd_connection_established(const std::string& s){ERS_INFO(std::format("Command connection established. Conn. info: {}", s));};
        void cmd_connection_closed(const std::string& s){ERS_INFO(std::format("Command connection closed. Conn. info: {}", s));};
        void process_message(const std::vector<ToFlxMessage>& messages);

        //Public for unit test
        std::string process_requests(std::vector<ReqData>& req_vec);

    private:
        //Shared resources
        Netio3EventLoop& m_evtloop;
        RegisterDeviceInterface<DEV> m_devs;
        int m_dev_no{-1};
        uint64_t m_fid_reply{0};
        uint64_t m_fid_cmd{0};

        //Owned resources
        Bus m_bus_rep_tx;
        Bus m_bus_cmd_rx;
        bool m_do_reply;

        RegisterMsgParser m_parser;
        std::unique_ptr<Receiver>  m_cmd_receiver;
        std::unique_ptr<Publisher> m_reply_publisher;
};


template <class DEV>
RegisterDeviceController<DEV>::RegisterDeviceController(
    Netio3EventLoop&       evtloop,
    RegmapManager &        regmap,
    std::shared_ptr<DEV>   open_device,
    std::shared_ptr<DEV>   open_primary_device,
    const ConfigRegister & c,
    unsigned int           d_idx)
        : m_evtloop(evtloop), m_devs(regmap, open_device, open_primary_device), m_dev_no(c.dconfs.at(d_idx).dev_no),
          m_fid_reply(c.dconfs.at(d_idx).fid_reply), m_fid_cmd(c.dconfs.at(d_idx).fid_cmd),
          m_bus_rep_tx{c.bus_dir, c.bus_group, "register-pub-"+std::to_string(m_dev_no), c.verbose_bus},
          m_bus_cmd_rx{c.bus_dir, c.bus_group, "register-cmd-"+std::to_string(m_dev_no), c.verbose_bus},
          m_do_reply{c.enable_reply}
{

    constexpr static int net_pages{64};
    constexpr static int net_page_size{4096};
    constexpr static int net_page_timeout{0};

    auto fid_cmd = Elink(m_fid_cmd);
    auto fid_reply = Elink(m_fid_reply);
    const std::string controller_info = "RegDevCtrl";

    if (c.enable_cmd) {
        int port_cmd = c.dconfs.at(d_idx).port_cmd;
        m_cmd_receiver = network::utility::create_buffered_receiver(c.network_mode, c.local_ip, port_cmd, net_pages, net_page_size, controller_info);
        m_bus_cmd_rx.publish(std::vector<felixbus::FelixBusElinkInfo>{felixbus::FelixBusElinkInfo{
            .fid = fid_cmd.fid,
            .ip = c.local_ip,
            .port = static_cast<std::uint16_t>(port_cmd),
            .unbuffered = false,
            .pubsub = false,
            .tcp = c.network_mode == NetworkMode::tcp,
            .stream = fid_cmd.has_streams,
            .netio_pages = net_pages,
            .netio_pagesize = net_page_size,
        }});
        m_cmd_receiver->set_conn_open_callback([this](const std::string& s){cmd_connection_established(s);});
        m_cmd_receiver->set_conn_close_callback([this](const std::string& s){cmd_connection_closed(s);});
        m_cmd_receiver->set_on_msg_callback([this](const std::vector<ToFlxMessage>& messages){process_message(messages);});
    }

    int port_reply = c.dconfs.at(d_idx).port_reply;
    m_reply_publisher = network::utility::create_buffered_publisher(c.network_mode, c.local_ip, port_reply, net_pages, net_page_size, net_page_timeout, UINT32_MAX, controller_info, std::chrono::milliseconds{c.mon_interval});

    m_bus_rep_tx.publish(std::vector<felixbus::FelixBusElinkInfo>{felixbus::FelixBusElinkInfo{
        .fid = fid_reply.fid,
        .ip = c.local_ip,
        .port = static_cast<std::uint16_t>(port_reply),
        .unbuffered = false,
        .pubsub = true,
        .tcp = c.network_mode == NetworkMode::tcp,
        .stream = fid_reply.has_streams,
        .netio_pages = net_pages,
        .netio_pagesize = net_page_size,
    }});
    //evloop started by owner
}


template <class DEV>
std::string RegisterDeviceController<DEV>::process_requests(std::vector<ReqData>& req_vec)
{
    for (auto& cmd_msg : req_vec) {
    //Further processing only if the command is correct
        if (cmd_msg.status_code == FelixClientThread::OK) {
            switch(cmd_msg.cmd) {
                case FelixClientThread::Cmd::GET:
                {
                    if (not m_devs.can_read(cmd_msg.resource_name)) {
                        ers::error(felix_log::register_device_controller_issue(ERS_HERE, std::format("Remote endpoint requested invalid Read register {}", cmd_msg.resource_name)));
                        cmd_msg.status_code = FelixClientThread::ERROR_INVALID_REGISTER;
                        cmd_msg.status_message = std::string("Invalid register name '") + cmd_msg.resource_name + "'";
                    } else {
                        cmd_msg.value = std::to_string(m_devs.get_register(cmd_msg.resource_name));
                        ERS_DEBUG(1, std::format( "Read {}, value {}", cmd_msg.resource_name, cmd_msg.value));
                    }
                    break;
                }
                case FelixClientThread::Cmd::SET:
                {
                    if (not m_devs.can_write(cmd_msg.resource_name)) {
                        ers::error(felix_log::register_device_controller_issue(ERS_HERE, std::format("Remote endpoint requested invalid Write register {}", cmd_msg.resource_name)));
                        cmd_msg.status_code = FelixClientThread::ERROR_NOT_AUTHORIZED;
                        cmd_msg.status_message = std::string("Not authorized to set register name '") + cmd_msg.resource_name + "'";
                    } else {
                        uint64_t value = std::stoul(cmd_msg.value, nullptr, 0);
                        m_devs.set_register(cmd_msg.resource_name, value);
                        ERS_DEBUG(1, std::format( "Set {} to {}", cmd_msg.resource_name, value));
                    }
                    break;
                }
                case FelixClientThread::Cmd::ECR_RESET:
                {
                    uint64_t value = std::stoul(cmd_msg.value, nullptr, 0);
                    ERS_INFO(std::format("ECR reset with new EC {}", value));
                    m_devs.set_register("TTC_DEC_CTRL_XL1ID_SW", value);
                    m_devs.set_register("TTC_DEC_CTRL_XL1ID_RST", 0x1);
                    m_devs.set_register("TTC_DEC_CTRL_XL1ID_RST", 0x0);
                    break;
                }
                default:
                {
                    ers::error(felix_log::register_device_controller_issue(ERS_HERE, std::format("Command (code {}) correctly parsed cannot be processed", static_cast<int>(cmd_msg.cmd))));
                    break;
                }
            }
        }
    }
    return m_parser.encode_replies(m_fid_reply, req_vec);
}


template <class DEV>
void RegisterDeviceController<DEV>::process_message(const std::vector<ToFlxMessage>& messages)
{
    for (const auto& msg : messages) {
        std::vector<ReqData> parsed_msgs = m_parser.parse_commands(reinterpret_cast<const char*>(msg.payload.data()), msg.payload.size());
        auto reply_msg = process_requests(parsed_msgs);
        auto reply_ptr = reinterpret_cast<uint8_t*>(reply_msg.data());
        if (m_do_reply){
            std::ignore = m_reply_publisher->publish(m_fid_reply, reply_ptr, reply_msg.size());
            std::ignore = m_reply_publisher->flush(m_fid_reply);
        }
    }
}

#endif /* DEVICE_REGISTER_CONTROLLER_H */