Program Listing for File NetioSubscriber.cpp

Return to documentation for file (NetioSubscriber.cpp)

#include "netio3/NetioSubscriber.hpp"

#include <chrono>
#include <memory>
#include <cstdint>
#include <cstring>
#include <set>
#include <span>
#include <string>
#include <utility>
#include <vector>

#include <ers/ers.h>

#include <netio3-backend/EventLoop/BaseEventLoop.hpp>
#include <netio3-backend/Issues.hpp>
#include <netio3-backend/Netio3Backend.hpp>

#include "netio3/Utility.hpp"
#include "netio3/SubscriptionRequest.hpp"
#include "netio3/NetioReceiver.hpp"
#include "netio3/NetioIssues.hpp"

using namespace netio3;
using namespace std::chrono_literals;

NetioSubscriber::NetioSubscriber(const NetioSubscriberConfig& config,
                                 std::shared_ptr<BaseEventLoop> evloop,
                                 const std::string& local_ip)
    : m_local_ip(local_ip),
      m_thread_safe{config.thread_safety == ThreadSafetyModel::SAFE} {

    ERS_DEBUG(5, "Entered");

    const NetworkConfig backend_config{
        .mode=NetworkMode::TCP,
        .thread_safety = config.thread_safety,
        .conn_params{},
        .callbacks{
            .on_data_cb = [&](std::span<const std::uint8_t> data) {
                on_acknowledgement(data);},
            .on_connection_established_cb = [&](const EndPointAddress& remote_ep, const EndPointAddress& local_ep, EndpointCapabilities /*cap*/) {
                control_connection_established(remote_ep, local_ep);},
            .on_connection_closed_cb = [&](const EndPointAddress& ep, const std::vector<std::uint64_t>& /*keys*/) {
                connection_lost(ep);},
            .on_connection_refused_cb = [&](const EndPointAddress& ep) {
                connection_refused(ep);},
            .on_send_completed_cb = nullptr
        },
        .local_ip = m_local_ip == "0.0.0.0" ? std::nullopt : std::optional<std::string>(m_local_ip),
    };
    m_subscriber_backend = NetworkBackend::create(
      NetworkType::ASYNCMSG, backend_config, evloop);

    const NetioReceiverConfig rec_config {
        .backend_type = config.backend_type,
        .backend_mode = config.backend_mode,
        .thread_safety = config.thread_safety,
    };
    m_dataReceiver = std::make_unique<NetioReceiver>(rec_config, evloop);
    m_dataReceiver->set_on_connection_closed_cb(
        [&](const EndPointAddress&) {
            ERS_DEBUG(5, "***  Receiver connection closed!!!! ***");
        });


    ERS_DEBUG(5, "Finished");
}

NetioSubscriber::~NetioSubscriber() {
    ERS_DEBUG(1, "Entered " << m_subscriptions.size()
           << " subscriptions still in list");
    m_subscriber_backend.reset();
    m_dataReceiver.reset();
    ERS_DEBUG(5, "Finished");
}

void NetioSubscriber::control_connection_established(const EndPointAddress& remote_ep,
                                                     const EndPointAddress& local_ep) {
    ERS_DEBUG(5, "Entered with remote_ep " << remote_ep << ", local_ep " << local_ep);
    {
        const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
        m_acknowledge_port.insert({remote_ep, local_ep.port()});
    }
}


void NetioSubscriber::connection_lost(const EndPointAddress& ep) {
    ERS_DEBUG(5, "connection to " << ep << " closed");
    {
        ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
        const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
        ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());

        if (m_connections.contains(ep)) {
            std::set<uint64_t> tags;
            for (auto tag : m_connections.at(ep)) {
                m_subscriptions.erase(tag);
                --m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag));
                // Close listen connection if nothing is subscribed to this receiver anymore
                bool close_receiver =
                    (m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag)) == 0);
                if (close_receiver) {
                    m_dataReceiver->close({m_local_ip, m_listen_ports.at(m_tag_to_conn_params.at(tag))});
                    m_listen_ports.erase(m_tag_to_conn_params.at(tag));
                    m_num_connections_per_receiver.erase(m_tag_to_conn_params.at(tag));
                }
                m_tag_to_conn_params.erase(tag);
            }
            tags = m_connections.at(ep);
            m_connections.erase(ep);
            m_on_subscription_lost_cb(tags);
        }
        if (m_closing_connections.contains(ep)) {
            m_closing_connections.erase(ep);
        }
        if (m_acknowledge_port.contains(ep)) {
            m_acknowledge_port.erase(ep);
        }
    }

    ERS_DEBUG(5, "Finished");
}

