Program Listing for File Session.cpp
↰ Return to documentation for file (BackendAsyncmsg/Session.cpp)
#include "Session.hpp"
#include <mutex>
#include <asyncmsg/Error.h>
#include <tracy/Tracy.hpp>
#include "Issues.hpp"
#include "SendMessage.hpp"
#include "SendMessageBuffered.hpp"
#include "netio3-backend/Issues.hpp"
/**
 * @brief Construct a session on top of the given io_service.
 *
 * Besides storing the configuration and callbacks, the constructor creates a
 * signal on @p eventloop. Each time that signal's handler runs it pops one
 * message from the internal message queue and forwards it to
 * handleOnReceive(); if nothing could be popped, an error is reported.
 *
 * @param ioService          Forwarded to the felix::asyncmsg::Session base class.
 * @param config             Network configuration; moved into the session.
 * @param eventloop          Event loop on which the message signal is created.
 *                           Dereferenced unconditionally, so it must not be null.
 * @param connectionClosedCb Stored; invoked with this session from trigger_cb()
 *                           for REFUSED and CLOSED events.
 * @param addEventCb         Stored; receives connection events from trigger_cb()
 *                           when events are not dispatched directly.
 * @param useAsioEventLoop   Stored; consulted by trigger_cb().
 * @param mode               Stored; consulted by onReceive() and trigger_cb().
 * @param connectionParams   Used to construct the buffer manager.
 */
netio3::asyncmsg::Session::Session(
  boost::asio::io_service& ioService,
  NetworkConfig config,
  BaseEventLoop* eventloop,
  std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> connectionClosedCb,
  std::function<void(const ConnectionEvent&)> addEventCb,
  const bool useAsioEventLoop,
  const Mode mode,
  const ConnectionParameters& connectionParams) :
  felix::asyncmsg::Session(ioService),
  m_config{std::move(config)},
  m_bufferManager{connectionParams},
  m_messageSignal{eventloop->create_signal(
    [this](int /*fd*/) {
      // One queued message is consumed per handler invocation.
      if (ReceiveMessage::Message queued{}; m_messageQueue.try_pop(queued)) {
        handleOnReceive(queued);
      } else {
        ers::error(TcpFailedDequeueMessage(ERS_HERE));
      }
    },
    true)},
  m_connectionClosedCb{std::move(connectionClosedCb)},
  m_addEventCb{std::move(addEventCb)},
  m_useAsioEventLoop{useAsioEventLoop},
  m_mode{mode}
{}
/**
 * @brief Queue an unbuffered message for sending and track it as pending.
 *
 * The message key is read and recorded in the pending-sends list *before*
 * ownership of the message is handed to asyncSend():
 *  - after `std::move(message)` the local unique_ptr is empty, so the key can
 *    no longer be read through it;
 *  - handleOnSend() erases the key on completion, so the key has to be in the
 *    list before the send can possibly complete.
 *
 * @param message Message to send; ownership is transferred to asyncSend().
 */
void netio3::asyncmsg::Session::sendData(std::unique_ptr<SendMessageUnbuffered> message)
{
  ZoneScoped;
  const auto key = message->key();
  {
    // Keep the critical section limited to the bookkeeping; asyncSend() runs unlocked.
    std::lock_guard lock(m_mutex_pending_sends);
    m_pendingSends.push_back(key);
  }
  asyncSend(std::move(message));
}
/**
 * @brief Called when the connection has been opened.
 *
 * Logs the remote endpoint, calls asyncReceive() and raises the OPENED
 * connection event.
 */
void netio3::asyncmsg::Session::onOpen()
{
  const auto& peer = remoteEndpoint();
  ERS_INFO(std::format("Opened connection {}:{}", peer.address().to_string(), peer.port()));

  asyncReceive();
  trigger_on_connection_established_cb();
}
/**
 * @brief Called when opening the connection failed.
 *
 * Reports a FailedOpenSendEndpoint error for the cached remote endpoint and
 * raises the REFUSED connection event.
 *
 * @param error Error code describing why the open failed.
 */
void netio3::asyncmsg::Session::onOpenError(const boost::system::error_code& error)
{
  const auto& peer = cachedRemoteEndpoint();
  ers::error(
    FailedOpenSendEndpoint(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));

  trigger_on_connection_refused_cb();
}
/**
 * @brief Called when the connection has been closed.
 *
 * Logs the cached remote endpoint and raises the CLOSED connection event.
 */
