Program Listing for File EpollEventLoop.cpp

Return to documentation for file (EventLoop/EpollEventLoop.cpp)

#include "netio3-backend/EventLoop/EpollEventLoop.hpp"

#include <chrono>
#include <mutex>

#include <tracy/Tracy.hpp>

#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/EventLoop/Utility.hpp"

/// Construct the event loop: create the epoll instance, store the init callback and
/// create the three internal signal fds (register, unregister, stop).
///
/// @param cb_init Callback stored in m_cb_init; run() invokes it (if set) before
///                entering the polling loop.
///
/// NOTE(review): the return value of epoll_create1() is not checked here; a failure
/// would only surface later through epoll_ctl()/epoll_wait() errors.
/// NOTE(review): create_internal_signal() registers the new fd via do_register_fd(),
/// which uses m_epollfd, m_ev_contexts_by_fd and (when tracking) m_mutex/m_open_fds.
/// This presumably relies on those members being declared before the signal members
/// in the class definition - confirm against the header.
netio3::EpollEventLoop::EpollEventLoop(std::function<void()> cb_init) :
  m_epollfd{epoll_create1(0)},
  m_cb_init{std::move(cb_init)},
  // Fired from other threads to have the loop thread drain the registration queue.
  m_register_fd_signal{create_internal_signal([this](int) { register_queued_fd(); })},
  // Fired from other threads to have the loop thread drain the unregistration queue.
  m_unregister_fd_signal{create_internal_signal([this](int) { unregister_queued_fd(); })},
  // The stop signal carries no work of its own; its callback is intentionally empty.
  m_stop_fd_signal{create_internal_signal([](int) {})}
{}

/// Request a stop through do_stop() and then close the three internal signal fds.
///
/// NOTE(review): m_epollfd is not closed here; within this file it is only closed at
/// the end of run(), so it appears to stay open if run() never finishes - confirm.
netio3::EpollEventLoop::~EpollEventLoop()
{
  do_stop();
  // Same closing order as the members were created: register, unregister, stop.
  for (const int signal_fd : {m_register_fd_signal, m_unregister_fd_signal, m_stop_fd_signal}) {
    close(signal_fd);
  }
}

/// Dispatch one event to its context.
///
/// @param evc Context whose callback is invoked with the context's own fd.
///            A context without a callback is ignored.
void netio3::EpollEventLoop::process_event(const EventContext& evc)
{
  if (evc.cb == nullptr) {
    return;  // nothing registered for this fd
  }
  evc.cb(evc.fd);
}

/// Register a file descriptor with the event loop.
///
/// When called on the thread recorded by run() (via set_thread_id) the fd is
/// registered immediately. From any other thread the context is queued and the
/// register signal is fired, so that the loop thread performs the registration in
/// register_queued_fd().
///
/// @param ctx Event context (fd plus callbacks) to register.
void netio3::EpollEventLoop::register_fd(const EventContext& ctx)
{
  ZoneScoped;
  ERS_DEBUG(2, std::format("Register fd {}", ctx.fd));
  const bool on_loop_thread = (get_thread_id() == std::this_thread::get_id());
  if (not on_loop_thread) {
    // Hand the request over to the loop thread.
    m_register_queue.emplace(ctx);
    fire_internal_signal(m_register_fd_signal);
    return;
  }
  do_register_fd(ctx);
}

/// Remove a file descriptor from the event loop.
///
/// From a thread other than the loop thread the request is queued and the unregister
/// signal is fired; the loop thread handles it in unregister_queued_fd(). On the loop
/// thread the request is handled right away.
///
/// @param fd       File descriptor to remove.
/// @param close_fd Forwarded to do_unregister_fd() (or stored with the deferred entry).
/// @param wait     If true, the fd is only recorded in m_fds_to_close; run() unregisters
///                 it after a later event on that fd once check_fd() returns false.
///                 If false, the fd is unregistered immediately.
void netio3::EpollEventLoop::remove_fd(const int fd, bool close_fd, bool wait)
{
  ZoneScoped;
  ERS_DEBUG(2, std::format("Removing fd {}", fd));
  if (get_thread_id() != std::this_thread::get_id()) {
    // Not the loop thread: defer to the loop thread through the queue.
    m_unregister_queue.emplace(fd, close_fd, wait);
    fire_internal_signal(m_unregister_fd_signal);
    return;
  }
  if (not wait) {
    do_unregister_fd(fd, close_fd);
    return;
  }
  m_fds_to_close.try_emplace(fd, close_fd);
}

