.. _program_listing_file_BackendAsyncmsg_BackendAsyncmsg.cpp: Program Listing for File BackendAsyncmsg.cpp ============================================ |exhale_lsh| :ref:`Return to documentation for file ` (``BackendAsyncmsg/BackendAsyncmsg.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include "BackendAsyncmsg.hpp" #include #include "Issues.hpp" #include "SendMessageBuffered.hpp" #include "SendMessageUnbuffered.hpp" #include "SendMessageUnbufferedCopy.hpp" #include "netio3-backend/EventLoop/AsioEventLoop.hpp" #include "netio3-backend/Issues.hpp" netio3::asyncmsg::BackendAsyncmsg::BackendAsyncmsg(const NetworkConfig& config, std::shared_ptr evloop) : NetworkBackend(config, std::move(evloop)), m_ioService{initIoService()}, m_eventSignal{m_evloop->create_signal([this](int /*fd*/) { handleQueuedEvent(); }, true)}, m_useAsioEventLoop{dynamic_cast(m_evloop.get()) != nullptr} { start(); } netio3::asyncmsg::BackendAsyncmsg::~BackendAsyncmsg() { ERS_DEBUG(1, "Stopping TCP backend"); for (const auto& [endpoint, server] : m_serversReceive) { server->shutdown(); } stop(); } void netio3::asyncmsg::BackendAsyncmsg::open_send_endpoint( const EndPointAddress& address, const ConnectionParameters& connection_params) { ZoneScopedC(0x70dbdb); std::lock_guard lock(m_mutex); const auto endpoint = getEndpoint(address.address(), address.port()); { auto ac = SenderMap::const_accessor{}; if (m_sessionsSend.find(ac, endpoint)) { throw SendEndpointAlreadyExists(ERS_HERE, address.address(), address.port()); } } const auto session = std::make_shared( m_ioService.get(), m_config, m_evloop.get(), [this](const std::shared_ptr& session_cb) { closeSession(session_cb); }, [this](const ConnectionEvent& event) { addEvent(event); }, m_useAsioEventLoop, m_mode, connection_params); m_sessionsSend.insert({endpoint, session}); session->asyncOpen(std::string{MYSELF}, endpoint); ERS_DEBUG( 1, std::format("Requested to open send connection on {}:{}", address.address(), address.port())); } netio3::EndPointAddress netio3::asyncmsg::BackendAsyncmsg::open_listen_endpoint( const EndPointAddress& address, const ConnectionParametersRecv& /*connection_params*/) { ZoneScopedC(0x70dbdb); std::lock_guard lock(m_mutex); const auto endpoint = getEndpoint(address.address(), address.port()); if (m_serversReceive.contains(endpoint)) { throw ListenEndpointAlreadyExists(ERS_HERE, address.address(), address.port()); } const auto server = std::make_shared( m_ioService.get(), m_config, [this](const EndPointAddress& address_cb) { onServerShutdown(address_cb); }, [this](const ConnectionEvent& event) { addEvent(event); }, m_evloop.get(), m_useAsioEventLoop, m_mode); try { server->listen(std::string{MYSELF}, endpoint); } catch (const boost::system::system_error& e) { throw FailedOpenListenEndpoint(ERS_HERE, address.address(), address.port(), e.what(), e); } server->startAccept(); m_serversReceive.emplace(server->localEndpoint(), server); ERS_INFO(std::format("Listening on {}:{}", server->localEndpoint().address().to_string(), server->localEndpoint().port())); return {server->localEndpoint().address().to_string(), server->localEndpoint().port()}; } void netio3::asyncmsg::BackendAsyncmsg::close_listen_endpoint(const EndPointAddress& address) { ZoneScopedC(0x29a3a3); std::lock_guard lock(m_mutex); const auto endpoint = getEndpoint(address.address(), address.port()); if (not m_serversReceive.contains(endpoint)) { throw UnknownListenEndpoint(ERS_HERE, address.address(), address.port()); } m_serversReceive.at(endpoint)->shutdown(); ERS_INFO(std::format("Stop listening on {}:{}", address.address(), address.port())); } void netio3::asyncmsg::BackendAsyncmsg::close_send_endpoint(const EndPointAddress& address) { ZoneScopedC(0x29a3a3); std::lock_guard lock(m_mutex); const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } ac->second->asyncClose(); ac.release(); ERS_DEBUG( 1, std::format("Requested to close send connection on {}:{}", address.address(), address.port())); } boost::asio::io_service& netio3::asyncmsg::BackendAsyncmsg::initIoService() { auto* evloop = dynamic_cast(m_evloop.get()); if (evloop != nullptr) { return evloop->get_io_service(); } return m_ioServiceResource; } void netio3::asyncmsg::BackendAsyncmsg::onServerShutdown(const EndPointAddress& address) { std::lock_guard lock(m_mutex); m_serversReceive.erase(getEndpoint(address.address(), address.port())); } void netio3::asyncmsg::BackendAsyncmsg::addEvent(const ConnectionEvent& event) { m_eventQueue.push(event); m_eventSignal.fire(); } void netio3::asyncmsg::BackendAsyncmsg::handleQueuedEvent() { ConnectionEvent event{}; const auto dequeued = m_eventQueue.try_pop(event); if (not dequeued) { ers::error(TcpFailedDequeueMessage(ERS_HERE)); return; } switch (event.type) { case ConnectionEvent::Type::OPENED: if (m_config.on_connection_established_cb) { m_config.on_connection_established_cb(event.address); } break; case ConnectionEvent::Type::REFUSED: { if (m_config.on_connection_refused_cb) { m_config.on_connection_refused_cb(event.address); } const auto endpoint = getEndpoint(event.address.address(), event.address.port()); auto ac = SenderMap::const_accessor{}; if (m_sessionsSend.find(ac, endpoint)) { const auto session = ac->second; ac.release(); closeSession(session); } break; } case ConnectionEvent::Type::CLOSED: { const auto endpoint = getEndpoint(event.address.address(), event.address.port()); auto ac = SenderMap::const_accessor{}; if (m_sessionsSend.find(ac, endpoint)) { const auto session = ac->second; ac.release(); if (m_config.on_connection_closed_cb) { m_config.on_connection_closed_cb(event.address, session->getPendingSends()); } closeSession(session); } else if (m_config.on_connection_closed_cb) { // receive sessions m_config.on_connection_closed_cb(event.address, {}); } break; } } } void netio3::asyncmsg::BackendAsyncmsg::start() { if (m_useAsioEventLoop) { return; } switch (m_mode) { case Mode::POLL: [[fallthrough]]; case Mode::POLL_ONE: m_timer.start(TIMER_INTERVAL); break; case Mode::STANDALONE: [[fallthrough]]; case Mode::DELEGATE: m_work = std::make_unique(m_ioService); m_ioServiceThread = std::jthread([this]() { m_ioService.get().run(); }); break; } } void netio3::asyncmsg::BackendAsyncmsg::stop() { if (m_useAsioEventLoop) { return; } switch (m_mode) { case Mode::POLL: [[fallthrough]]; case Mode::POLL_ONE: m_timer.stop(); break; case Mode::STANDALONE: [[fallthrough]]; case Mode::DELEGATE: m_work.reset(); m_ioService.get().stop(); m_ioServiceThread.join(); break; } } void netio3::asyncmsg::BackendAsyncmsg::closeSession( std::shared_ptr session) { std::lock_guard lock(m_mutex); m_sessionsSend.erase(session->cachedRemoteEndpoint()); } void netio3::asyncmsg::BackendAsyncmsg::poll() { ERS_DEBUG(2, "Polling"); switch (m_mode) { case Mode::POLL: if (m_ioService.get().stopped()) { m_ioService.get().restart(); } m_ioService.get().poll(); break; case Mode::POLL_ONE: for (auto i = 0; i < MAX_POLLS; ++i) { const auto numPolled = m_ioService.get().poll_one(); if (numPolled == 0) { break; } } break; default: ers::warning(TcpLogicError(ERS_HERE, "Polling is not supported in this mode")); break; } } boost::asio::ip::tcp::endpoint netio3::asyncmsg::BackendAsyncmsg::getEndpoint( std::string_view address, unsigned short port) { boost::system::error_code ec; const auto ip = boost::asio::ip::make_address(address, ec); if (ec) { throw InvalidEndpointAddress(ERS_HERE, std::string{address}, port); } return boost::asio::ip::tcp::endpoint{ip, port}; } netio3::NetioStatus netio3::asyncmsg::BackendAsyncmsg::send_data( const EndPointAddress& address, const std::span data, const std::span header_data, const std::uint64_t key) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } auto message = std::make_unique(data, header_data, key); ac->second->asyncSend(std::move(message)); return NetioStatus::OK; } netio3::NetioStatus netio3::asyncmsg::BackendAsyncmsg::send_data( const EndPointAddress& address, const std::span iov, const std::span header_data, const std::uint64_t key) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } auto message = std::make_unique(iov, header_data, key); ac->second->asyncSend(std::move(message)); return NetioStatus::OK; } netio3::NetioStatus netio3::asyncmsg::BackendAsyncmsg::send_data_copy( const EndPointAddress& address, const std::span data, const std::span header_data, const std::uint64_t key) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } auto message = std::make_unique(data, header_data, key); ac->second->asyncSend(std::move(message)); return NetioStatus::OK; } netio3::NetioStatus netio3::asyncmsg::BackendAsyncmsg::send_data_copy( const EndPointAddress& address, const std::span iov, const std::span header_data, const std::uint64_t key) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } auto message = std::make_unique(iov, header_data, key); ac->second->asyncSend(std::move(message)); return NetioStatus::OK; } netio3::NetworkBuffer* netio3::asyncmsg::BackendAsyncmsg::get_buffer(const EndPointAddress& address) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } return ac->second->getBuffer(); } netio3::NetioStatus netio3::asyncmsg::BackendAsyncmsg::send_buffer(const EndPointAddress& address, NetworkBuffer* buffer) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } const auto* actualBuffer = dynamic_cast(buffer); if (actualBuffer == nullptr) { ers::error(InvalidBuffer(ERS_HERE, "Buffer")); return NetioStatus::FAILED; } ac->second->asyncSend(std::make_unique(actualBuffer)); return NetioStatus::OK; } std::size_t netio3::asyncmsg::BackendAsyncmsg::get_num_available_buffers( const EndPointAddress& address) { const auto endpoint = getEndpoint(address.address(), address.port()); auto ac = SenderMap::const_accessor{}; if (not m_sessionsSend.find(ac, endpoint)) { throw UnknownSendEndpoint(ERS_HERE, address.address(), address.port()); } return ac->second->getNumAvailableBuffers(); }