Program Listing for File BaseEventLoop.cpp

Return to documentation for file (EventLoop/BaseEventLoop.cpp)

#include "netio3-backend/EventLoop/BaseEventLoop.hpp"

#include <ers/ers.h>

// Creates an eventfd-backed signal and registers it with this event loop.
//
// cb           Invoked with the eventfd's descriptor every time the registered
//              context callback runs, after the eventfd counter has been read.
//              An empty std::function is skipped.
// useSemaphore When true, the eventfd is created with EFD_SEMAPHORE in
//              addition to EFD_NONBLOCK.
//
// Returns an EventSignalHandle built from the new descriptor and
// shared_from_this().
//
// NOTE(review): the result of eventfd() is not checked here; a failed call
// (-1) is handed to register_fd() unchanged -- confirm that is handled there.
netio3::EventSignalHandle netio3::BaseEventLoop::create_signal(
  const std::function<void(int)>& cb,
  bool useSemaphore)
{
  const int flags = useSemaphore ? (EFD_NONBLOCK | EFD_SEMAPHORE) : EFD_NONBLOCK;
  const auto fd = eventfd(0, flags);
  const auto etx = EventContext{
    .fd = fd,
    .cb =
      [cb](int fd_cb) {
        std::uint64_t buf{};
        constexpr static auto NBYTES = 8;
        // A successful eventfd read transfers exactly 8 bytes. Keep the
        // return value so the diagnostic reports what read() actually
        // returned (byte count or -1) instead of the untouched buffer.
        const auto nread = read(fd_cb, &buf, sizeof(buf));
        if (NBYTES != nread) {
          ERS_DEBUG(1, std::format("Did not read 8 bytes. Instead: {}", nread));
        }
        // The user callback still runs after a short/failed read.
        if (cb) {
          cb(fd_cb);
        }
      },
  };
  register_fd(etx);
  return EventSignalHandle{fd, shared_from_this()};
}

// Removes the signal identified by handle from this event loop.
//
// Forwards the handle's descriptor to remove_fd() with both boolean options
// set to true; the meaning of those options is defined by remove_fd(), which
// is declared elsewhere.
void netio3::BaseEventLoop::remove_signal(const EventSignalHandle& handle)
{
  const auto signal_fd = handle.get_fd();
  remove_fd(signal_fd, true, true);
}

// Creates a timerfd-backed timer and registers it with this event loop.
//
// The timer is created on CLOCK_MONOTONIC with TFD_NONBLOCK; it is not armed
// here.
//
// cb  Invoked with the timer's descriptor every time the registered context
//     callback runs, after the timer's expiration counter has been read. An
//     empty std::function is skipped.
//
// Returns an EventTimerHandle built from the new descriptor and
// shared_from_this().
//
// NOTE(review): the result of timerfd_create() is not checked here; a failed
// call (-1) is handed to register_fd() unchanged -- confirm that is handled
// there.
netio3::EventTimerHandle netio3::BaseEventLoop::create_timer(const std::function<void(int)>& cb)
{
  const auto fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
  const auto etx = EventContext{
    .fd = fd,
    .cb =
      [cb](int fd_cb) {
        std::uint64_t buf{};
        constexpr static auto NBYTES = 8;
        // A successful timerfd read transfers exactly 8 bytes. Keep the
        // return value so the diagnostic reports what read() actually
        // returned (byte count or -1) instead of the untouched buffer.
        const auto nread = read(fd_cb, &buf, sizeof(buf));
        if (NBYTES != nread) {
          ERS_DEBUG(1, std::format("Did not read 8 bytes. Instead: {}", nread));
        }
        // The user callback still runs after a short/failed read.
        if (cb) {
          cb(fd_cb);
        }
      },
  };
  register_fd(etx);
  return EventTimerHandle{fd, shared_from_this()};
}

// Removes the timer identified by handle from this event loop.
//
// Forwards the handle's descriptor to remove_fd() with its second argument
// set to true and any further arguments left at their defaults; the meaning
// of those options is defined by remove_fd(), which is declared elsewhere.
void netio3::BaseEventLoop::remove_timer(const EventTimerHandle& handle)
{
  const auto timer_fd = handle.get_fd();
  remove_fd(timer_fd, true);
}

// Adds new_flags to the file status flags of fd.
//
// The current flags are fetched with F_GETFL, OR-ed with new_flags and
// written back with F_SETFL, so flags that are already set are preserved.
// If either fcntl() call fails, a FailedUpdateFdFlags issue carrying the
// errno message is reported through ers::error and the descriptor is left
// untouched by this function.
void netio3::BaseEventLoop::update_flags(const int fd, const int new_flags)
{
  const auto current_flags = fcntl(fd, F_GETFL);
  if (current_flags < 0) {
    // Do not OR the -1 error value into the mask: that would ask F_SETFL to
    // turn on every status flag.
    ers::error(FailedUpdateFdFlags(ERS_HERE, fd, utility::error_message(errno)));
    return;
  }
  if (fcntl(fd, F_SETFL, current_flags | new_flags) < 0) {
    ers::error(FailedUpdateFdFlags(ERS_HERE, fd, utility::error_message(errno)));
  }
}

// Waits until fd is no longer readable, for at most timeout.
//
// On each iteration the descriptor is probed with check_fd():
//  - not readable: the pending event has been handled, return;
//  - readable and the caller is on the thread reported by get_thread_id():
//    call finish_function() and return;
//  - readable on any other thread: sleep 10 ms and probe again.
// If the timeout elapses without returning, a SkippedEvents warning is issued
// for fd. A timeout that is already expired on entry goes straight to the
// warning without probing the descriptor.
void netio3::BaseEventLoop::wait_for_fd(const int fd,
                                        const std::chrono::milliseconds timeout,
                                        const std::function<void()>& finish_function) const
{
  constexpr static auto WAIT_TIME = std::chrono::milliseconds{10};

  const auto started_at = std::chrono::steady_clock::now();
  const auto elapsed = [&started_at] { return std::chrono::steady_clock::now() - started_at; };

  while (elapsed() < timeout) {
    if (!check_fd(fd)) {
      // Signal handled
      return;
    }
    if (get_thread_id() == std::this_thread::get_id()) {
      finish_function();
      return;
    }
    std::this_thread::sleep_for(WAIT_TIME);
  }

  ers::warning(SkippedEvents(ERS_HERE, fd));
}

// Reports whether fd currently has data to read.
//
// Performs a single non-blocking poll() (timeout 0) for POLLIN. Returns true
// only when poll() reports at least one ready descriptor and POLLIN is set in
// the returned events; a poll() error or timeout yields false.
bool netio3::BaseEventLoop::check_fd(const int fd)
{
  constexpr int NO_WAIT = 0;

  pollfd query{};
  query.fd = fd;
  query.events = POLLIN;
  query.revents = 0;

  if (poll(&query, 1, NO_WAIT) <= 0) {
    return false;
  }
  return (query.revents & POLLIN) != 0;
}