Program Listing for File NetioSubscriber.cpp
↰ Return to documentation for file (NetioSubscriber.cpp)
#include "netio3/NetioSubscriber.hpp"
#include <chrono>
#include <memory>
#include <cstdint>
#include <cstring>
#include <set>
#include <span>
#include <string>
#include <utility>
#include <vector>
#include <ers/ers.h>
#include <netio3-backend/EventLoop/BaseEventLoop.hpp>
#include <netio3-backend/Issues.hpp>
#include <netio3-backend/Netio3Backend.hpp>
#include "netio3/Utility.hpp"
#include "netio3/SubscriptionRequest.hpp"
#include "netio3/NetioReceiver.hpp"
#include "netio3/NetioIssues.hpp"
using namespace netio3;
using namespace std::chrono_literals;
NetioSubscriber::NetioSubscriber(const NetioSubscriberConfig& config,
std::shared_ptr<BaseEventLoop> evloop,
const std::string& local_ip)
: m_local_ip(local_ip),
m_thread_safe{config.thread_safety == ThreadSafetyModel::SAFE} {
ERS_DEBUG(5, "Entered");
const NetworkConfig backend_config{
.mode=NetworkMode::TCP,
.thread_safety = config.thread_safety,
.conn_params{},
.callbacks{
.on_data_cb = [&](std::span<const std::uint8_t> data) {
on_acknowledgement(data);},
.on_connection_established_cb = [&](const EndPointAddress& remote_ep, const EndPointAddress& local_ep, EndpointCapabilities /*cap*/) {
control_connection_established(remote_ep, local_ep);},
.on_connection_closed_cb = [&](const EndPointAddress& ep, const std::vector<std::uint64_t>& /*keys*/) {
connection_lost(ep);},
.on_connection_refused_cb = [&](const EndPointAddress& ep) {
connection_refused(ep);},
.on_send_completed_cb = nullptr
},
.local_ip = m_local_ip == "0.0.0.0" ? std::nullopt : std::optional<std::string>(m_local_ip),
};
m_subscriber_backend = NetworkBackend::create(
NetworkType::ASYNCMSG, backend_config, evloop);
const NetioReceiverConfig rec_config {
.backend_type = config.backend_type,
.backend_mode = config.backend_mode,
.thread_safety = config.thread_safety,
};
m_dataReceiver = std::make_unique<NetioReceiver>(rec_config, evloop);
m_dataReceiver->set_on_connection_closed_cb(
[&](const EndPointAddress&) {
ERS_DEBUG(5, "*** Receiver connection closed!!!! ***");
});
ERS_DEBUG(5, "Finished");
}
NetioSubscriber::~NetioSubscriber() {
ERS_DEBUG(1, "Entered " << m_subscriptions.size()
<< " subscriptions still in list");
m_subscriber_backend.reset();
m_dataReceiver.reset();
ERS_DEBUG(5, "Finished");
}
void NetioSubscriber::control_connection_established(const EndPointAddress& remote_ep,
const EndPointAddress& local_ep) {
ERS_DEBUG(5, "Entered with remote_ep " << remote_ep << ", local_ep " << local_ep);
{
const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
m_acknowledge_port.insert({remote_ep, local_ep.port()});
}
}
void NetioSubscriber::connection_lost(const EndPointAddress& ep) {
ERS_DEBUG(5, "connection to " << ep << " closed");
{
ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());
if (m_connections.contains(ep)) {
std::set<uint64_t> tags;
for (auto tag : m_connections.at(ep)) {
m_subscriptions.erase(tag);
--m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag));
// Close listen connection if nothing is subscribed to this receiver anymore
bool close_receiver =
(m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag)) == 0);
if (close_receiver) {
m_dataReceiver->close({m_local_ip, m_listen_ports.at(m_tag_to_conn_params.at(tag))});
m_listen_ports.erase(m_tag_to_conn_params.at(tag));
m_num_connections_per_receiver.erase(m_tag_to_conn_params.at(tag));
}
m_tag_to_conn_params.erase(tag);
}
tags = m_connections.at(ep);
m_connections.erase(ep);
m_on_subscription_lost_cb(tags);
}
if (m_closing_connections.contains(ep)) {
m_closing_connections.erase(ep);
}
if (m_acknowledge_port.contains(ep)) {
m_acknowledge_port.erase(ep);
}
}
ERS_DEBUG(5, "Finished");
}
void NetioSubscriber::connection_refused(const EndPointAddress& ep) {
ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());
if (m_connections.contains(ep)) {
if (m_connections.at(ep).empty()) {
m_connections.erase(ep);
}
else {
ers::fatal(ConnectionNotIdle(ERS_HERE, ep.address(), ep.port()));
}
}
if (m_closing_connections.contains(ep)) {
m_closing_connections.erase(ep);
}
}
void NetioSubscriber::on_acknowledgement(std::span<const std::uint8_t> data) {
const auto ack = SubscriptionAcknowledgement::from_json(std::string(data.begin(), data.end()));
ERS_DEBUG(5, (ack.subscribe==SubscriptionRequest::Sub::SUBSCRIBE ? "S" : "Uns")
<<"ubscription acknowledged: tag=" << std::hex << ack.tag);
if (ack.subscribe == SubscriptionRequest::Sub::SUBSCRIBE) {
m_on_subscription (ack.tag);
}
else if (ack.subscribe == SubscriptionRequest::Sub::UNSUBSCRIBE) {
m_on_unsubscription (ack.tag);
ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
if (m_closing_tags.contains(ack.tag)) {
auto remote_ep = m_closing_tags.at(ack.tag);
ERS_DEBUG (5, "Closing connection to " << remote_ep);
if (m_subscriber_backend) {
m_subscriber_backend->close_active_endpoint(remote_ep);
m_closing_tags.erase(ack.tag);
}
}
}
}
NetioStatus NetioSubscriber::send_subscription(const SubscriptionRequest& req,
const EndPointAddress& remoteEp) {
ERS_DEBUG(5,
"Sending " << (req.subscribe == SubscriptionRequest::Sub::SUBSCRIBE ? "" : "un")
<< "subscription request");
// Note this is only allowed for ASYNCMSG as it copies the data. Libfabric would have a dangling pointer
auto req_string = SubscriptionRequest::to_json(req);
ERS_DEBUG(4, "SubscriptionRequest=" << req_string);
auto req_span = std::span<const uint8_t>(
reinterpret_cast<const uint8_t*>(req_string.data()), req_string.size());
return m_subscriber_backend->send_data_copy(remoteEp,
req_span,
std::span<uint8_t>{}, 0);
}
NetioStatus NetioSubscriber::subscribe(uint64_t tag,
const EndPointAddress& remoteEp,
const ConnectionParametersRecv& conn_params) {
ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
const utility::ConditionalUniqueLock lock(m_mutex, m_thread_safe);
if (m_subscriptions.contains(tag)) {
ers::error (AlreadySubscribed(ERS_HERE, tag));
return NetioStatus::FAILED;
}
// Currently closing, retry after closed
if (m_closing_connections.contains(remoteEp)) {
return NetioStatus::NO_RESOURCES;
}
if (!m_connections.contains(remoteEp)) {
ERS_DEBUG(5, "opening active endpoint for " << remoteEp);
try {
m_subscriber_backend->open_active_endpoint(remoteEp,
{{},{},{}});
}
catch (FailedOpenActiveEndpoint& issue) {
ers::warning (issue);
return NetioStatus::FAILED;
}
}
m_connections.insert({remoteEp, {}});
if (not m_listen_ports.contains(conn_params)) {
m_listen_ports.try_emplace(conn_params, m_dataReceiver->listen({m_local_ip, 0}, conn_params));
m_num_connections_per_receiver.try_emplace(conn_params, 0);
}
++m_num_connections_per_receiver.at(conn_params);
m_tag_to_conn_params[tag] = conn_params;
NetioStatus status = NetioStatus::NO_RESOURCES;
if (m_acknowledge_port.contains(remoteEp)) {
const auto req = SubscriptionRequest{
.tag = tag,
.addr = m_local_ip,
.data_port = m_listen_ports.at(conn_params),
.ack_port = m_acknowledge_port.at(remoteEp),
.subscribe = SubscriptionRequest::Sub::SUBSCRIBE,
};
status = send_subscription(req, remoteEp);
if (status == NetioStatus::OK) {
m_subscriptions.emplace(tag);
m_connections[remoteEp].insert(tag);
}
else {
ERS_DEBUG(6, "Failed to send subscription");
}
}
return status;
}
NetioStatus NetioSubscriber::unsubscribe(uint64_t tag,
const EndPointAddress& remoteEp) {
ERS_DEBUG (5, std::format("Entered: tag {:x}, ep", tag) << remoteEp);
NetioStatus status = NetioStatus::OK;
{
ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());
if (not m_subscriptions.contains(tag)) {
ers::error (NotSubscribed(ERS_HERE, tag));
return NetioStatus::FAILED;
}
const auto req = SubscriptionRequest{
.tag = tag,
.addr = m_local_ip,
.data_port = m_listen_ports.at(m_tag_to_conn_params.at(tag)),
.ack_port = m_acknowledge_port.at(remoteEp),
.subscribe = SubscriptionRequest::Sub::UNSUBSCRIBE,
};
status = send_subscription(req, remoteEp);
if (status == NetioStatus::OK) {
m_subscriptions.erase(tag);
m_connections.at(remoteEp).erase(tag);
if (m_connections.at(remoteEp).empty()) {
ERS_DEBUG (5, "Removed last subscription to " << remoteEp << ", scheduling closing of connection");
m_closing_connections.emplace(remoteEp);
m_closing_tags.insert({tag, remoteEp});
m_connections.erase(remoteEp);
}
// Close listen connection if nothing is subscribed to this receiver anymore
--m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag));
if (m_num_connections_per_receiver.at(m_tag_to_conn_params.at(tag)) == 0) {
m_dataReceiver->close({m_local_ip, m_listen_ports.at(m_tag_to_conn_params.at(tag))});
m_listen_ports.erase(m_tag_to_conn_params.at(tag));
m_num_connections_per_receiver.erase(m_tag_to_conn_params.at(tag));
}
m_tag_to_conn_params.erase(tag);
}
else { // What do we do if the send failed??
ers::error (FailedSubscribe(ERS_HERE, "unsubscription", tag));
}
}
ERS_DEBUG(5, "Finished");
return status;
}
void NetioSubscriber::set_on_subscription_cb(const CbSubscriptionConfirmed& cb) {
m_on_subscription = cb;
}
void NetioSubscriber::set_on_unsubscription_cb(const CbUnsubscriptionConfirmed& cb) {
m_on_unsubscription = cb;
}
void NetioSubscriber::set_on_subscription_lost_cb(const CbSubscriptionLost& cb) {
m_on_subscription_lost_cb = cb;
}
void NetioSubscriber::set_on_data_cb(const CbMessageReceived& cb) {
// Forward this on to our NetioReceiver
m_dataReceiver->set_on_data_cb(cb);
}
void NetioSubscriber::set_on_buffer_cb(const CbBufferReceived& cb) {
// Forward this on to our NetioReceiver
m_dataReceiver->set_on_buffer_cb(cb);
}