Program Listing for File Session.cpp

Return to documentation for file (BackendAsyncmsg/Session.cpp)

#include "Session.hpp"

#include <mutex>
#include <utility>

#include <asyncmsg/Error.h>

#include <tracy/Tracy.hpp>

#include "Issues.hpp"
#include "SendMessage.hpp"
#include "SendMessageBuffered.hpp"
#include "netio3-backend/Issues.hpp"

// Constructs a session on top of felix::asyncmsg::Session.
//
// Parameters:
//   ioContext           - forwarded to the felix::asyncmsg::Session base class.
//   callbacksConfig     - user callbacks (data, send-completed, connection events); moved in.
//   eventloop           - used to create the message signal; dereferenced unconditionally,
//                         so it must not be null.
//   connectionClosedCb  - invoked with shared_from_this() on REFUSED/CLOSED events (see trigger_cb).
//   addEventCb          - receives connection events instead of the user callbacks when the
//                         session is in DELEGATE mode and not using the asio event loop
//                         (see trigger_cb).
//   useAsioEventLoop    - stored; consulted in trigger_cb.
//   mode                - stored; selects between direct and queued (DELEGATE) message handling.
//   sharedBufferManager - if non-null, used as the buffer manager and no session-local manager
//                         is created. NOTE(review): presumably must outlive the session — confirm.
//   connectionParams    - connection parameters; moved in.
netio3::asyncmsg::Session::Session(
  boost::asio::io_context& ioContext,
  CallbacksConfig callbacksConfig,
  BaseEventLoop* eventloop,
  std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> connectionClosedCb,
  std::function<void(const ConnectionEvent&)> addEventCb,
  const bool useAsioEventLoop,
  const Mode mode,
  BufferManager* sharedBufferManager,
  ConnectionParameters connectionParams) :
  felix::asyncmsg::Session(ioContext),
  m_callbacks{std::move(callbacksConfig)},
  m_connectionParams{std::move(connectionParams)},
  // Session-local buffer manager: only constructed (in place) when no shared manager was given.
  // The lambda reads the member m_connectionParams because the parameter has already been
  // moved from above.
  // NOTE(review): this relies on m_connectionParams being declared before m_bufferManager in
  // the class so that it is initialised first — confirm in Session.hpp.
  m_bufferManager{std::invoke([this, sharedBufferManager] () -> std::optional<BufferManager> {
    if (sharedBufferManager != nullptr) {
      return std::nullopt;
    }
    return std::optional<BufferManager>{std::in_place, m_connectionParams.send_buffered_params};
  })},
  // Pointer actually used for buffer operations: the shared manager if provided, otherwise the
  // session-local one created above (which is guaranteed to hold a value in that case).
  m_bufferManagerPtr{std::invoke([this, sharedBufferManager]() -> BufferManager* {
    if (sharedBufferManager != nullptr) {
      return sharedBufferManager;
    }
    return &m_bufferManager.value();
  })},
  // Signal handler: pops one message from m_messageQueue and hands it to handleOnReceive.
  // onReceive pushes a message and fires this signal when running in DELEGATE mode.
  // NOTE(review): the meaning of the trailing `true` argument to create_signal is not visible
  // here — check BaseEventLoop::create_signal.
  m_messageSignal{eventloop->create_signal(
    [this](int /*fd*/) {
      ReceiveMessage::Message message{};
      const auto dequeued = m_messageQueue.try_pop(message);
      if (not dequeued) {
        // The signal fired but the queue yielded nothing; report and skip.
        ers::error(TcpFailedDequeueMessage(ERS_HERE));
        return;
      }
      handleOnReceive(message);
    },
    true)},
  m_connectionClosedCb{std::move(connectionClosedCb)},
  m_addEventCb{std::move(addEventCb)},
  m_useAsioEventLoop{useAsioEventLoop},
  m_mode{mode}
{}

// Queues an unbuffered message for asynchronous sending and records its key as pending.
//
// The key is removed from the pending list again in handleOnSend once the send has
// completed (successfully or with an error).
//
// @param message  Message to send; ownership is transferred to asyncSend.
void netio3::asyncmsg::Session::sendData(std::unique_ptr<SendMessageUnbuffered> message)
{
  ZoneScoped;
  // Read the key while we still own the message: after std::move below the unique_ptr is
  // empty and dereferencing it would be undefined behaviour.
  const auto key = message->key();
  {
    // Register the key before handing the message over, so that a completion handler can
    // never try to erase a key that has not been recorded yet.
    std::lock_guard lock(m_mutex_pending_sends);
    m_pendingSends.push_back(key);
  }
  asyncSend(std::move(message));
}

