Program Listing for File AsioEventLoop.cpp

Return to documentation for file (EventLoop/AsioEventLoop.cpp)

#include "netio3-backend/EventLoop/AsioEventLoop.hpp"

#include <chrono>

#include <boost/asio.hpp>
#include <unistd.h>

#include <ers/ers.h>

#include "netio3-backend/EventLoop/BaseEventLoop.hpp"

netio3::AsioEventLoop::AsioEventLoop(std::function<void()> cb_init) : m_cb_init{std::move(cb_init)}
{}

netio3::AsioEventLoop::~AsioEventLoop()
{
  do_stop();
}

void netio3::AsioEventLoop::process_event(const EventContext& evc)
{
  if (evc.cb != nullptr) {
    evc.cb(evc.fd);
  }
}

void netio3::AsioEventLoop::register_fd(const EventContext& ctx)
{
  std::lock_guard lck(m_mutex);

  // Create a stream_descriptor for the file descriptor.
  auto streamDescriptor =
    std::make_shared<boost::asio::posix::stream_descriptor>(m_ioService, ctx.fd);

  // Set the file descriptor to non-blocking mode.
  update_flags(ctx.fd, O_NONBLOCK);

  // Store the stream_descriptor in a map.
  m_streamDescriptors.try_emplace(ctx.fd, streamDescriptor);
  m_ev_contexts_by_fd.try_emplace(ctx.fd, ctx);

  // Start an asynchronous read operation.
  start_async_read(
    streamDescriptor,
    [ctx] { process_event(ctx); },
    [this, fd = ctx.fd](const boost::system::error_code& error) {
      if (error == boost::asio::error::eof) {
        remove_fd(fd);
      }
      ERS_LOG(std::format("Failed executing callback for {} due to {}", fd, error.message()));
    });

  ERS_DEBUG(2, std::format("Added file descriptor {} to Boost.Asio event loop", ctx.fd));
}

void netio3::AsioEventLoop::remove_fd(const int fd, bool /*close_fd*/, bool wait)
{
  // Could use some cleanup regarding locks and call order and so on
  if (wait) {
    constexpr static auto TIMEOUT = std::chrono::milliseconds(100);
    finish_callbacks(fd, TIMEOUT);
  }
  std::lock_guard lck(m_mutex);
  const auto it = m_streamDescriptors.find(fd);
  if (it != m_streamDescriptors.end()) {
    it->second->close();
    m_streamDescriptors.erase(it);
    const auto evctx = m_ev_contexts_by_fd.at(fd);
    if (evctx.on_closed_cb != nullptr) {
      evctx.on_closed_cb(evctx.fd);
    }
    ERS_DEBUG(2, std::format("Removed file descriptor {} from Boost.Asio event loop", fd));
  } else {
    ers::warning(FailedRemovingFd(ERS_HERE, fd));
  }
}

void netio3::AsioEventLoop::finish_callbacks(const int fd, std::chrono::milliseconds timeout)
{
  std::unique_lock lck(m_mutex);
  const auto it = m_streamDescriptors.find(fd);
  if (it != m_streamDescriptors.end()) {
    auto& stream_descriptor = it->second;
    const auto native_fd = stream_descriptor->native_handle();
    lck.unlock();
    wait_for_fd(native_fd, timeout, [this, &fd]() {
      process_event(m_ev_contexts_by_fd.at(fd));
    });
  }
}

void netio3::AsioEventLoop::run()
{
  {
    std::lock_guard lck(m_mutex);
    set_thread_id(std::this_thread::get_id());
    if (m_cb_init) {
      m_cb_init();
    }
    m_work = std::make_unique<boost::asio::io_service::work>(m_ioService);
  }
  m_is_running = true;
  m_ioService.run();
  m_is_running = false;
  ERS_INFO("Event loop stopped");
}

void netio3::AsioEventLoop::run_for(const std::uint64_t sec)
{
  auto timer = create_timer([this](int) { stop(); });
  timer.start(std::chrono::seconds{sec});
  run();
}

void netio3::AsioEventLoop::stop()
{
  std::lock_guard lck(m_mutex);
  do_stop();
}

bool netio3::AsioEventLoop::is_running() const {
  return m_is_running.load();
}

void netio3::AsioEventLoop::start_async_read(
  const std::shared_ptr<boost::asio::posix::stream_descriptor>& stream_descriptor,
  const std::function<void()>& success_handler,
  const std::function<void(const boost::system::error_code&)>& error_handler) const
{
  std::lock_guard lck(m_mutex);
  if (not stream_descriptor->is_open()) {
    return;
  }
  stream_descriptor->async_read_some(
    boost::asio::null_buffers(),
    [this, success_handler, error_handler, stream_descriptor](
      const boost::system::error_code& error, std::size_t /*bytes_transferred*/) {
      if (!error) {
        success_handler();
      } else {
        error_handler(error);
      }
      start_async_read(stream_descriptor, success_handler, error_handler);
    });
}

void netio3::AsioEventLoop::do_stop()
{
  ERS_INFO("Stopping evloop");
  m_work.reset();
  m_ioService.stop();
  reset_thread_id();
}