.. _program_listing_file_NetioSubscriber.cpp:

Program Listing for File NetioSubscriber.cpp
============================================

|exhale_lsh| :ref:`Return to documentation for file ` (``NetioSubscriber.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // NOTE(review): this listing has lost every angle-bracket payload (the header
   // names of the 13 bare #include directives below and all template arguments,
   // e.g. on std::shared_ptr, std::span, std::vector, std::make_unique and
   // reinterpret_cast). They are reproduced as found and must be restored from
   // the real source file; they are deliberately not guessed here.
   #include "netio3/NetioSubscriber.hpp"

   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include
   #include

   #include "netio3/Utility.hpp"
   #include "netio3/SubscriptionRequest.hpp"
   #include "netio3/NetioReceiver.hpp"
   #include "netio3/NetioIssues.hpp"

   using namespace netio3;
   using namespace std::chrono_literals;

   /// Create the control-channel backend and the data receiver.
   ///
   /// @param config   Provides the thread-safety model used by both components and
   ///                 the backend type/mode used by the data receiver.
   /// @param evloop   Event loop handed to both the control backend and the receiver.
   /// @param local_ip Local address. "0.0.0.0" is forwarded to the control backend
   ///                 as an empty optional; it is still used verbatim for listening.
   NetioSubscriber::NetioSubscriber(const NetioSubscriberConfig& config,
                                    std::shared_ptr evloop,
                                    const std::string& local_ip)
     : m_local_ip(local_ip),
       m_thread_safe{config.thread_safety == ThreadSafetyModel::SAFE}
   {
     ERS_DEBUG(5, "Entered");

     // The callbacks are stored by the backend, so they capture `this` explicitly
     // instead of relying on a blanket by-reference capture.
     const NetworkConfig backend_config{
       .mode=NetworkMode::TCP,
       .thread_safety = config.thread_safety,
       .conn_params{},
       .callbacks{
         .on_data_cb = [this](std::span data) {
           on_acknowledgement(data);},
         .on_connection_established_cb = [this](const EndPointAddress& remote_ep,
                                                const EndPointAddress& local_ep,
                                                EndpointCapabilities /*cap*/) {
           control_connection_established(remote_ep, local_ep);},
         .on_connection_closed_cb = [this](const EndPointAddress& ep,
                                           const std::vector& /*keys*/) {
           connection_lost(ep);},
         .on_connection_refused_cb = [this](const EndPointAddress& ep) {
           connection_refused(ep);},
         .on_send_completed_cb = nullptr
       },
       .local_ip = m_local_ip == "0.0.0.0" ? std::nullopt : std::optional(m_local_ip),
     };

     // Subscription requests and acknowledgements always travel over ASYNCMSG/TCP.
     m_subscriber_backend = NetworkBackend::create(
       NetworkType::ASYNCMSG, backend_config, evloop);

     const NetioReceiverConfig rec_config {
       .backend_type = config.backend_type,
       .backend_mode = config.backend_mode,
       .thread_safety = config.thread_safety,
     };
     m_dataReceiver = std::make_unique(rec_config, evloop);
     m_dataReceiver->set_on_connection_closed_cb(
       [](const EndPointAddress&) {
         ERS_DEBUG(5, "*** Receiver connection closed!!!! ***");
       });

     ERS_DEBUG(5, "Finished");
   }

   /// Release the control backend first, then the data receiver.
   NetioSubscriber::~NetioSubscriber()
   {
     ERS_DEBUG(1, "Entered " << m_subscriptions.size() << " subscriptions still in list");
     m_subscriber_backend.reset();
     m_dataReceiver.reset();
     ERS_DEBUG(5, "Finished");
   }

   /// Record the local port of a freshly established control connection.
   ///
   /// subscribe() and unsubscribe() put this port into the request as `ack_port`;
   /// until it is known, subscribe() answers NO_RESOURCES.
   void NetioSubscriber::control_connection_established(const EndPointAddress& remote_ep,
                                                        const EndPointAddress& local_ep)
   {
     ERS_DEBUG(5, "Entered with remote_ep " << remote_ep << ", local_ep " << local_ep);
     {
       const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
       m_acknowledge_port.insert({remote_ep, local_ep.port()});
     }
   }

   /// Handle the loss (or completed close) of the control connection to `ep`.
   ///
   /// Every tag subscribed through `ep` is dropped, the listener of a receiver
   /// that no longer serves any tag is closed, and the subscription-lost callback
   /// is invoked with the dropped tags while the mutex is still held.
   void NetioSubscriber::connection_lost(const EndPointAddress& ep)
   {
     ERS_DEBUG(5, "connection to " << ep << " closed");
     {
       ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
       const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
       ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());

       if (const auto conn_it = m_connections.find(ep); conn_it != m_connections.end()) {
         for (const auto tag : conn_it->second) {
           m_subscriptions.erase(tag);

           const auto& params = m_tag_to_conn_params.at(tag);
           auto& receiver_users = m_num_connections_per_receiver.at(params);
           --receiver_users;
           // Close listen connection if nothing is subscribed to this receiver anymore
           if (receiver_users == 0) {
             m_dataReceiver->close({m_local_ip, m_listen_ports.at(params)});
             m_listen_ports.erase(params);
             m_num_connections_per_receiver.erase(params);
           }
           // Last use of `params`: it refers to the entry erased here.
           m_tag_to_conn_params.erase(tag);
         }

         // Copy the tags out before the map entry disappears.
         auto lost_tags = conn_it->second;
         m_connections.erase(conn_it);
         m_on_subscription_lost_cb(lost_tags);
       }

       // NOTE(review): m_closing_tags entries that refer to `ep` are not removed
       // here; confirm whether a stale entry can outlive the connection.
       m_closing_connections.erase(ep);
       m_acknowledge_port.erase(ep);
     }
     ERS_DEBUG(5, "Finished");
   }

   /// Handle a refused control connection to `ep`.
   ///
   /// An endpoint without subscribed tags is simply forgotten; a refusal for an
   /// endpoint that still has tags is reported as a fatal ConnectionNotIdle issue.
   void NetioSubscriber::connection_refused(const EndPointAddress& ep)
   {
     ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
     const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
     ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());

     if (const auto conn_it = m_connections.find(ep); conn_it != m_connections.end()) {
       if (conn_it->second.empty()) {
         m_connections.erase(conn_it);
       }
       else {
         ers::fatal(ConnectionNotIdle(ERS_HERE, ep.address(), ep.port()));
       }
     }
     m_closing_connections.erase(ep);
   }

   /// Decode a subscription acknowledgement and notify the user.
   ///
   /// For an unsubscription whose tag was the last one on its endpoint (recorded
   /// in m_closing_tags by unsubscribe()), the control connection is closed here.
   void NetioSubscriber::on_acknowledgement(std::span data)
   {
     const auto ack = SubscriptionAcknowledgement::from_json(std::string(data.begin(), data.end()));
     ERS_DEBUG(5, (ack.subscribe==SubscriptionRequest::Sub::SUBSCRIBE ? "S" : "Uns")
               <<"ubscription acknowledged: tag=" << std::hex << ack.tag);

     if (ack.subscribe == SubscriptionRequest::Sub::SUBSCRIBE) {
       m_on_subscription (ack.tag);
     }
     else if (ack.subscribe == SubscriptionRequest::Sub::UNSUBSCRIBE) {
       m_on_unsubscription (ack.tag);

       ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
       const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
       if (const auto closing_it = m_closing_tags.find(ack.tag); closing_it != m_closing_tags.end()) {
         // Copy: the map entry is erased below.
         const auto remote_ep = closing_it->second;
         ERS_DEBUG (5, "Closing connection to " << remote_ep);
         if (m_subscriber_backend) {
           m_subscriber_backend->close_active_endpoint(remote_ep);
           m_closing_tags.erase(ack.tag);
         }
       }
     }
   }

   /// Serialise `req` to JSON and send it to `remoteEp` over the control backend.
   ///
   /// @return the status reported by the backend's send_data_copy().
   NetioStatus NetioSubscriber::send_subscription(const SubscriptionRequest& req,
                                                  const EndPointAddress& remoteEp)
   {
     ERS_DEBUG(5, "Sending "
               << (req.subscribe == SubscriptionRequest::Sub::SUBSCRIBE ? "" : "un")
               << "subscription request");
     // Note this is only allowed for ASYNCMSG as it copies the data. Libfabric would have a dangling pointer
     auto req_string = SubscriptionRequest::to_json(req);
     ERS_DEBUG(4, "SubscriptionRequest=" << req_string);
     auto req_span = std::span(
       reinterpret_cast(req_string.data()), req_string.size());
     return m_subscriber_backend->send_data_copy(remoteEp, req_span, std::span{}, 0);
   }

   /// Subscribe to `tag` at `remoteEp`, receiving data with `conn_params`.
   ///
   /// @return FAILED if the tag is already subscribed or the control endpoint
   ///         could not be opened; NO_RESOURCES if the connection to `remoteEp`
   ///         is currently closing or its control connection is not established
   ///         yet (the caller is expected to retry); otherwise the status of
   ///         sending the request.
   NetioStatus NetioSubscriber::subscribe(uint64_t tag,
                                          const EndPointAddress& remoteEp,
                                          const ConnectionParametersRecv& conn_params)
   {
     ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
     const utility::ConditionalUniqueLock lock(m_mutex, m_thread_safe);

     if (m_subscriptions.contains(tag)) {
       ers::error (AlreadySubscribed(ERS_HERE, tag));
       return NetioStatus::FAILED;
     }

     // Currently closing, retry after closed
     if (m_closing_connections.contains(remoteEp)) {
       return NetioStatus::NO_RESOURCES;
     }

     if (!m_connections.contains(remoteEp)) {
       ERS_DEBUG(5, "opening active endpoint for " << remoteEp);
       try {
         m_subscriber_backend->open_active_endpoint(remoteEp, {{},{},{}});
       }
       catch (FailedOpenActiveEndpoint& issue) {
         ers::warning (issue);
         return NetioStatus::FAILED;
       }
     }
     m_connections.insert({remoteEp, {}});

     if (not m_listen_ports.contains(conn_params)) {
       m_listen_ports.try_emplace(conn_params,
                                  m_dataReceiver->listen({m_local_ip, 0}, conn_params));
       m_num_connections_per_receiver.try_emplace(conn_params, 0);
     }

     // The control connection is not up yet: nothing has been sent, so nothing
     // is accounted for. The caller retries.
     if (not m_acknowledge_port.contains(remoteEp)) {
       return NetioStatus::NO_RESOURCES;
     }

     const auto req = SubscriptionRequest{
       .tag = tag,
       .addr = m_local_ip,
       .data_port = m_listen_ports.at(conn_params),
       .ack_port = m_acknowledge_port.at(remoteEp),
       .subscribe = SubscriptionRequest::Sub::SUBSCRIBE,
     };
     const NetioStatus status = send_subscription(req, remoteEp);
     if (status != NetioStatus::OK) {
       ERS_DEBUG(6, "Failed to send subscription");
       return status;
     }

     // Book-keeping happens only once the request is really out. Doing it before
     // the send counted the same tag again on every retry, so the per-receiver
     // count never returned to zero and the listener was never closed.
     m_subscriptions.emplace(tag);
     m_connections[remoteEp].insert(tag);
     ++m_num_connections_per_receiver.at(conn_params);
     m_tag_to_conn_params[tag] = conn_params;
     return status;
   }

   /// Unsubscribe `tag` from `remoteEp`.
   ///
   /// When the tag was the last one on `remoteEp`, the control connection is
   /// scheduled for closing (performed in on_acknowledgement()). When it was the
   /// last tag served by its receiver, that listener is closed.
   ///
   /// @return FAILED if the tag is not subscribed or is not subscribed through
   ///         `remoteEp`; otherwise the status of sending the request.
   NetioStatus NetioSubscriber::unsubscribe(uint64_t tag, const EndPointAddress& remoteEp)
   {
     ERS_DEBUG (5, std::format("Entered: tag {:x}, ep", tag) << remoteEp);
     NetioStatus status = NetioStatus::OK;
     {
       ERS_DEBUG(5, "Locking mutex " << m_mutex.native_handle());
       const utility::ConditionalLockGuard lock(m_mutex, m_thread_safe);
       ERS_DEBUG(5, "Locked mutex " << m_mutex.native_handle());

       if (not m_subscriptions.contains(tag)) {
         ers::error (NotSubscribed(ERS_HERE, tag));
         return NetioStatus::FAILED;
       }

       // Validate the endpoint before touching any state: a mismatching remoteEp
       // used to surface as std::out_of_range, possibly after the tag had already
       // been removed from m_subscriptions.
       const auto conn_it = m_connections.find(remoteEp);
       const auto ack_it = m_acknowledge_port.find(remoteEp);
       if (conn_it == m_connections.end() || not conn_it->second.contains(tag)
           || ack_it == m_acknowledge_port.end()) {
         ers::error (NotSubscribed(ERS_HERE, tag));
         return NetioStatus::FAILED;
       }

       const auto& params = m_tag_to_conn_params.at(tag);
       const auto req = SubscriptionRequest{
         .tag = tag,
         .addr = m_local_ip,
         .data_port = m_listen_ports.at(params),
         .ack_port = ack_it->second,
         .subscribe = SubscriptionRequest::Sub::UNSUBSCRIBE,
       };
       status = send_subscription(req, remoteEp);

       if (status == NetioStatus::OK) {
         m_subscriptions.erase(tag);
         conn_it->second.erase(tag);
         if (conn_it->second.empty()) {
           ERS_DEBUG (5, "Removed last subscription to " << remoteEp
                      << ", scheduling closing of connection");
           m_closing_connections.emplace(remoteEp);
           m_closing_tags.insert({tag, remoteEp});
           m_connections.erase(conn_it);
         }

         // Close listen connection if nothing is subscribed to this receiver anymore
         auto& receiver_users = m_num_connections_per_receiver.at(params);
         --receiver_users;
         if (receiver_users == 0) {
           m_dataReceiver->close({m_local_ip, m_listen_ports.at(params)});
           m_listen_ports.erase(params);
           m_num_connections_per_receiver.erase(params);
         }
         // Last use of `params`: it refers to the entry erased here.
         m_tag_to_conn_params.erase(tag);
       }
       else {
         // What do we do if the send failed??
         ers::error (FailedSubscribe(ERS_HERE, "unsubscription", tag));
       }
     }
     ERS_DEBUG(5, "Finished");
     return status;
   }

   /// Set the callback invoked from on_acknowledgement() for a SUBSCRIBE ack.
   void NetioSubscriber::set_on_subscription_cb(const CbSubscriptionConfirmed& cb)
   {
     m_on_subscription = cb;
   }

   /// Set the callback invoked from on_acknowledgement() for an UNSUBSCRIBE ack.
   void NetioSubscriber::set_on_unsubscription_cb(const CbUnsubscriptionConfirmed& cb)
   {
     m_on_unsubscription = cb;
   }

   /// Set the callback invoked from connection_lost() with the tags that were dropped.
   void NetioSubscriber::set_on_subscription_lost_cb(const CbSubscriptionLost& cb)
   {
     m_on_subscription_lost_cb = cb;
   }

   /// Set the message callback.
   void NetioSubscriber::set_on_data_cb(const CbMessageReceived& cb)
   {
     // Forward this on to our NetioReceiver
     m_dataReceiver->set_on_data_cb(cb);
   }

   /// Set the buffer callback.
   void NetioSubscriber::set_on_buffer_cb(const CbBufferReceived& cb)
   {
     // Forward this on to our NetioReceiver
     m_dataReceiver->set_on_buffer_cb(cb);
   }