/// Write an EventContext to a stream as its fd enclosed in curly braces.
std::ostream& operator <<(std::ostream& stream, const netio3::EventContext& val) {
  stream << "{" << val.fd << "}";
  return stream;
}

/// Run the event loop on the calling thread until a stop has been requested.
///
/// Records the calling thread as the loop thread, invokes the init callback (if set)
/// and then repeatedly waits on the epoll instance and dispatches ready events to the
/// callbacks of their registered contexts. Once a stop has been requested the loop
/// polls with MIN_POLL_TIMEOUT and terminates as soon as an iteration yields no events
/// or the number of post-stop iterations exceeds MAX_ITERATIONS_AFTER_STOP.
/// On exit the epoll fd and every fd still listed in m_open_fds are closed.
///
/// @throws EpollWaitError if epoll_wait() fails with an error other than EINTR while
///         no stop has been requested.
///
/// NOTE(review): when EpollWaitError is thrown, m_is_running stays true and neither
/// the epoll fd nor the tracked fds are closed by this function.
void netio3::EpollEventLoop::run()
{
  set_thread_id(std::this_thread::get_id());
  if (m_cb_init) {
    m_cb_init();
  }

  constexpr static auto MAX_ITERATIONS_AFTER_STOP{5};
  auto iterations_after_stop{0};

  bool running = true;
  m_is_running = true;
  while (running) {
    // don't want to block or wait too long if we're shutting down
    const auto timeout = m_ev_should_stop ? MIN_POLL_TIMEOUT : EPOLL_TIMEOUT;
    std::array<struct epoll_event, MAX_EPOLL_EVENTS> active_events{};
    const int nevents = epoll_wait(m_epollfd, active_events.data(), MAX_EPOLL_EVENTS, timeout);
    // Snapshot errno immediately: the tracing, logging, locking and callback code
    // executed before the error check below may overwrite it.
    const auto errsv = errno;
    TracyPlot("Number of Events", static_cast<std::int64_t>(nevents));
    ZoneScoped;
    ERS_DEBUG(3, std::format("{} events to process", nevents));

    // Flush the removals requested during the previous iteration: drop their contexts
    // and stop tracking the fds.
    for (const auto fd : m_fds_to_remove) {
      m_ev_contexts_by_fd.erase(fd);
      std::lock_guard lck(m_mutex);
      std::erase(m_open_fds, fd);
    }
    m_fds_to_remove.clear();

    // nevents is -1 on failure, in which case this loop does not execute.
    for (int event = 0; event < nevents; ++event) {
      ZoneScoped;
      // Copy the fields of interest into plain locals before using them.
      const int event_fd = active_events.at(event).data.fd;
      const std::uint32_t event_flags = active_events.at(event).events;

      const auto ctx_iter = m_ev_contexts_by_fd.find(event_fd);
      // Skip fds without a context and fds already unregistered earlier in this batch.
      if (ctx_iter == m_ev_contexts_by_fd.end() or std::ranges::find(m_fds_to_remove, event_fd) != m_fds_to_remove.end()) {
        ERS_DEBUG(1,
                  std::format("Skipping invalid event context for fd {} events {}",
                              event_fd,
                              event_flags));
        continue;
      }
      const auto& evctx = ctx_iter->second;
      process_event(evctx);

      // Unregister at most once per event. do_unregister_fd() leaves the fd in
      // m_open_fds until the flush above runs again, so calling it a second time for
      // the same fd would repeat epoll_ctl(DEL) and invoke on_closed_cb twice.
      const bool rdhup_reported = (event_flags & EPOLLRDHUP) != 0U;
      if (rdhup_reported or (m_fds_to_close.contains(evctx.fd) and not check_fd(evctx.fd))) {
        do_unregister_fd(evctx.fd, false);
      }
    }

    if (m_ev_should_stop.load() && (nevents < 1 || iterations_after_stop > MAX_ITERATIONS_AFTER_STOP)) {
      running = false;
      if (nevents > 0) {
        ERS_LOG(std::format("Stopping event loop, but still had events {} to process", nevents));
      }
      continue;
    }
    if (m_ev_should_stop.load()) {
      ERS_DEBUG(1, std::format("Stopping event loop, num events {}, num iterations after stop {}", nevents, iterations_after_stop));
      ++iterations_after_stop;
    }

    if (nevents == -1) {
      // An interrupted wait is not an error; simply poll again.
      if (errsv == EINTR) {
        continue;
      }
      throw EpollWaitError(ERS_HERE, utility::error_message(errsv));
    }
  }  // end of while running
  close(m_epollfd);
  std::lock_guard lck(m_mutex);
  // Close every fd that was registered with tracking and is still open.
  for (const auto fd : m_open_fds) {
    close(fd);
  }
  m_is_running = false;
  reset_thread_id();
  ERS_INFO("Event loop stopped");
}

