.. _program_listing_file_BackendAsyncmsg_Session.cpp:

Program Listing for File Session.cpp
====================================

|exhale_lsh| :ref:`Return to documentation for file <file_BackendAsyncmsg_Session.cpp>` (``BackendAsyncmsg/Session.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // NOTE(review): the extracted listing had lost everything between angle
   // brackets (system include targets and template arguments). They have been
   // reconstructed from how the values are used in this file; confirm against
   // Session.hpp and the felix asyncmsg headers:
   //   - connectionClosedCb : std::function<void(std::shared_ptr<felix::asyncmsg::Session>)>
   //                          (it is invoked with shared_from_this())
   //   - addEventCb         : std::function<void(const ConnectionEvent&)>
   //                          (it is invoked with {eventType, endpoint})
   //   - sendData parameter : std::unique_ptr<SendMessage> (key() is called on it)
   //   - receive callbacks  : std::unique_ptr<felix::asyncmsg::InputMessage>
   //   - send callbacks     : std::unique_ptr<const felix::asyncmsg::OutputMessage>
   //   - getPendingSends    : std::vector<std::uint64_t> (key type is an assumption)
   //   - dynamic_cast targets: ReceiveMessage, SendMessage, SendMessageBuffered
   #include "Session.hpp"

   // NOTE(review): the three system include targets were lost; these are inferred
   // from the use of std::format, the Tracy Zone* macros and the ERS macros.
   #include <format>
   #include <tracy/Tracy.hpp>
   #include <ers/ers.h>

   #include "Issues.hpp"
   #include "SendMessage.hpp"
   #include "SendMessageBuffered.hpp"
   #include "netio3-backend/Issues.hpp"

   // Builds a session on top of felix::asyncmsg::Session.
   // A signal is created on the supplied event loop; each time it fires, one
   // message is popped from m_messageQueue and passed to handleOnReceive().
   // A failed pop is reported through ERS and otherwise ignored.
   netio3::asyncmsg::Session::Session(
     boost::asio::io_service& ioService,
     NetworkConfig config,
     BaseEventLoop* eventloop,
     std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> connectionClosedCb,
     std::function<void(const ConnectionEvent&)> addEventCb,
     const bool useAsioEventLoop,
     const Mode mode,
     const ConnectionParameters& connectionParams) :
     felix::asyncmsg::Session(ioService),
     m_config{std::move(config)},
     m_bufferManager{connectionParams},
     m_messageSignal{eventloop->create_signal(
       [this](int /*fd*/) {
         ReceiveMessage::Message message{};
         const auto dequeued = m_messageQueue.try_pop(message);
         if (not dequeued) {
           ers::error(TcpFailedDequeueMessage(ERS_HERE));
           return;
         }
         handleOnReceive(message);
       },
       true)},
     m_connectionClosedCb{std::move(connectionClosedCb)},
     m_addEventCb{std::move(addEventCb)},
     m_useAsioEventLoop{useAsioEventLoop},
     m_mode{mode}
   {}

   // Queues a message for sending and records its key as pending.
   void netio3::asyncmsg::Session::sendData(std::unique_ptr<SendMessage> message)
   {
     ZoneScoped;
     // The key must be read before ownership is handed to asyncSend(): after
     // std::move the unique_ptr is empty and dereferencing it is undefined.
     const auto key = message->key();
     {
       // Register the key before the send is started so that handleOnSend(),
       // which erases the key, can never run ahead of the registration.
       std::lock_guard lock(m_mutex_pending_sends);
       m_pendingSends.push_back(key);
     }
     asyncSend(std::move(message));
   }

   // Connection opened: log it, start receiving and notify the user.
   void netio3::asyncmsg::Session::onOpen()
   {
     ERS_INFO(std::format(
       "Opened connection {}:{}", remoteEndpoint().address().to_string(), remoteEndpoint().port()));
     asyncReceive();
     trigger_on_connection_established_cb();
   }

   // Opening failed: report the error and raise the "refused" event.
   void netio3::asyncmsg::Session::onOpenError(const boost::system::error_code& error)
   {
     ers::error(FailedOpenSendEndpoint(ERS_HERE,
                                       cachedRemoteEndpoint().address().to_string(),
                                       cachedRemoteEndpoint().port(),
                                       error.message()));
     trigger_on_connection_refused_cb();
   }

   // Connection closed: log it and raise the "closed" event.
   void netio3::asyncmsg::Session::onClose()
   {
     ERS_INFO(std::format("Closed connection {}:{}",
                          cachedRemoteEndpoint().address().to_string(),
                          cachedRemoteEndpoint().port()));
     trigger_on_connection_closed_cb();
   }

   // Closing failed. SESSION_NOT_OPEN is only logged at debug level. Any other
   // error is reported as a warning and the close is retried until
   // MAX_CLOSE_ATTEMPTS has been reached, after which an error is reported.
   void netio3::asyncmsg::Session::onCloseError(const boost::system::error_code& error)
   {
     if (error == felix::asyncmsg::Error::SESSION_NOT_OPEN) {
       ERS_DEBUG(2,
                 FailedCloseEndpoint(ERS_HERE,
                                     cachedRemoteEndpoint().address().to_string(),
                                     cachedRemoteEndpoint().port(),
                                     error.message()));
       return;
     }
     ers::warning(FailedCloseEndpoint(ERS_HERE,
                                      cachedRemoteEndpoint().address().to_string(),
                                      cachedRemoteEndpoint().port(),
                                      error.message()));
     if (m_numCloseAttempts < MAX_CLOSE_ATTEMPTS) {
       ++m_numCloseAttempts;
       asyncClose();
     } else {
       ers::error(
         FailedCloseEndpoint(ERS_HERE,
                             cachedRemoteEndpoint().address().to_string(),
                             cachedRemoteEndpoint().port(),
                             std::format("Failed max number of attempts ({})", m_numCloseAttempts)));
     }
   }

   // Creates the ReceiveMessage object that will hold an incoming message with
   // the given type id, transaction id and size.
   std::unique_ptr<felix::asyncmsg::InputMessage> netio3::asyncmsg::Session::createMessage(
     const std::uint32_t typeId,
     const std::uint32_t transactionId,
     const std::uint32_t size)
   {
     ERS_DEBUG(
       2,
       std::format(
         "Creating message for type {}, transaction {}, size {}", typeId, transactionId, size));
     return std::make_unique<ReceiveMessage>(ReceiveMessage::ReceiveMessageConfig{
       .typeId = typeId, .transactionId = transactionId, .size = size});
   }

   // A message was received. The next receive is started first; the message is
   // then either queued and signalled (Mode::DELEGATE) or handled directly.
   void netio3::asyncmsg::Session::onReceive(std::unique_ptr<felix::asyncmsg::InputMessage> message)
   {
     ZoneScopedC(0x33cccc);
     ERS_DEBUG(2,
               std::format("Received message from {}:{}",
                           remoteEndpoint().address().to_string(),
                           remoteEndpoint().port()));
     asyncReceive();
     auto* receiveMessage = dynamic_cast<ReceiveMessage*>(message.get());
     if (receiveMessage == nullptr) {
       ers::error(TcpReceiveMessageWrongType(ERS_HERE));
       return;
     }
     if (m_mode == Mode::DELEGATE) {
       ZoneScoped;
       m_messageQueue.push(std::move(receiveMessage->getMessage()));
       m_messageSignal.fire();
     } else {
       handleOnReceive(receiveMessage->getMessage());
     }
   }

   // Receiving failed. Errors while the session is not OPEN are logged and
   // ignored. End-of-file closes the session. Any other error is reported and
   // receiving is restarted.
   void netio3::asyncmsg::Session::onReceiveError(
     const boost::system::error_code& error,
     std::unique_ptr<felix::asyncmsg::InputMessage> /*message*/)
   {
     if (state() != State::OPEN) {
       ERS_LOG(std::format("Error receiving message from {}:{}: {} while not in state OPEN. Ignoring.",
                           cachedRemoteEndpoint().address().to_string(),
                           cachedRemoteEndpoint().port(),
                           error.message()));
       return;
     }
     // Compare the error_code itself rather than its raw value so that the error
     // category is taken into account as well.
     if (error == boost::asio::error::eof) {
       ERS_LOG(std::format("Connection closed by {}:{}",
                           cachedRemoteEndpoint().address().to_string(),
                           cachedRemoteEndpoint().port()));
       asyncClose();
       return;
     }
     ers::error(FailedReceive(ERS_HERE,
                              cachedRemoteEndpoint().address().to_string(),
                              cachedRemoteEndpoint().port(),
                              error.message()));
     asyncReceive();
   }

   // A message was sent: log it and run the common send-completion handling.
   void netio3::asyncmsg::Session::onSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
   {
     ZoneScopedC(0x29a3a3);
     ERS_DEBUG(2,
               std::format("Sent message to {}:{}",
                           cachedRemoteEndpoint().address().to_string(),
                           cachedRemoteEndpoint().port()));
     handleOnSend(std::move(message));
   }

   // Sending failed. The error is only reported while the session is OPEN or
   // OPEN_PENDING; the send-completion handling runs in every case.
   void netio3::asyncmsg::Session::onSendError(
     const boost::system::error_code& error,
     std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
   {
     if (state() == State::OPEN or state() == State::OPEN_PENDING) {
       ers::error(FailedSend(
         ERS_HERE, remoteEndpoint().address().to_string(), remoteEndpoint().port(), error.message()));
     }
     handleOnSend(std::move(message));
   }

   // Returns a buffer from the buffer manager. A TcpNoBuffers exception is
   // translated into NoBuffersAllocated carrying the remote address and port.
   netio3::NetworkBuffer* netio3::asyncmsg::Session::getBuffer()
   {
     try {
       return m_bufferManager.getBuffer();
     } catch (const TcpNoBuffers&) {
       throw NoBuffersAllocated(
         ERS_HERE, remoteEndpoint().address().to_string(), remoteEndpoint().port());
     }
   }

   // Forwards to the buffer manager.
   std::size_t netio3::asyncmsg::Session::getNumAvailableBuffers()
   {
     return m_bufferManager.getNumAvailableBuffers();
   }

   // Returns a copy of the pending send keys, taken under the mutex.
   std::vector<std::uint64_t> netio3::asyncmsg::Session::getPendingSends() const
   {
     std::lock_guard lock(m_mutex_pending_sends);
     return m_pendingSends;
   }

   // Passes the payload of a received message to the user's on_data_cb, if set.
   void netio3::asyncmsg::Session::handleOnReceive(const ReceiveMessage::Message& receiveMessage) const
   {
     ZoneScopedC(0x1f7a7a);
     ERS_DEBUG(2, "Received message");
     if (m_config.on_data_cb) {
       m_config.on_data_cb(receiveMessage.data);
     }
   }

   // Common completion path for successful and failed sends: returns the buffer
   // of buffered messages, calls on_send_completed_cb (if set) and removes the
   // key from the pending sends. Messages that are not a SendMessage are
   // reported through ERS and skipped.
   void netio3::asyncmsg::Session::handleOnSend(
     std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
   {
     ZoneScopedC(0x1f7a7a);
     const auto* sendMessage = dynamic_cast<const SendMessage*>(message.get());
     if (sendMessage == nullptr) {
       ers::error(TcpSendMessageWrongType(ERS_HERE));
       return;
     }
     checkAndReturnBuffer(sendMessage);
     if (m_config.on_send_completed_cb) {
       m_config.on_send_completed_cb(
         EndPointAddress(cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port()),
         sendMessage->key());
     }
     std::lock_guard lock(m_mutex_pending_sends);
     std::erase(m_pendingSends, sendMessage->key());
   }

   void netio3::asyncmsg::Session::trigger_on_connection_established_cb()
   {
     trigger_cb(ConnectionEvent::Type::OPENED);
   }

   void netio3::asyncmsg::Session::trigger_on_connection_refused_cb()
   {
     trigger_cb(ConnectionEvent::Type::REFUSED);
   }

   void netio3::asyncmsg::Session::trigger_on_connection_closed_cb()
   {
     trigger_cb(ConnectionEvent::Type::CLOSED);
   }

   // Dispatches a connection event. Without the asio event loop and in
   // Mode::DELEGATE the event is only handed to m_addEventCb. Otherwise the
   // matching user callback (if set) is invoked here, and for REFUSED and CLOSED
   // m_connectionClosedCb is called with this session.
   void netio3::asyncmsg::Session::trigger_cb(const ConnectionEvent::Type eventType)
   {
     const auto endpoint =
       EndPointAddress(cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port());
     if (not m_useAsioEventLoop and m_mode == Mode::DELEGATE) {
       m_addEventCb({eventType, endpoint});
       return;
     }
     switch (eventType) {
     case ConnectionEvent::Type::OPENED:
       if (m_config.on_connection_established_cb) {
         m_config.on_connection_established_cb(endpoint);
       }
       break;
     case ConnectionEvent::Type::REFUSED:
       if (m_config.on_connection_refused_cb) {
         m_config.on_connection_refused_cb(endpoint);
       }
       m_connectionClosedCb(shared_from_this());
       break;
     case ConnectionEvent::Type::CLOSED:
       if (m_config.on_connection_closed_cb) {
         // The pending-sends mutex stays locked for the duration of the callback.
         std::lock_guard lock(m_mutex_pending_sends);
         m_config.on_connection_closed_cb(endpoint, m_pendingSends);
       }
       m_connectionClosedCb(shared_from_this());
       break;
     default:
       throw std::logic_error("Unknown event type");
     }
   }

   // Gives the buffer back to the buffer manager when the message is a
   // SendMessageBuffered; other message types are ignored.
   void netio3::asyncmsg::Session::checkAndReturnBuffer(const felix::asyncmsg::OutputMessage* message)
   {
     const auto* bufferedMessage = dynamic_cast<const SendMessageBuffered*>(message);
     if (bufferedMessage == nullptr) {
       return;
     }
     m_bufferManager.returnBuffer(bufferedMessage->key());
   }