Program Listing for File Server.cpp

Return to documentation for file (BackendAsyncmsg/Server.cpp)

#include "Server.hpp"

#include "Session.hpp"
#include "netio3-backend/Issues.hpp"

/**
 * @brief Construct the server and store its configuration.
 *
 * The io_context is forwarded to the felix::asyncmsg::Server base class; every
 * other argument is kept in the matching member. The constructor itself does
 * not start accepting connections (see startAccept()).
 *
 * @param ioContext           io_context handed to the base class, to every new
 *                            Session and used to post the shutdown callback.
 * @param callbacksConfig     User callbacks; passed on to each Session, and its
 *                            on_connection_refused_cb is used by onAcceptError().
 * @param onShutdownCb        Invoked with @p endpoint once the server is shut
 *                            down and no sessions remain. May be empty.
 * @param onOpenedCb          Invoked from onAccept() with the accepted session.
 *                            May be empty.
 * @param onClosedCb          Invoked from onSessionClosed() with the closed
 *                            session. May be empty.
 * @param addEventCb          Receives ConnectionEvent objects (e.g. REFUSED from
 *                            onAcceptError()); also forwarded to sessions.
 * @param eventloop           Non-owning pointer passed on to each Session.
 * @param useAsioEventLoop    Passed on to each Session; also selects how a
 *                            refused connection is reported.
 * @param mode                Passed on to each Session; also selects how a
 *                            refused connection is reported.
 * @param sharedBufferManager Non-owning pointer passed on to each Session.
 * @param connectionParams    Passed on to each Session.
 * @param endpoint            Address reported to @p onShutdownCb.
 */
netio3::asyncmsg::Server::Server(boost::asio::io_context& ioContext,
                                 CallbacksConfig callbacksConfig,
                                 std::function<void(const EndPointAddress&)> onShutdownCb,
                                 std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> onOpenedCb,
                                 std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> onClosedCb,
                                 std::function<void(const ConnectionEvent&)> addEventCb,
                                 BaseEventLoop* eventloop,
                                 const bool useAsioEventLoop,
                                 const Mode mode,
                                 BufferManager* sharedBufferManager,
                                 ConnectionParameters connectionParams,
                                 EndPointAddress endpoint) :
  felix::asyncmsg::Server(ioContext),
  m_ioContext{ioContext},
  m_callbacks{std::move(callbacksConfig)},
  m_onShutdownCb{std::move(onShutdownCb)},
  m_onOpenedCb{std::move(onOpenedCb)},
  m_onClosedCb{std::move(onClosedCb)},
  m_addEventCb{std::move(addEventCb)},
  // Raw pointer: copied as-is, moving it would be a no-op and would wrongly
  // suggest that ownership is transferred.
  m_eventloop{eventloop},
  m_bufferManagerPtr{sharedBufferManager},
  m_useAsioEventLoop{useAsioEventLoop},
  m_mode{mode},
  m_connectionParams{std::move(connectionParams)},
  m_endpoint{std::move(endpoint)}
{}

/**
 * @brief Create a fresh Session and pass it to asyncAccept().
 *
 * The session is built from the configuration stored in the constructor plus
 * two callbacks that route back into this server: one reporting that the
 * session was closed (onSessionClosed()) and one forwarding connection events
 * to m_addEventCb. m_mutex is held for the duration of the call.
 *
 * Both callbacks are handed to the Session and may therefore be invoked after
 * this server is gone; each one first locks a weak pointer to the server and
 * does nothing if the server no longer exists.
 */
void netio3::asyncmsg::Server::startAccept()
{
  std::lock_guard lock(m_mutex);
  asyncAccept(std::make_shared<Session>(
    m_ioContext,
    m_callbacks,
    m_eventloop,
    [this, weakSelf = weak_from_this()](const std::shared_ptr<felix::asyncmsg::Session>& session) {
      // `self` keeps the server alive while the close notification is handled.
      if (auto self = weakSelf.lock()) {
        onSessionClosed(session);
      }
    },
    [this, weakSelf = weak_from_this()](const ConnectionEvent& event) {
      // Same lifetime guard as above: never touch m_addEventCb through a
      // dangling `this`.
      if (auto self = weakSelf.lock()) {
        m_addEventCb(event);
      }
    },
    m_useAsioEventLoop,
    m_mode,
    m_bufferManagerPtr,
    m_connectionParams));
}