/// Run the event loop for a limited amount of time.
///
/// Creates a timer whose callback calls stop(), starts it with a duration of `sec`
/// seconds, runs the loop and removes the timer again once run() has returned.
///
/// @param sec Duration, in seconds, passed to the timer.
void netio3::EpollEventLoop::run_for(const std::uint64_t sec)
{
  const auto run_duration = std::chrono::seconds{sec};
  auto stop_timer = create_timer([this](int) { stop(); });
  stop_timer.start(run_duration);
  run();
  remove_timer(stop_timer);
}

/// Request the event loop to stop.
///
/// Forwards to do_stop(), which sets the stop flag and fires the internal stop signal.
void netio3::EpollEventLoop::stop()
{
  do_stop();
}

/// Report whether run() is currently executing its loop.
///
/// @return The current value of m_is_running, which run() sets to true before
///         entering the loop and to false after leaving it normally.
bool netio3::EpollEventLoop::is_running() const {
  const bool running = m_is_running.load();
  return running;
}

void netio3::EpollEventLoop::register_queued_fd() {
  ZoneScoped;
  EventContext ctx{};
  while (m_register_queue.try_pop(ctx)) {
    do_register_fd(ctx);
  }
}

void netio3::EpollEventLoop::unregister_queued_fd() {
  ZoneScoped;
  CloseFdQueueItem item{};
  while (m_unregister_queue.try_pop(item)) {
    if (item.wait) {
      m_fds_to_close.try_emplace(item.fd, item.close_fd);
    } else {
      do_unregister_fd(item.fd, item.close_fd);
    }
  }
}

/// Register a file descriptor with the epoll instance.
///
/// Stores the context under its fd (an existing entry for the same fd is kept), sets
/// O_NONBLOCK on the fd and adds it to the epoll set for EPOLLIN | EPOLLRDHUP.
/// If adding to epoll fails, an error is reported and the context inserted by this
/// call is removed again, so that a failed registration does not leave a stale entry
/// that would shadow a later registration of the same fd number.
///
/// @param ctx      Event context (fd plus callbacks) to register.
/// @param track_fd If true, the fd is appended to m_open_fds, which run() closes when
///                 the loop exits.
void netio3::EpollEventLoop::do_register_fd(const EventContext& ctx, const bool track_fd) {
  ZoneScoped;
  ERS_DEBUG(2, std::format("Actually registering fd {}", ctx.fd));
  // try_emplace never overwrites; remember whether this call created the entry so it
  // can be rolled back if the epoll registration fails.
  const bool inserted = m_ev_contexts_by_fd.try_emplace(ctx.fd, ctx).second;
  epoll_event ev{.events = (EPOLLIN | EPOLLRDHUP), .data = {.fd = ctx.fd}};

  // Set the file descriptor to non-blocking mode.
  update_flags(ctx.fd, O_NONBLOCK);

  ERS_DEBUG(2, std::format("Adding {} to epoll {}", ctx.fd, m_epollfd));
  if (epoll_ctl(m_epollfd, EPOLL_CTL_ADD, ctx.fd, &ev) != 0) {
    // Snapshot errno before any further call can overwrite it.
    const auto errsv = errno;
    if (inserted) {
      m_ev_contexts_by_fd.erase(ctx.fd);
    }
    ers::error(FailedAddingFd(ERS_HERE, ctx.fd, utility::error_message(errsv)));
    return;
  }
  if (track_fd) {
    std::lock_guard lck(m_mutex);
    m_open_fds.push_back(ctx.fd);
  }
}