// Open handler: logs the remote peer, starts receiving and reports the OPENED event.
void netio3::asyncmsg::Session::onOpen()
{
  const auto& peer = remoteEndpoint();
  ERS_INFO(std::format("Opened connection {}:{}", peer.address().to_string(), peer.port()));

  // Arm the first receive before notifying anyone about the new connection.
  asyncReceive();
  trigger_on_connection_established_cb();
}

// Open-error handler: reports the failure for the cached remote endpoint and then
// raises the REFUSED event.
//
// @param error  Error code describing why the connection could not be opened.
void netio3::asyncmsg::Session::onOpenError(const boost::system::error_code& error)
{
  const auto& peer = cachedRemoteEndpoint();
  ers::error(
    FailedOpenActiveEndpoint(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));
  trigger_on_connection_refused_cb();
}

// Close handler: logs the cached remote endpoint and raises the CLOSED event.
void netio3::asyncmsg::Session::onClose()
{
  const auto& peer = cachedRemoteEndpoint();
  ERS_INFO(std::format("Closed connection {}:{}", peer.address().to_string(), peer.port()));
  trigger_on_connection_closed_cb();
}

// Close-error handler.
//
// A SESSION_NOT_OPEN error is only logged at debug level and otherwise ignored. Any other
// error is reported as a warning and the close is retried until MAX_CLOSE_ATTEMPTS retries
// have been issued; after that an error is reported and no further attempt is made.
//
// @param error  Error code reported for the failed close.
void netio3::asyncmsg::Session::onCloseError(const boost::system::error_code& error)
{
  if (error == felix::asyncmsg::Error::SESSION_NOT_OPEN) {
    ERS_DEBUG(2,
              FailedCloseActiveEndpoint(ERS_HERE,
                                        cachedRemoteEndpoint().address().to_string(),
                                        cachedRemoteEndpoint().port(),
                                        error.message()));
    return;
  }

  const auto& peer = cachedRemoteEndpoint();
  ers::warning(
    FailedCloseActiveEndpoint(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));

  // Retry budget exhausted: report and give up.
  if (not(m_numCloseAttempts < MAX_CLOSE_ATTEMPTS)) {
    ers::error(FailedCloseActiveEndpoint(
      ERS_HERE,
      peer.address().to_string(),
      peer.port(),
      std::format("Failed max number of attempts ({})", m_numCloseAttempts)));
    return;
  }

  ++m_numCloseAttempts;
  asyncClose();
}

// Creates the ReceiveMessage that will hold an incoming message.
//
// @param typeId         Type identifier of the incoming message.
// @param transactionId  Transaction identifier of the incoming message.
// @param size           Size of the incoming message.
// @return Newly allocated ReceiveMessage configured with the three values above.
std::unique_ptr<felix::asyncmsg::InputMessage> netio3::asyncmsg::Session::createMessage(
  const std::uint32_t typeId,
  const std::uint32_t transactionId,
  const std::uint32_t size)
{
  ERS_DEBUG(
    2,
    std::format(
      "Creating message for type {}, transaction {}, size {}", typeId, transactionId, size));

  ReceiveMessage::ReceiveMessageConfig config{
    .typeId = typeId,
    .transactionId = transactionId,
    .size = size,
  };
  return std::make_unique<ReceiveMessage>(std::move(config));
}

// Receive handler.
//
// Immediately re-arms the next receive, then either handles the payload directly or, in
// DELEGATE mode, pushes it onto the message queue and fires the message signal so that the
// signal handler processes it.
//
// @param message  The received message; expected to be a ReceiveMessage.
void netio3::asyncmsg::Session::onReceive(std::unique_ptr<felix::asyncmsg::InputMessage> message)
{
  ZoneScopedC(0x33cccc);
  ERS_DEBUG(2,
            std::format("Received message from {}:{}",
                        remoteEndpoint().address().to_string(),
                        remoteEndpoint().port()));

  // Keep the receive chain going before this message is processed.
  asyncReceive();

  auto* const received = dynamic_cast<ReceiveMessage*>(message.get());
  if (received == nullptr) {
    ers::error(TcpReceiveMessageWrongType(ERS_HERE));
    return;
  }

  if (m_mode != Mode::DELEGATE) {
    handleOnReceive(received->getMessage());
    return;
  }

  {
    ZoneScoped;
    m_messageQueue.push(std::move(received->getMessage()));
    m_messageSignal.fire();
  }
}

