Program Listing for File Session.cpp

Return to documentation for file (BackendAsyncmsg/Session.cpp)

#include "Session.hpp"

#include <mutex>

#include <asyncmsg/Error.h>

#include <tracy/Tracy.hpp>

#include "Issues.hpp"
#include "SendMessage.hpp"
#include "SendMessageBuffered.hpp"
#include "netio3-backend/Issues.hpp"

// Builds a session on top of felix::asyncmsg::Session.
//
// ioService          forwarded to the felix::asyncmsg::Session base class.
// config             stored in m_config; its callbacks are invoked by the handlers of this class.
// eventloop          used to create m_messageSignal; dereferenced unconditionally, so it must not
//                    be null.
// connectionClosedCb stored and invoked from trigger_cb for REFUSED and CLOSED events.
// addEventCb         stored and invoked from trigger_cb when events are forwarded instead of
//                    handled directly.
// useAsioEventLoop   stored; consulted by trigger_cb.
// mode               stored; consulted by onReceive and trigger_cb.
// connectionParams   forwarded to the buffer manager.
netio3::asyncmsg::Session::Session(
  boost::asio::io_service& ioService,
  NetworkConfig config,
  BaseEventLoop* eventloop,
  std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> connectionClosedCb,
  std::function<void(const ConnectionEvent&)> addEventCb,
  const bool useAsioEventLoop,
  const Mode mode,
  const ConnectionParameters& connectionParams) :
  felix::asyncmsg::Session(ioService),
  m_config{std::move(config)},
  m_bufferManager{connectionParams},
  m_messageSignal{eventloop->create_signal(
    // Signal handler: takes one message off the queue filled by onReceive and hands it to
    // handleOnReceive. An empty queue at this point is reported as an error.
    [this](int /*fd*/) {
      ReceiveMessage::Message queued{};
      if (m_messageQueue.try_pop(queued)) {
        handleOnReceive(queued);
        return;
      }
      ers::error(TcpFailedDequeueMessage(ERS_HERE));
    },
    true)},
  m_connectionClosedCb{std::move(connectionClosedCb)},
  m_addEventCb{std::move(addEventCb)},
  m_useAsioEventLoop{useAsioEventLoop},
  m_mode{mode}
{}

// Queues an unbuffered message for asynchronous sending and records its key as pending.
//
// message  the message to send; ownership is transferred to asyncSend. Must not be null.
//
// The key is read *before* the message is moved away: after std::move the unique_ptr is
// empty and dereferencing it is undefined behaviour. The key is also registered before the
// send is started so that a completion handler (handleOnSend) that runs early still finds the
// key to erase. The lock is released before asyncSend so the completion path can take it.
void netio3::asyncmsg::Session::sendData(std::unique_ptr<SendMessageUnbuffered> message)
{
  ZoneScoped;
  const auto key = message->key();
  {
    std::lock_guard lock(m_mutex_pending_sends);
    m_pendingSends.push_back(key);
  }
  asyncSend(std::move(message));
}

// Open handler: logs the remote address and port, starts receiving with asyncReceive() and
// then raises the "connection established" event.
void netio3::asyncmsg::Session::onOpen()
{
  ERS_INFO(std::format("Opened connection {}:{}",
                       remoteEndpoint().address().to_string(),
                       remoteEndpoint().port()));

  // Start receiving before the established event is raised.
  asyncReceive();
  trigger_on_connection_established_cb();
}

// Open-failure handler: reports a FailedOpenSendEndpoint error carrying the cached remote
// address, port and the error text, then raises the "connection refused" event.
//
// error  the error code describing why opening failed.
void netio3::asyncmsg::Session::onOpenError(const boost::system::error_code& error)
{
  const auto& peer = cachedRemoteEndpoint();
  ers::error(
    FailedOpenSendEndpoint(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));
  trigger_on_connection_refused_cb();
}

// Close handler: logs the cached remote address and port and raises the "connection closed"
// event.
void netio3::asyncmsg::Session::onClose()
{
  ERS_INFO(std::format(
    "Closed connection {}:{}", cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port()));

  trigger_on_connection_closed_cb();
}

// Close-failure handler.
//
// error  the error code describing why closing failed.
//
// - SESSION_NOT_OPEN is only logged at debug level 2 and otherwise ignored.
// - Any other error is reported as a warning. The close is retried via asyncClose() while
//   fewer than MAX_CLOSE_ATTEMPTS retries have been made; after that an error is reported
//   and no further attempt is started.
void netio3::asyncmsg::Session::onCloseError(const boost::system::error_code& error)
{
  if (error == felix::asyncmsg::Error::SESSION_NOT_OPEN) {
    ERS_DEBUG(2,
              FailedCloseEndpoint(ERS_HERE,
                                  cachedRemoteEndpoint().address().to_string(),
                                  cachedRemoteEndpoint().port(),
                                  error.message()));
    return;
  }

  const auto& peer = cachedRemoteEndpoint();
  ers::warning(
    FailedCloseEndpoint(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));

  // Retry budget exhausted: report and give up.
  if (not(m_numCloseAttempts < MAX_CLOSE_ATTEMPTS)) {
    ers::error(
      FailedCloseEndpoint(ERS_HERE,
                          peer.address().to_string(),
                          peer.port(),
                          std::format("Failed max number of attempts ({})", m_numCloseAttempts)));
    return;
  }

  ++m_numCloseAttempts;
  asyncClose();
}

