Program Listing for File Server.cpp

Return to documentation for file (BackendAsyncmsg/Server.cpp)

#include "Server.hpp"

#include "Session.hpp"
#include "netio3-backend/Issues.hpp"

netio3::asyncmsg::Server::Server(boost::asio::io_service& ioService,
                                 NetworkConfig config,
                                 std::function<void(const EndPointAddress&)> onShutdownCb,
                                 std::function<void(const ConnectionEvent&)> addEventCb,
                                 BaseEventLoop* eventloop,
                                 const bool useAsioEventLoop,
                                 const Mode mode) :
  felix::asyncmsg::Server(ioService),
  m_ioService{ioService},
  m_config{std::move(config)},
  m_onShutdownCb{std::move(onShutdownCb)},
  m_addEventCb{std::move(addEventCb)},
  m_eventloop{std::move(eventloop)},
  m_useAsioEventLoop{useAsioEventLoop},
  m_mode{mode}
{}

void netio3::asyncmsg::Server::startAccept()
{
  std::lock_guard lock(m_mutex);
  asyncAccept(std::make_shared<Session>(
    m_ioService,
    m_config,
    m_eventloop,
    [this, weakSelf = weak_from_this()](const std::shared_ptr<felix::asyncmsg::Session>& session) {
      if (auto self = weakSelf.lock()) {
        closeSession(session);
      }
    },
    [this](const ConnectionEvent& event) { m_addEventCb(event); },
    m_useAsioEventLoop,
    m_mode));
}

void netio3::asyncmsg::Server::onAccept(std::shared_ptr<felix::asyncmsg::Session> session)
{
  {
    std::lock_guard lock(m_mutex);
    ERS_INFO(std::format("Accepted connection from {}:{}",
                         session->remoteEndpoint().address().to_string(),
                         session->remoteEndpoint().port()));
    m_sessions.emplace(session);
  }
  startAccept();
}

void netio3::asyncmsg::Server::onAcceptError(const boost::system::error_code& error,
                                             std::shared_ptr<felix::asyncmsg::Session> session)
{
  {
    std::lock_guard lock(m_mutex);
    ers::error(FailedOpenReceiveEndpoint(ERS_HERE,
                                         session->remoteEndpoint().address().to_string(),
                                         session->remoteEndpoint().port(),
                                         error.message()));
    const auto endpoint = EndPointAddress(session->remoteEndpoint().address().to_string(),
                                          session->remoteEndpoint().port());
    if (not m_useAsioEventLoop and m_mode == Mode::DELEGATE) {
      m_addEventCb({ConnectionEvent::Type::REFUSED, endpoint});
    } else if (m_config.on_connection_refused_cb) {
      m_config.on_connection_refused_cb(endpoint);
    }
  }
  startAccept();
}

void netio3::asyncmsg::Server::shutdown()
{
  std::lock_guard lock(m_mutex);
  m_shuttingDown = true;
  for (const auto& session : m_sessions) {
    session->asyncClose();
  }
  try {
    close();
  } catch (const boost::system::system_error& e) {
    throw FailedCloseListenEndpoint(
      ERS_HERE, localEndpoint().address().to_string(), localEndpoint().port(), e.what(), e);
  }
}

void netio3::asyncmsg::Server::closeSession(std::shared_ptr<felix::asyncmsg::Session> session)
{
  std::unique_lock lock(m_mutex);
  m_sessions.erase(session);
  if (m_shuttingDown and m_sessions.empty()) {
    if (m_onShutdownCb) {
      lock.unlock();
      m_onShutdownCb({localEndpoint().address().to_string(), localEndpoint().port()});
    }
  }
}