/// Unregister a file descriptor from the epoll instance.
///
/// If the fd is listed in m_open_fds it is removed from the epoll set, queued in
/// m_fds_to_remove (run() erases its context and its m_open_fds entry at the start of
/// the next loop iteration) and its on_closed_cb, if set, is invoked. Otherwise a
/// warning is issued.
///
/// @param fd       File descriptor to unregister.
/// @param close_fd If true, close(fd) is called afterwards, whether or not the fd was
///                 found in m_open_fds; a failing close is reported as an error.
void netio3::EpollEventLoop::do_unregister_fd(const int fd, const bool close_fd) {
  ZoneScoped;
  ERS_DEBUG(2, std::format("Actually unregistering fd {}", fd));
  // Hold the mutex only for the lookup, not for the syscalls and callback below.
  const auto found = [this, &fd] {
    std::lock_guard lck(m_mutex);
    return std::ranges::find(m_open_fds, fd) != m_open_fds.end();
  }();
  if (found) {
    const auto stat = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, nullptr);
    // Snapshot errno once so that the check and the log message agree, even if
    // formatting the message modifies errno.
    const auto errsv = errno;
    // EBADF means the file descriptor is already closed. This may happen and the fd is
    // automatically removed from epoll_ctl instance
    if (stat != 0 && errsv != EBADF) {
      ERS_INFO(std::format("Error {} ({}) removing fd {} from m_epollfd", errsv, utility::error_message(errsv), fd));
    }

    m_fds_to_remove.push_back(fd);

    // Work on a copy of the context while invoking the user callback.
    const auto evctx = m_ev_contexts_by_fd.at(fd);
    if (evctx.on_closed_cb != nullptr) {
      evctx.on_closed_cb(evctx.fd);
    }
  } else {
    ers::warning(FailedRemovingFd(ERS_HERE, fd));
  }

  if (close_fd) {
    const auto ret = close(fd);
    if (ret != 0) {
      const auto errsv = errno;
      ers::error(FailedClosingFd(ERS_HERE, fd, utility::error_message(errsv)));
    }
  }
}

/// Create a non-blocking eventfd and register it with the event loop as an internal
/// signal.
///
/// The registered callback first reads the 8-byte eventfd counter and then invokes
/// `cb` (if set) with the fd. The fd is registered without tracking, so it is not
/// among the fds closed by run(); the destructor closes it instead.
///
/// @param cb Callback to run on the loop thread whenever the signal fires.
/// @return   The eventfd file descriptor.
///
/// NOTE(review): the result of eventfd() is not checked here; on failure the invalid
/// fd is passed to do_register_fd(), which reports the failing epoll_ctl().
int netio3::EpollEventLoop::create_internal_signal(const std::function<void(int)>& cb)
{
  const auto flags =  EFD_NONBLOCK;
  const auto fd = eventfd(0, flags);
  const auto etx = EventContext{
    .fd = fd,
    .cb =
      [cb](int fd_cb) {
        std::uint64_t buf{};
        constexpr static auto NBYTES = 8;
        const auto nread = read(fd_cb, &buf, sizeof(buf));
        if (nread != NBYTES) {
          // Report what read() returned, not the (unfilled) buffer contents.
          ERS_DEBUG(1, std::format("Did not read 8 bytes. Instead: {}", nread));
        }
        if (cb) {
          cb(fd_cb);
        }
      },
  };
  // Do not close when event loop is closed
  do_register_fd(etx, false);
  return etx.fd;
}

void netio3::EpollEventLoop::fire_internal_signal(const int fd)
{
  const auto buf = std::uint64_t{1};
  const auto ret = write(fd, &buf, sizeof(buf));
  constexpr static auto NBYTES = 8;
  if (ret != NBYTES) {
    ers::error(FailedFireSignal(ERS_HERE, fd, ret, utility::error_message(errno)));
  }
}

/// Set the stop flag read by run() and fire the internal stop signal.
///
/// The stop signal's own callback is empty (see the constructor); firing it is
/// presumably meant to make a pending epoll_wait() in run() return early.
void netio3::EpollEventLoop::do_stop()
{
  ERS_INFO("Stopping evloop");
  m_ev_should_stop.store(true);
  fire_internal_signal(m_stop_fd_signal);
}