void NetioSubscriber::connection_refused(const EndPointAddress& ep) {
    ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
    const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
    ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());

    if (m_connections.contains(ep)) {
        if (m_connections.at(ep).empty()) {
            m_connections.erase(ep);
        }
        else {
            ers::fatal(ConnectionNotIdle(ERS_HERE, ep.address(), ep.port()));
        }
    }
    if (m_closing_connections.contains(ep)) {
        m_closing_connections.erase(ep);
    }
}

void NetioSubscriber::on_acknowledgement(std::span<const std::uint8_t> data) {
    const auto ack = SubscriptionAcknowledgement::from_json(std::string(data.begin(), data.end()));
    ERS_DEBUG(5, (ack.subscribe==SubscriptionRequest::Sub::SUBSCRIBE ? "S" : "Uns")
              <<"ubscription acknowledged: tag=" << std::hex << ack.tag);

    if (ack.subscribe == SubscriptionRequest::Sub::SUBSCRIBE) {
        m_on_subscription (ack.tag);
    }
    else if (ack.subscribe == SubscriptionRequest::Sub::UNSUBSCRIBE) {
        m_on_unsubscription (ack.tag);

        ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
        const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);

        if (m_closing_tags.contains(ack.tag)) {
            auto remote_ep = m_closing_tags.at(ack.tag);
            ERS_DEBUG (5, "Closing connection to " << remote_ep);
            if (m_subscriber_backend) {
                m_subscriber_backend->close_active_endpoint(remote_ep);
                m_closing_tags.erase(ack.tag);
            }
        }
    }
}

NetioStatus NetioSubscriber::send_subscription(const SubscriptionRequest& req,
                                               const EndPointAddress& remoteEp) {
    ERS_DEBUG(5,
              "Sending " << (req.subscribe == SubscriptionRequest::Sub::SUBSCRIBE ? "" : "un")
                         << "subscription request");
    // Note this is only allowed for ASYNCMSG as it copies the data. Libfabric would have a dangling pointer
    auto req_string = SubscriptionRequest::to_json(req);
    ERS_DEBUG(4, "SubscriptionRequest=" << req_string);

    auto req_span = std::span<const uint8_t>(
        reinterpret_cast<const uint8_t*>(req_string.data()), req_string.size());
    return m_subscriber_backend->send_data_copy(remoteEp,
                                                req_span,
                                                std::span<uint8_t>{}, 0);
}