void netio3::asyncmsg::Session::onClose()
{
  const auto& peer = cachedRemoteEndpoint();
  ERS_INFO(std::format("Closed connection {}:{}", peer.address().to_string(), peer.port()));

  trigger_on_connection_closed_cb();
}
/**
 * @brief Called when closing the connection failed.
 *
 * A SESSION_NOT_OPEN error is only reported at debug level 2 and otherwise
 * ignored. Any other error is reported as a warning and the close is retried
 * via asyncClose() until MAX_CLOSE_ATTEMPTS retries have been made; after
 * that an error is reported and no further retry is started.
 *
 * @param error Error code describing why the close failed.
 */
void netio3::asyncmsg::Session::onCloseError(const boost::system::error_code& error)
{
  if (error == felix::asyncmsg::Error::SESSION_NOT_OPEN) {
    ERS_DEBUG(2,
              FailedCloseEndpoint(ERS_HERE,
                                  cachedRemoteEndpoint().address().to_string(),
                                  cachedRemoteEndpoint().port(),
                                  error.message()));
    return;
  }

  const auto& peer = cachedRemoteEndpoint();
  ers::warning(
    FailedCloseEndpoint(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));

  // Retry budget exhausted: report and give up.
  if (m_numCloseAttempts >= MAX_CLOSE_ATTEMPTS) {
    ers::error(
      FailedCloseEndpoint(ERS_HERE,
                          peer.address().to_string(),
                          peer.port(),
                          std::format("Failed max number of attempts ({})", m_numCloseAttempts)));
    return;
  }

  ++m_numCloseAttempts;
  asyncClose();
}
/**
 * @brief Create the input message object for an incoming message.
 *
 * @param typeId        Type identifier of the incoming message.
 * @param transactionId Transaction identifier of the incoming message.
 * @param size          Size of the incoming message.
 * @return A newly allocated ReceiveMessage configured with the three values.
 */
std::unique_ptr<felix::asyncmsg::InputMessage> netio3::asyncmsg::Session::createMessage(
  const std::uint32_t typeId,
  const std::uint32_t transactionId,
  const std::uint32_t size)
{
  ERS_DEBUG(
    2,
    std::format(
      "Creating message for type {}, transaction {}, size {}", typeId, transactionId, size));

  ReceiveMessage::ReceiveMessageConfig messageConfig{
    .typeId = typeId, .transactionId = transactionId, .size = size};
  return std::make_unique<ReceiveMessage>(std::move(messageConfig));
}
/**
 * @brief Called when a complete message has been received.
 *
 * asyncReceive() is called first, before the received message is processed.
 * The message must be a ReceiveMessage; otherwise an error is reported and
 * the message is dropped. In DELEGATE mode the payload is pushed onto the
 * message queue and the message signal is fired; in every other mode
 * handleOnReceive() is called directly.
 *
 * @param message The received message; destroyed when this function returns.
 */
void netio3::asyncmsg::Session::onReceive(std::unique_ptr<felix::asyncmsg::InputMessage> message)
{
  ZoneScopedC(0x33cccc);
  ERS_DEBUG(2,
            std::format("Received message from {}:{}",
                        remoteEndpoint().address().to_string(),
                        remoteEndpoint().port()));
  asyncReceive();

  auto* const typedMessage = dynamic_cast<ReceiveMessage*>(message.get());
  if (typedMessage == nullptr) {
    ers::error(TcpReceiveMessageWrongType(ERS_HERE));
    return;
  }

  if (m_mode != Mode::DELEGATE) {
    handleOnReceive(typedMessage->getMessage());
  } else {
    // Separate profiling zone for the hand-over; kept in its own scope.
    ZoneScoped;
    m_messageQueue.push(std::move(typedMessage->getMessage()));
    m_messageSignal.fire();
  }
}
/**
 * @brief Called when receiving a message failed.
 *
 * - If the session is not in state OPEN the error is logged and ignored.
 * - An end-of-file error is logged as a close by the remote side and the
 *   session is closed via asyncClose().
 * - Any other error is reported and asyncReceive() is called again.
 *
 * @param error Error code describing the failure.
 * @param message Unused.
 */