/**
 * @brief Handle a successfully accepted connection.
 *
 * Logs the peer address, records the session in m_sessions (both under
 * m_mutex), then - with the mutex released - notifies m_onOpenedCb if one is
 * set and starts accepting the next connection.
 *
 * @param session The session that was just accepted.
 */
void netio3::asyncmsg::Server::onAccept(std::shared_ptr<felix::asyncmsg::Session> session)
{
  {
    std::lock_guard guard(m_mutex);
    const auto& peer = session->remoteEndpoint();
    ERS_INFO(std::format("Accepted connection from {}:{}", peer.address().to_string(), peer.port()));
    m_sessions.emplace(session);
  }

  // The user callback runs without m_mutex held.
  if (m_onOpenedCb) {
    m_onOpenedCb(session);
  }

  startAccept();
}

/**
 * @brief Handle a failed accept.
 *
 * Reports a FailedOpenActiveEndpoint issue, then signals the refused
 * connection: as a REFUSED ConnectionEvent through m_addEventCb when the asio
 * event loop is not used and the mode is DELEGATE, otherwise through
 * on_connection_refused_cb if that callback is set. Finally starts accepting
 * the next connection.
 *
 * @param error   Error code describing why the accept failed.
 * @param session The session that was being accepted.
 */
void netio3::asyncmsg::Server::onAcceptError(const boost::system::error_code& error,
                                             std::shared_ptr<felix::asyncmsg::Session> session)
{
  {
    std::lock_guard guard(m_mutex);

    const auto& remote = session->remoteEndpoint();
    ers::error(FailedOpenActiveEndpoint(
      ERS_HERE, remote.address().to_string(), remote.port(), error.message()));

    const EndPointAddress peer(remote.address().to_string(), remote.port());
    const auto& local = session->localEndpoint();
    const EndPointAddress listener(local.address().to_string(), local.port());

    // NOTE(review): the callbacks below run while m_mutex is held, unlike the
    // callback in onAccept() - confirm they never re-enter this server.
    const bool reportAsEvent = not m_useAsioEventLoop and m_mode == Mode::DELEGATE;
    if (reportAsEvent) {
      m_addEventCb({ConnectionEvent::Type::REFUSED, peer, listener});
    } else if (m_callbacks.on_connection_refused_cb) {
      m_callbacks.on_connection_refused_cb(peer);
    }
  }

  startAccept();
}

void netio3::asyncmsg::Server::shutdown()
{
  std::unique_lock lock(m_mutex);
  m_shuttingDown = true;
  for (const auto& session : m_sessions) {
    session->asyncClose();
  }
  try {
    close();
  } catch (const boost::system::system_error& e) {
    throw FailedCloseListenEndpoint(
      ERS_HERE, localEndpoint().address().to_string(), localEndpoint().port(), e.what(), e);
  }
  if (m_sessions.empty() and m_onShutdownCb) {
    lock.unlock();
    boost::asio::post(m_ioContext.get(), [this, self = shared_from_this()]() {
      m_onShutdownCb(m_endpoint);
    });
  }
}

/**
 * @brief Bookkeeping for a session that has been closed.
 *
 * Under m_mutex: notifies m_onClosedCb (if set) and removes the session from
 * m_sessions. If the server is shutting down and this was the last tracked
 * session, the mutex is released and m_onShutdownCb (if set) is invoked with
 * the server endpoint.
 *
 * @param session The session that was closed.
 */
void netio3::asyncmsg::Server::onSessionClosed(std::shared_ptr<felix::asyncmsg::Session> session)
{
  std::unique_lock guard(m_mutex);

  if (m_onClosedCb) {
    m_onClosedCb(session);
  }
  m_sessions.erase(session);

  const bool shutdownComplete = m_shuttingDown and m_sessions.empty();
  if (not shutdownComplete or not m_onShutdownCb) {
    return;
  }

  // The shutdown callback is called without m_mutex held.
  guard.unlock();
  m_onShutdownCb(m_endpoint);
}