// Receive-error handler.
//
// Behaviour by case:
//  - session not in state OPEN: the error is logged and ignored;
//  - EOF: the peer closed the connection, so the session is closed;
//  - connection reset / aborted / broken pipe: the peer closed abruptly, so the session is
//    closed;
//  - anything else: the error is reported and another receive is started.
//
// The error code is compared as a whole (value and category) rather than by numeric value
// only, so that an unrelated error that happens to share a numeric value with one of the
// asio errors is not misclassified.
//
// @param error  Error code reported for the failed receive.
void netio3::asyncmsg::Session::onReceiveError(
  const boost::system::error_code& error,
  std::unique_ptr<felix::asyncmsg::InputMessage> /*message*/)
{
  if (state() != State::OPEN) {
    ERS_LOG(std::format("Error receiving message from {}:{}: {} while not in state OPEN. Ignoring.",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port(),
                        error.message()));
    return;
  }
  if (error == boost::asio::error::eof) {
    ERS_LOG(std::format("Connection closed by {}:{}",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port()));
    asyncClose();
    return;
  }
  if (error == boost::asio::error::connection_reset ||
      error == boost::asio::error::connection_aborted ||
      error == boost::asio::error::broken_pipe) {
    ERS_LOG(std::format("Connection abruptly closed by {}:{}: {}",
                        cachedRemoteEndpoint().address().to_string(),
                        cachedRemoteEndpoint().port(),
                        error.message()));
    asyncClose();
    return;
  }
  ers::error(FailedReceive(ERS_HERE,
                           cachedRemoteEndpoint().address().to_string(),
                           cachedRemoteEndpoint().port(),
                           error.message()));
  // Not a connection-level failure: keep the session alive and keep receiving.
  asyncReceive();
}

// Send-completion handler: logs at debug level and forwards the message to handleOnSend.
//
// @param message  The message whose send completed.
void netio3::asyncmsg::Session::onSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  ZoneScopedC(0x29a3a3);
  ERS_DEBUG(
    2,
    std::format(
      "Sent message to {}:{}", cachedRemoteEndpoint().address().to_string(), cachedRemoteEndpoint().port()));
  handleOnSend(std::move(message));
}

// Send-error handler.
//
// The failure is only reported while the session is in state OPEN or OPEN_PENDING. In every
// case the message is passed on to handleOnSend, exactly as for a successful send.
//
// @param error    Error code reported for the failed send.
// @param message  The message that could not be sent.
void netio3::asyncmsg::Session::onSendError(
  const boost::system::error_code& error,
  std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  const bool shouldReport = state() == State::OPEN or state() == State::OPEN_PENDING;
  if (shouldReport) {
    const auto& peer = remoteEndpoint();
    ers::error(FailedSend(ERS_HERE, peer.address().to_string(), peer.port(), error.message()));
  }
  handleOnSend(std::move(message));
}

// Fetches a buffer from the active buffer manager.
//
// @return Pointer to the buffer handed out by the buffer manager.
// @throws NoBuffersAllocated (carrying the remote endpoint) when the buffer manager throws
//         TcpNoBuffers.
netio3::NetworkBuffer* netio3::asyncmsg::Session::getBuffer()
{
  try {
    return m_bufferManagerPtr->getBuffer();
  } catch (const TcpNoBuffers& /*unused*/) {
    // Translate the manager-level issue into one that identifies this connection.
    const auto& peer = remoteEndpoint();
    throw NoBuffersAllocated(ERS_HERE, peer.address().to_string(), peer.port());
  }
}

// Reports how many buffers the active buffer manager currently has available.
//
// @return The value returned by the buffer manager's getNumAvailableBuffers().
std::size_t netio3::asyncmsg::Session::getNumAvailableBuffers()
{
  const std::size_t available = m_bufferManagerPtr->getNumAvailableBuffers();
  return available;
}

std::vector<std::uint64_t> netio3::asyncmsg::Session::getPendingSends() const
{
  std::lock_guard lock(m_mutex_pending_sends);
  return m_pendingSends;
}

// Delivers the payload of a received message to the user's data callback, if one is set.
//
// @param receiveMessage  Message whose `data` member is passed to on_data_cb.
void netio3::asyncmsg::Session::handleOnReceive(const ReceiveMessage::Message& receiveMessage) const
{
  ZoneScopedC(0x1f7a7a);
  ERS_DEBUG(2, "Received message");

  // Nothing to do when no data callback has been registered.
  if (not m_callbacks.on_data_cb) {
    return;
  }
  m_callbacks.on_data_cb(receiveMessage.data);
}