void netio3::asyncmsg::Session::onReceiveError(
  const boost::system::error_code& error,
  std::unique_ptr<felix::asyncmsg::InputMessage> /*message*/)
{
  if (state() != State::OPEN) {
    ERS_LOG(std::format("Error receiving message from {}:{}: {} while not in state OPEN. Ignoring.",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port(),
                        error.message()));
    return;
  }
  // Compare the error_code itself rather than its raw integer value, so that the
  // error category is taken into account and an unrelated error that happens to
  // share the numeric value is not mistaken for end-of-file.
  if (error == boost::asio::error::eof) {
    ERS_LOG(std::format("Connection closed by {}:{}",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port()));
    asyncClose();
    return;
  }
  ers::error(FailedReceive(ERS_HERE,
                           cachedRemoteEndpoint().address().to_string(),
                           cachedRemoteEndpoint().port(),
                           error.message()));
  asyncReceive();
}
/**
 * @brief Called when a message has been sent.
 *
 * Emits a level-2 debug message naming the cached remote endpoint and passes
 * the message on to handleOnSend().
 *
 * @param message The message that was sent; ownership goes to handleOnSend().
 */
void netio3::asyncmsg::Session::onSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  ZoneScopedC(0x29a3a3);

  ERS_DEBUG(2,
            std::format("Sent message to {}:{}",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port()));

  handleOnSend(std::move(message));
}
/**
 * @brief Called when sending a message failed.
 *
 * The failure is reported as an error only while the session is in state
 * OPEN or OPEN_PENDING. Independently of the state, the message is passed on
 * to handleOnSend().
 *
 * @param error   Error code describing the failure.
 * @param message The message that could not be sent.
 */
void netio3::asyncmsg::Session::onSendError(
  const boost::system::error_code& error,
  std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  const auto currentState = state();
  const bool shouldReport = currentState == State::OPEN || currentState == State::OPEN_PENDING;

  if (shouldReport) {
    const auto& peer = remoteEndpoint();
    ers::error(FailedSend(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));
  }

  handleOnSend(std::move(message));
}
/**
 * @brief Obtain a buffer from the buffer manager.
 *
 * @return Pointer to the buffer handed out by the buffer manager.
 * @throws NoBuffersAllocated (carrying the remote endpoint) when the buffer
 *         manager throws TcpNoBuffers.
 */
netio3::NetworkBuffer* netio3::asyncmsg::Session::getBuffer()
{
  try {
    return m_bufferManager.getBuffer();
  } catch (const TcpNoBuffers&) {
    // Re-throw as the issue type that carries the remote endpoint.
    const auto& peer = remoteEndpoint();
    throw NoBuffersAllocated(ERS_HERE, peer.address().to_string(), peer.port());
  }
}
/**
 * @brief Query how many buffers the buffer manager currently reports as available.
 *
 * @return The value returned by the buffer manager's getNumAvailableBuffers().
 */
std::size_t netio3::asyncmsg::Session::getNumAvailableBuffers()
{
  const std::size_t available = m_bufferManager.getNumAvailableBuffers();
  return available;
}
/**
 * @brief Get a copy of the keys of all sends currently tracked as pending.
 *
 * The copy is taken while holding the pending-sends mutex.
 *
 * @return Snapshot of the pending-send keys.
 */
std::vector<std::uint64_t> netio3::asyncmsg::Session::getPendingSends() const
{
  const std::scoped_lock guard{m_mutex_pending_sends};
  auto snapshot = m_pendingSends;
  return snapshot;
}
/**
 * @brief Deliver a received message to the user's data callback.
 *
 * Does nothing beyond the debug output when no on_data_cb is configured.
 *
 * @param receiveMessage Received message whose `data` member is passed to the callback.
 */
void netio3::asyncmsg::Session::handleOnReceive(const ReceiveMessage::Message& receiveMessage) const
{
  ZoneScopedC(0x1f7a7a);
  ERS_DEBUG(2, "Received message");

  if (!m_config.on_data_cb) {
    return;
  }
  m_config.on_data_cb(receiveMessage.data);
}
/**
 * @brief Common completion handling for a send (successful or failed).
 *
 * The message must be a SendMessage; otherwise an error is reported and
 * nothing else happens. For a SendMessage, in this order: its buffer is
 * returned if it is a buffered message, the on_send_completed_cb (if set) is
 * called with the cached remote endpoint and the message key, and the key is
 * removed from the pending-sends list.
 *
 * @param message The message whose send finished; destroyed on return.
 */