// Creates the ReceiveMessage object for an incoming message.
//
// typeId         type id of the incoming message.
// transactionId  transaction id of the incoming message.
// size           size of the incoming message.
//
// Returns a newly allocated ReceiveMessage configured with the three values, as a
// felix::asyncmsg::InputMessage owner.
std::unique_ptr<felix::asyncmsg::InputMessage> netio3::asyncmsg::Session::createMessage(
  const std::uint32_t typeId,
  const std::uint32_t transactionId,
  const std::uint32_t size)
{
  ERS_DEBUG(2,
            std::format("Creating message for type {}, transaction {}, size {}",
                        typeId,
                        transactionId,
                        size));

  ReceiveMessage::ReceiveMessageConfig messageConfig{
    .typeId = typeId, .transactionId = transactionId, .size = size};
  return std::make_unique<ReceiveMessage>(std::move(messageConfig));
}

// Receive handler.
//
// message  the received message; expected to be a ReceiveMessage.
//
// The next receive is started immediately. A message that is not a ReceiveMessage is reported
// as an error and dropped. In DELEGATE mode the payload is pushed onto m_messageQueue and
// m_messageSignal is fired; in every other mode handleOnReceive is called directly.
void netio3::asyncmsg::Session::onReceive(std::unique_ptr<felix::asyncmsg::InputMessage> message)
{
  ZoneScopedC(0x33cccc);
  ERS_DEBUG(
    2,
    std::format(
      "Received message from {}:{}", remoteEndpoint().address().to_string(), remoteEndpoint().port()));

  // Start the next receive before dealing with the current message.
  asyncReceive();

  auto* const received = dynamic_cast<ReceiveMessage*>(message.get());
  if (not received) {
    ers::error(TcpReceiveMessageWrongType(ERS_HERE));
    return;
  }

  if (m_mode != Mode::DELEGATE) {
    handleOnReceive(received->getMessage());
  } else {
    ZoneScoped;
    m_messageQueue.push(std::move(received->getMessage()));
    m_messageSignal.fire();
  }
}

// Receive-failure handler.
//
// error    the error code describing the failure.
// message  the message that was being received; unused.
//
// - Session not in state OPEN: the error is logged and ignored.
// - End of file: logged as the peer closing the connection, and the session is closed.
// - Anything else: reported as an error, and a new receive is started.
void netio3::asyncmsg::Session::onReceiveError(
  const boost::system::error_code& error,
  std::unique_ptr<felix::asyncmsg::InputMessage> /*message*/)
{
  if (state() != State::OPEN) {
    ERS_LOG(std::format("Error receiving message from {}:{}: {} while not in state OPEN. Ignoring.",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port(),
                        error.message()));
  } else if (error.value() == boost::asio::error::eof) {
    ERS_LOG(std::format(
      "Connection closed by {}:{}", cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port()));
    asyncClose();
  } else {
    const auto& peer = cachedRemoteEndpoint();
    ers::error(FailedReceive(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));
    asyncReceive();
  }
}

// Send-completion handler: logs the destination at debug level 2 and passes the message on
// to handleOnSend.
//
// message  the message that was sent; ownership is forwarded to handleOnSend.
void netio3::asyncmsg::Session::onSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  ZoneScopedC(0x29a3a3);
  ERS_DEBUG(
    2,
    std::format(
      "Sent message to {}:{}", cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port()));

  handleOnSend(std::move(message));
}

// Send-failure handler.
//
// error    the error code describing the failure.
// message  the message that could not be sent; ownership is forwarded to handleOnSend.
//
// The failure is reported as an error only while the session is in state OPEN or
// OPEN_PENDING. In every case the message goes through handleOnSend.
void netio3::asyncmsg::Session::onSendError(
  const boost::system::error_code& error,
  std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  const auto currentState = state();
  const bool isOpenOrOpening = currentState == State::OPEN or currentState == State::OPEN_PENDING;
  if (isOpenOrOpening) {
    const auto& peer = remoteEndpoint();
    ers::error(FailedSend(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));
  }

  handleOnSend(std::move(message));
}

// Fetches a buffer from the buffer manager.
//
// Returns the pointer handed out by m_bufferManager.getBuffer().
// Throws NoBuffersAllocated (carrying the remote address and port) when the buffer manager
// throws TcpNoBuffers.
netio3::NetworkBuffer* netio3::asyncmsg::Session::getBuffer()
{
  try {
    return m_bufferManager.getBuffer();
  } catch (const TcpNoBuffers&) {
    // Translate the buffer-manager exception into one that identifies the peer.
    const auto& peer = remoteEndpoint();
    throw NoBuffersAllocated(ERS_HERE, peer.address().to_string(), peer.port());
  }
}