NetioStatus NetioSubscriber::subscribe(uint64_t tag,
                                       const EndPointAddress& remoteEp,
                                       const ConnectionParametersRecv& conn_params) {
    ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
    const utility::ConditionalUniqueLock lock(m_mutex, m_thread_safe);

    if (m_subscriptions.contains(tag)) {
        ers::error (AlreadySubscribed(ERS_HERE, tag));
        return NetioStatus::FAILED;
    }
    // Currently closing, retry after closed
    if (m_closing_connections.contains(remoteEp)) {
        return NetioStatus::NO_RESOURCES;
    }
    if (!m_connections.contains(remoteEp)) {
        ERS_DEBUG(5, "opening active endpoint for " << remoteEp);
        try {
            m_subscriber_backend->open_active_endpoint(remoteEp,
                                                       {{},{},{}});
        }
        catch (FailedOpenActiveEndpoint& issue) {
            ers::warning (issue);
            return NetioStatus::FAILED;
        }
    }

    m_connections.insert({remoteEp, {}});
    if (not m_listen_ports.contains(conn_params)) {
        m_listen_ports.try_emplace(conn_params, m_dataReceiver->listen({m_local_ip, 0}, conn_params));
        m_num_connections_per_receiver.try_emplace(conn_params, 0);
    }
    ++m_num_connections_per_receiver.at(conn_params);
    m_tag_to_conn_params[tag] = conn_params;

    NetioStatus status = NetioStatus::NO_RESOURCES;
    if (m_acknowledge_port.contains(remoteEp)) {
        const auto req = SubscriptionRequest{
            .tag = tag,
            .addr = m_local_ip,
            .data_port = m_listen_ports.at(conn_params),
            .ack_port = m_acknowledge_port.at(remoteEp),
            .subscribe = SubscriptionRequest::Sub::SUBSCRIBE,
        };
        status = send_subscription(req, remoteEp);
        if (status == NetioStatus::OK) {
            m_subscriptions.emplace(tag);
            m_connections[remoteEp].insert(tag);
        }
        else {
            ERS_DEBUG(6, "Failed to send subscription");
        }
    }
    return status;
}

NetioStatus NetioSubscriber::unsubscribe(uint64_t tag,
                                         const EndPointAddress& remoteEp) {
    ERS_DEBUG (5, std::format("Entered: tag {:x}, ep", tag) << remoteEp);
    NetioStatus status = NetioStatus::OK;
    {
        ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
        const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
        ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());
        if (not m_subscriptions.contains(tag)) {
            ers::error (NotSubscribed(ERS_HERE, tag));
            return NetioStatus::FAILED;
        }
        const auto req = SubscriptionRequest{
            .tag = tag,
            .addr = m_local_ip,
            .data_port = m_listen_ports.at(m_tag_to_conn_params.at(tag)),
            .ack_port = m_acknowledge_port.at(remoteEp),
            .subscribe = SubscriptionRequest::Sub::UNSUBSCRIBE,
        };
        status = send_subscription(req, remoteEp);
        if (status == NetioStatus::OK) {
            m_subscriptions.erase(tag);

            m_connections.at(remoteEp).erase(tag);
            if (m_connections.at(remoteEp).empty()) {
                ERS_DEBUG (5, "Removed last subscription to " << remoteEp << ", scheduling closing of connection");
                m_closing_connections.emplace(remoteEp);
                m_closing_tags.insert({tag, remoteEp});
                m_connections.erase(remoteEp);
            }

            // Close listen connection if nothing is subscribed to this receiver anymore
            --m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag));
            if (m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag)) == 0) {
                m_dataReceiver->close({m_local_ip, m_listen_ports.at(m_tag_to_conn_params.at(tag))});
                m_listen_ports.erase(m_tag_to_conn_params.at(tag));
                m_num_connections_per_receiver.erase(m_tag_to_conn_params.at(tag));
            }
            m_tag_to_conn_params.erase(tag);
        }
        else {        // What do we do if the send failed??
            ers::error (FailedSubscribe(ERS_HERE, "unsubscription", tag));
        }
    }

    ERS_DEBUG(5, "Finished");
    return status;
}

void NetioSubscriber::set_on_subscription_cb(const CbSubscriptionConfirmed& cb) {
    m_on_subscription = cb;
}

void NetioSubscriber::set_on_unsubscription_cb(const CbUnsubscriptionConfirmed& cb) {
    m_on_unsubscription = cb;
}

void NetioSubscriber::set_on_subscription_lost_cb(const CbSubscriptionLost& cb) {
    m_on_subscription_lost_cb = cb;
}

void NetioSubscriber::set_on_data_cb(const CbMessageReceived& cb) {
    // Forward this on to our NetioReceiver
    m_dataReceiver->set_on_data_cb(cb);
}

void NetioSubscriber::set_on_buffer_cb(const CbBufferReceived& cb) {
    // Forward this on to our NetioReceiver
    m_dataReceiver->set_on_buffer_cb(cb);
}