void netio3::asyncmsg::Session::handleOnSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  ZoneScopedC(0x1f7a7a);

  const auto* const typedMessage = dynamic_cast<const SendMessage*>(message.get());
  if (typedMessage == nullptr) {
    ers::error(TcpSendMessageWrongType(ERS_HERE));
    return;
  }

  checkAndReturnBuffer(typedMessage);
  const auto key = typedMessage->key();

  if (m_config.on_send_completed_cb) {
    const auto& peer = cachedRemoteEndpoint();
    m_config.on_send_completed_cb(EndPointAddress(peer.address().to_string(), peer.port()), key);
  }

  // The user callback above runs without the pending-sends lock held.
  const std::scoped_lock guard{m_mutex_pending_sends};
  std::erase(m_pendingSends, key);
}
/// Raise the OPENED connection event through trigger_cb().
void netio3::asyncmsg::Session::trigger_on_connection_established_cb()
{
  const auto openedEvent = ConnectionEvent::Type::OPENED;
  trigger_cb(openedEvent);
}
/// Raise the REFUSED connection event through trigger_cb().
void netio3::asyncmsg::Session::trigger_on_connection_refused_cb()
{
  const auto refusedEvent = ConnectionEvent::Type::REFUSED;
  trigger_cb(refusedEvent);
}
/// Raise the CLOSED connection event through trigger_cb().
void netio3::asyncmsg::Session::trigger_on_connection_closed_cb()
{
  const auto closedEvent = ConnectionEvent::Type::CLOSED;
  trigger_cb(closedEvent);
}
/**
 * @brief Dispatch a connection event for the cached remote endpoint.
 *
 * When the asio event loop is not used and the session runs in DELEGATE mode,
 * the event is only handed to the add-event callback. Otherwise the matching
 * user callback from the configuration is invoked directly (if set), and for
 * REFUSED and CLOSED events the connection-closed callback is additionally
 * called with this session.
 *
 * The on_connection_closed_cb receives a snapshot of the pending-send keys.
 * The snapshot is taken under the pending-sends mutex, but the user callback
 * itself is invoked without the mutex held, so that a callback which calls
 * back into this session (e.g. getPendingSends() or sendData()) cannot
 * deadlock on that mutex.
 *
 * @param eventType Kind of connection event to dispatch.
 * @throws std::logic_error if @p eventType is not one of the handled values.
 */
void netio3::asyncmsg::Session::trigger_cb(const ConnectionEvent::Type eventType)
{
  const auto endpoint =
    EndPointAddress(cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port());
  if (not m_useAsioEventLoop and m_mode == Mode::DELEGATE) {
    m_addEventCb({eventType, endpoint});
    return;
  }
  switch (eventType) {
  case ConnectionEvent::Type::OPENED:
    if (m_config.on_connection_established_cb) {
      m_config.on_connection_established_cb(endpoint);
    }
    break;
  case ConnectionEvent::Type::REFUSED:
    if (m_config.on_connection_refused_cb) {
      m_config.on_connection_refused_cb(endpoint);
    }
    m_connectionClosedCb(shared_from_this());
    break;
  case ConnectionEvent::Type::CLOSED:
    if (m_config.on_connection_closed_cb) {
      // getPendingSends() copies the list under the mutex and releases it
      // before the user callback runs.
      auto pendingSends = getPendingSends();
      m_config.on_connection_closed_cb(endpoint, pendingSends);
    }
    m_connectionClosedCb(shared_from_this());
    break;
  default:
    throw std::logic_error("Unknown event type");
  }
}
/**
 * @brief Return the buffer of a buffered send message to the buffer manager.
 *
 * Messages that are not of type SendMessageBuffered are ignored.
 *
 * @param message The output message to inspect.
 */
void netio3::asyncmsg::Session::checkAndReturnBuffer(const felix::asyncmsg::OutputMessage* message)
{
  if (const auto* buffered = dynamic_cast<const SendMessageBuffered*>(message);
      buffered != nullptr) {
    m_bufferManager.returnBuffer(buffered->key());
  }
}