// Returns the number of available buffers as reported by the buffer manager.
std::size_t netio3::asyncmsg::Session::getNumAvailableBuffers()
{
  const std::size_t available = m_bufferManager.getNumAvailableBuffers();
  return available;
}

// Returns a copy of the list of pending send keys. The copy is made while holding
// m_mutex_pending_sends.
std::vector<std::uint64_t> netio3::asyncmsg::Session::getPendingSends() const
{
  const std::scoped_lock guard{m_mutex_pending_sends};
  std::vector<std::uint64_t> snapshot = m_pendingSends;
  return snapshot;
}

// Delivers a received message to the user.
//
// receiveMessage  the received message; its `data` member is passed to the callback.
//
// Does nothing beyond logging when no on_data_cb is configured.
void netio3::asyncmsg::Session::handleOnReceive(const ReceiveMessage::Message& receiveMessage) const
{
  ZoneScopedC(0x1f7a7a);
  ERS_DEBUG(2, "Received message");

  if (not m_config.on_data_cb) {
    return;
  }
  m_config.on_data_cb(receiveMessage.data);
}

// Common completion path for sends (called from onSend and onSendError).
//
// message  the message whose send finished; expected to be a SendMessage.
//
// A message of any other type is reported as an error and nothing else happens. Otherwise
// the buffer is returned if the message is a buffered one, on_send_completed_cb (if set) is
// called with the cached remote address and the message key, and finally the key is removed
// from the pending sends under m_mutex_pending_sends.
void netio3::asyncmsg::Session::handleOnSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  ZoneScopedC(0x1f7a7a);

  const auto* const sent = dynamic_cast<const SendMessage*>(message.get());
  if (not sent) {
    ers::error(TcpSendMessageWrongType(ERS_HERE));
    return;
  }

  checkAndReturnBuffer(sent);

  if (m_config.on_send_completed_cb) {
    const auto& peer = cachedRemoteEndpoint();
    m_config.on_send_completed_cb(EndPointAddress(peer.address().to_string(), peer.port()),
                                  sent->key());
  }

  // The key is no longer pending once the completion has been reported.
  const std::scoped_lock guard{m_mutex_pending_sends};
  std::erase(m_pendingSends, sent->key());
}

// Raises an OPENED connection event through trigger_cb.
void netio3::asyncmsg::Session::trigger_on_connection_established_cb()
{
  const auto eventType = ConnectionEvent::Type::OPENED;
  trigger_cb(eventType);
}

// Raises a REFUSED connection event through trigger_cb.
void netio3::asyncmsg::Session::trigger_on_connection_refused_cb()
{
  const auto eventType = ConnectionEvent::Type::REFUSED;
  trigger_cb(eventType);
}

// Raises a CLOSED connection event through trigger_cb.
void netio3::asyncmsg::Session::trigger_on_connection_closed_cb()
{
  const auto eventType = ConnectionEvent::Type::CLOSED;
  trigger_cb(eventType);
}

// Dispatches a connection event for the cached remote endpoint.
//
// eventType  the kind of event (OPENED, REFUSED or CLOSED).
//
// When the asio event loop is not used and the session is in DELEGATE mode, the event is only
// forwarded through m_addEventCb. Otherwise the matching user callback from m_config is
// invoked (if set); for REFUSED and CLOSED, m_connectionClosedCb is additionally called with
// this session.
//
// Throws std::logic_error for an event type that is none of the three above.
void netio3::asyncmsg::Session::trigger_cb(const ConnectionEvent::Type eventType)
{
  const auto endpoint =
    EndPointAddress(cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port());
  if (not m_useAsioEventLoop and m_mode == Mode::DELEGATE) {
    m_addEventCb({eventType, endpoint});
    return;
  }
  switch (eventType) {
  case ConnectionEvent::Type::OPENED:
    if (m_config.on_connection_established_cb) {
      m_config.on_connection_established_cb(endpoint);
    }
    break;
  case ConnectionEvent::Type::REFUSED:
    if (m_config.on_connection_refused_cb) {
      m_config.on_connection_refused_cb(endpoint);
    }
    m_connectionClosedCb(shared_from_this());
    break;
  case ConnectionEvent::Type::CLOSED:
    if (m_config.on_connection_closed_cb) {
      // Take a snapshot under the lock (getPendingSends locks m_mutex_pending_sends) and call
      // the user callback with no lock held: user code that calls back into this session
      // (e.g. getPendingSends or sendData) would otherwise block on the same mutex.
      auto pendingSends = getPendingSends();
      m_config.on_connection_closed_cb(endpoint, pendingSends);
    }
    m_connectionClosedCb(shared_from_this());
    break;
  default:
    throw std::logic_error("Unknown event type");
  }
}

// Returns the buffer of a sent message to the buffer manager.
//
// message  the message that was sent; only SendMessageBuffered instances are acted on, any
//          other type is ignored.
void netio3::asyncmsg::Session::checkAndReturnBuffer(const felix::asyncmsg::OutputMessage* message)
{
  if (const auto* const buffered = dynamic_cast<const SendMessageBuffered*>(message);
      buffered != nullptr) {
    m_bufferManager.returnBuffer(buffered->key());
  }
}