// Common completion path for sends (successful or failed).
//
// Returns the buffer of buffered messages to the buffer manager, notifies the user's
// send-completed callback (if set) and finally removes the message key from the list of
// pending sends. Messages that are not a SendMessage are reported and otherwise ignored.
//
// @param message  The message whose send has finished.
void netio3::asyncmsg::Session::handleOnSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message)
{
  ZoneScopedC(0x1f7a7a);

  const auto* const completed = dynamic_cast<const SendMessage*>(message.get());
  if (completed == nullptr) {
    ers::error(TcpSendMessageWrongType(ERS_HERE));
    return;
  }

  checkAndReturnBuffer(completed);

  if (m_callbacks.on_send_completed_cb) {
    const auto& peer = cachedRemoteEndpoint();
    m_callbacks.on_send_completed_cb(EndPointAddress(peer.address().to_string(), peer.port()),
                                     completed->key());
  }

  std::lock_guard guard(m_mutex_pending_sends);
  std::erase(m_pendingSends, completed->key());
}

// Raises the OPENED connection event via trigger_cb.
void netio3::asyncmsg::Session::trigger_on_connection_established_cb()
{
  constexpr auto event = ConnectionEvent::Type::OPENED;
  trigger_cb(event);
}

// Raises the REFUSED connection event via trigger_cb.
void netio3::asyncmsg::Session::trigger_on_connection_refused_cb()
{
  constexpr auto event = ConnectionEvent::Type::REFUSED;
  trigger_cb(event);
}

// Raises the CLOSED connection event via trigger_cb.
void netio3::asyncmsg::Session::trigger_on_connection_closed_cb()
{
  constexpr auto event = ConnectionEvent::Type::CLOSED;
  trigger_cb(event);
}

// Dispatches a connection event.
//
// When the session is in DELEGATE mode and not using the asio event loop, the event is
// handed to m_addEventCb and nothing else happens here. Otherwise the matching user callback
// (if set) is invoked directly; for REFUSED and CLOSED the connection-closed callback is
// additionally invoked with this session.
//
// @param eventType  Kind of connection event to dispatch.
// @throws std::logic_error for an event type that is none of OPENED, REFUSED or CLOSED.
void netio3::asyncmsg::Session::trigger_cb(const ConnectionEvent::Type eventType)
{
  const auto& remote = cachedRemoteEndpoint();
  const auto endpoint = EndPointAddress(remote.address().to_string(), remote.port());
  const auto& local = localEndpoint();
  const auto local_endpoint = EndPointAddress(local.address().to_string(), local.port());

  // Buffered sending is available with session-local buffers or with shared send buffers.
  const auto& bufferedParams = m_connectionParams.send_buffered_params;
  const bool supportsBuffered = bufferedParams.num_buf > 0 or bufferedParams.use_shared_send_buffers;
  const auto capabilities = EndpointCapabilities{
    .send_buffered = supportsBuffered,
    .send_copy = true,
    .send_zero_copy = true,
    .receive = true,
  };

  const bool handOverEvent = m_mode == Mode::DELEGATE and not m_useAsioEventLoop;
  if (handOverEvent) {
    m_addEventCb({eventType, endpoint, local_endpoint, capabilities});
    return;
  }

  if (eventType == ConnectionEvent::Type::OPENED) {
    if (m_callbacks.on_connection_established_cb) {
      m_callbacks.on_connection_established_cb(endpoint, local_endpoint, capabilities);
    }
    return;
  }

  if (eventType == ConnectionEvent::Type::REFUSED) {
    if (m_callbacks.on_connection_refused_cb) {
      m_callbacks.on_connection_refused_cb(endpoint);
    }
    m_connectionClosedCb(shared_from_this());
    return;
  }

  if (eventType == ConnectionEvent::Type::CLOSED) {
    if (m_callbacks.on_connection_closed_cb) {
      // The pending-sends list is passed to the callback while the mutex is held.
      std::lock_guard guard(m_mutex_pending_sends);
      m_callbacks.on_connection_closed_cb(endpoint, m_pendingSends);
    }
    m_connectionClosedCb(shared_from_this());
    return;
  }

  throw std::logic_error("Unknown event type");
}

// Returns the buffer of a buffered message to the buffer manager.
//
// Messages that are not a SendMessageBuffered are left untouched.
//
// @param message  The message whose send has finished.
void netio3::asyncmsg::Session::checkAndReturnBuffer(const felix::asyncmsg::OutputMessage* message)
{
  if (const auto* const buffered = dynamic_cast<const SendMessageBuffered*>(message);
      buffered != nullptr) {
    m_bufferManagerPtr->returnBuffer(buffered->key());
  }
}