.. _program_listing_file_EventLoop_EpollEventLoop.cpp:

Program Listing for File EpollEventLoop.cpp
===========================================

|exhale_lsh| :ref:`Return to documentation for file <file_EventLoop_EpollEventLoop.cpp>` (``EventLoop/EpollEventLoop.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   // NOTE(review): angle-bracket tokens (the three system header names and the
   // template arguments of std::function / std::array / static_cast) were missing
   // from this listing. They were restored from how the code uses them; confirm
   // against the original source and EpollEventLoop.hpp.
   #include "netio3-backend/EventLoop/EpollEventLoop.hpp"

   #include <sys/epoll.h>
   #include <sys/eventfd.h>
   #include <unistd.h>

   #include "netio3-backend/EventLoop/BaseEventLoop.hpp"
   #include "netio3-backend/Issues.hpp"
   #include "netio3-backend/EventLoop/Utility.hpp"

   // Creates the epoll instance and three internal eventfd signals (register,
   // unregister, stop). cb_init is stored and invoked at the start of run().
   // NOTE(review): create_internal_signal() uses m_epollfd, so m_epollfd must be
   // declared before the signal members in the header -- confirm.
   netio3::EpollEventLoop::EpollEventLoop(std::function<void()> cb_init) :
     m_epollfd{epoll_create1(0)},
     m_cb_init{std::move(cb_init)},
     m_register_fd_signal{create_internal_signal([this](int) { register_queued_fd(); })},
     m_unregister_fd_signal{create_internal_signal([this](int) { unregister_queued_fd(); })},
     m_stop_fd_signal{create_internal_signal([](int) {})}
   {}

   // Requests the loop to stop and closes the three internal eventfds.
   // The epoll fd itself is closed at the end of run(), not here.
   netio3::EpollEventLoop::~EpollEventLoop()
   {
     do_stop();
     close(m_register_fd_signal);
     close(m_unregister_fd_signal);
     close(m_stop_fd_signal);
   }

   // Invokes the context's callback with its fd, if a callback is set.
   void netio3::EpollEventLoop::process_event(const EventContext& evc)
   {
     if (evc.cb != nullptr) {
       evc.cb(evc.fd);
     }
   }

   // Registers ctx with the loop. On the loop thread this happens immediately;
   // from any other thread the context is queued and the loop is woken through
   // the register eventfd.
   void netio3::EpollEventLoop::register_fd(const EventContext& ctx)
   {
     ZoneScoped;
     ERS_DEBUG(2, std::format("Register fd {}", ctx.fd));
     if (get_thread_id() == std::this_thread::get_id()) {
       do_register_fd(ctx);
     } else {
       m_register_queue.emplace(ctx);
       fire_internal_signal(m_register_fd_signal);
     }
   }

   // Removes fd from the loop.
   //   close_fd: also close() the descriptor when it is unregistered.
   //   wait:     do not unregister now; record the fd in m_fds_to_close so that
   //             run() unregisters it once check_fd() returns false for it.
   // From a foreign thread the request is queued and the loop is woken through
   // the unregister eventfd.
   // NOTE(review): run() unregisters waited-for fds with close_fd == false, so the
   // close flag stored in m_fds_to_close is not consumed in this file -- confirm.
   void netio3::EpollEventLoop::remove_fd(const int fd, bool close_fd, bool wait)
   {
     ZoneScoped;
     ERS_DEBUG(2, std::format("Removing fd {}", fd));
     if (get_thread_id() == std::this_thread::get_id()) {
       if (wait) {
         m_fds_to_close.try_emplace(fd, close_fd);
       } else {
         do_unregister_fd(fd, close_fd);
       }
     } else {
       m_unregister_queue.emplace(fd, close_fd, wait);
       fire_internal_signal(m_unregister_fd_signal);
     }
   }

   // Streams an EventContext as "{<fd>}".
   std::ostream& operator <<(std::ostream& stream, const netio3::EventContext& val)
   {
     return stream << "{" << val.fd << "}";
   }

   // Runs the event loop on the calling thread until a stop is requested.
   // After a stop request the loop keeps polling with the short timeout until an
   // iteration yields no events or more than MAX_ITERATIONS_AFTER_STOP further
   // iterations have run. On exit the epoll fd and all tracked fds are closed.
   // Throws EpollWaitError if epoll_wait fails with anything other than EINTR
   // while no stop is pending.
   void netio3::EpollEventLoop::run()
   {
     set_thread_id(std::this_thread::get_id());
     if (m_cb_init) {
       m_cb_init();
     }
     constexpr static auto MAX_ITERATIONS_AFTER_STOP{5};
     auto iterations_after_stop{0};
     bool running = true;
     m_is_running = true;
     while (running) {
       // don't want to block or wait too long if we're shutting down
       const auto timeout = m_ev_should_stop ? MIN_POLL_TIMEOUT : EPOLL_TIMEOUT;
       std::array<epoll_event, MAX_EPOLL_EVENTS> active_events{};
       const int nevents = epoll_wait(m_epollfd, active_events.data(), MAX_EPOLL_EVENTS, timeout);
       // Capture errno right away: the logging, formatting and user callbacks
       // below may overwrite it before the failure is evaluated.
       const int wait_errno = (nevents == -1) ? errno : 0;
       TracyPlot("Number of Events", static_cast<std::int64_t>(nevents));
       ZoneScoped;
       ERS_DEBUG(3, std::format("{} events to process", nevents));

       // Drop the contexts of fds that were unregistered during the previous
       // iteration before dispatching the new events.
       for (const auto fd : m_fds_to_remove) {
         m_ev_contexts_by_fd.erase(fd);
         std::lock_guard lck(m_mutex);
         std::erase(m_open_fds, fd);
       }
       m_fds_to_remove.clear();

       for (int event = 0; event < nevents; ++event) {
         ZoneScoped;
         // Copy the fields out of the epoll_event once instead of re-indexing.
         const auto& active = active_events.at(event);
         const int event_fd = active.data.fd;
         const std::uint32_t event_flags = active.events;

         const auto ctx_iter = m_ev_contexts_by_fd.find(event_fd);
         if (ctx_iter == m_ev_contexts_by_fd.end() or
             std::ranges::find(m_fds_to_remove, event_fd) != m_fds_to_remove.end()) {
           ERS_DEBUG(1,
                     std::format("Skipping invalid event context for fd {} events {}",
                                 event_fd,
                                 event_flags));
           continue;
         }
         const auto& evctx = ctx_iter->second;
         process_event(evctx);

         // Unregister at most once per event: two independent checks would run
         // do_unregister_fd twice for the same fd (second EPOLL_CTL_DEL fails and
         // on_closed_cb fires twice) when both conditions hold.
         const bool peer_hung_up = (event_flags & EPOLLRDHUP) != 0U;
         if (peer_hung_up or (m_fds_to_close.contains(evctx.fd) and not check_fd(evctx.fd))) {
           do_unregister_fd(evctx.fd, false);
         }
       }

       if (m_ev_should_stop.load() &&
           (nevents < 1 || iterations_after_stop > MAX_ITERATIONS_AFTER_STOP)) {
         running = false;
         if (nevents > 0) {
           ERS_LOG(std::format("Stopping event loop, but still had events {} to process", nevents));
         }
         continue;
       }
       if (m_ev_should_stop.load()) {
         ERS_DEBUG(1,
                   std::format("Stopping event loop, num events {}, num iterations after stop {}",
                               nevents,
                               iterations_after_stop));
         ++iterations_after_stop;
       }
       if (nevents == -1) {
         // An interrupted wait is not an error; just poll again.
         if (wait_errno == EINTR) {
           continue;
         }
         throw EpollWaitError(ERS_HERE, utility::error_message(wait_errno));
       }
     } // end of while running

     close(m_epollfd);
     std::lock_guard lck(m_mutex);
     for (const auto fd : m_open_fds) {
       close(fd);
     }
     m_is_running = false;
     reset_thread_id();
     ERS_INFO("Event loop stopped");
   }

   // Runs the loop and stops it via a timer after `sec` seconds.
   void netio3::EpollEventLoop::run_for(const std::uint64_t sec)
   {
     auto timer = create_timer([this](int) { stop(); });
     timer.start(std::chrono::seconds{sec});
     run();
     remove_timer(timer);
   }

   // Requests the loop to stop; see do_stop().
   void netio3::EpollEventLoop::stop()
   {
     do_stop();
   }

   // Returns true between the start of run() and the end of its cleanup.
   bool netio3::EpollEventLoop::is_running() const
   {
     return m_is_running.load();
   }

   // Callback of the register eventfd: drains the register queue and registers
   // every queued context.
   void netio3::EpollEventLoop::register_queued_fd()
   {
     ZoneScoped;
     EventContext ctx{};
     while (m_register_queue.try_pop(ctx)) {
       do_register_fd(ctx);
     }
   }

   // Callback of the unregister eventfd: drains the unregister queue, applying
   // the same wait / immediate split as remove_fd() does on the loop thread.
   void netio3::EpollEventLoop::unregister_queued_fd()
   {
     ZoneScoped;
     CloseFdQueueItem item{};
     while (m_unregister_queue.try_pop(item)) {
       if (item.wait) {
         m_fds_to_close.try_emplace(item.fd, item.close_fd);
       } else {
         do_unregister_fd(item.fd, item.close_fd);
       }
     }
   }

   // Stores ctx, switches the fd to non-blocking mode and adds it to epoll for
   // EPOLLIN | EPOLLRDHUP. With track_fd the fd is also appended to m_open_fds,
   // which run() closes on exit. A failing EPOLL_CTL_ADD is reported through
   // ers::error and leaves no new context behind.
   void netio3::EpollEventLoop::do_register_fd(const EventContext& ctx, const bool track_fd)
   {
     ZoneScoped;
     ERS_DEBUG(2, std::format("Actually registering fd {}", ctx.fd));
     // try_emplace keeps an already stored context; remember whether this call
     // created the entry so that only that entry is rolled back on failure.
     const bool inserted = m_ev_contexts_by_fd.try_emplace(ctx.fd, ctx).second;
     epoll_event ev{.events = (EPOLLIN | EPOLLRDHUP), .data = {.fd = ctx.fd}};

     // Set the file descriptor to non-blocking mode.
     update_flags(ctx.fd, O_NONBLOCK);

     ERS_DEBUG(2, std::format("Adding {} to epoll {}", ctx.fd, m_epollfd));
     if (epoll_ctl(m_epollfd, EPOLL_CTL_ADD, ctx.fd, &ev) != 0) {
       const auto errsv = errno;
       if (inserted) {
         // Otherwise a later registration of the same fd number would silently
         // keep these callbacks.
         m_ev_contexts_by_fd.erase(ctx.fd);
       }
       ers::error(FailedAddingFd(ERS_HERE, ctx.fd, utility::error_message(errsv)));
       return;
     }
     if (track_fd) {
       std::lock_guard lck(m_mutex);
       m_open_fds.push_back(ctx.fd);
     }
   }

   // Removes a tracked fd from epoll, queues its context for erasure at the start
   // of the next loop iteration and invokes its on_closed_cb. An fd that is not
   // in m_open_fds only produces a FailedRemovingFd warning. With close_fd the
   // descriptor is closed in either case.
   void netio3::EpollEventLoop::do_unregister_fd(const int fd, const bool close_fd)
   {
     ZoneScoped;
     ERS_DEBUG(2, std::format("Actually unregistering fd {}", fd));
     const auto found = [this, fd] {
       std::lock_guard lck(m_mutex);
       return std::ranges::find(m_open_fds, fd) != m_open_fds.end();
     }();
     if (found) {
       const auto stat = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, nullptr);
       const auto errsv = errno; // only meaningful when stat != 0
       // EBADF means the file descriptor is already closed. This may happen and the fd is
       // automatically removed from epoll_ctl instance
       if (stat != 0 && errsv != EBADF) {
         ERS_INFO(std::format("Error {} ({}) removing fd {} from m_epollfd",
                              errsv,
                              utility::error_message(errsv),
                              fd));
       }
       m_fds_to_remove.push_back(fd);
       const auto evctx = m_ev_contexts_by_fd.at(fd);
       if (evctx.on_closed_cb != nullptr) {
         evctx.on_closed_cb(evctx.fd);
       }
     } else {
       ers::warning(FailedRemovingFd(ERS_HERE, fd));
     }
     if (close_fd) {
       const auto ret = close(fd);
       if (ret != 0) {
         const auto errsv = errno;
         ers::error(FailedClosingFd(ERS_HERE, fd, utility::error_message(errsv)));
       }
     }
   }

   // Creates a non-blocking eventfd and registers it with the loop. When the
   // eventfd becomes readable its counter is read (which resets it) and cb, if
   // set, is invoked with the eventfd. Returns the eventfd; it is not tracked in
   // m_open_fds, so the caller has to close it.
   int netio3::EpollEventLoop::create_internal_signal(const std::function<void(int)>& cb)
   {
     const auto flags = EFD_NONBLOCK;
     const auto fd = eventfd(0, flags);
     const auto etx = EventContext{
       .fd = fd,
       .cb =
         [cb](int fd_cb) {
           std::uint64_t buf{};
           constexpr static auto NBYTES = 8;
           const auto nread = read(fd_cb, &buf, sizeof(buf));
           if (nread != NBYTES) {
             // Report what read() returned, not the counter buffer.
             ERS_DEBUG(1, std::format("Did not read 8 bytes. Instead: {}", nread));
           }
           if (cb) {
             cb(fd_cb);
           }
         },
     };
     // Do not close when event loop is closed
     do_register_fd(etx, false);
     return etx.fd;
   }

   // Wakes the loop by adding 1 to the eventfd counter. A write that does not
   // transfer 8 bytes is reported through ers::error.
   void netio3::EpollEventLoop::fire_internal_signal(const int fd)
   {
     const auto buf = std::uint64_t{1};
     const auto ret = write(fd, &buf, sizeof(buf));
     const auto errsv = errno; // captured before anything else can overwrite it
     constexpr static auto NBYTES = 8;
     if (ret != NBYTES) {
       ers::error(FailedFireSignal(ERS_HERE, fd, ret, utility::error_message(errsv)));
     }
   }

   // Sets the stop flag and wakes the loop through the stop eventfd so that a
   // blocking epoll_wait returns.
   void netio3::EpollEventLoop::do_stop()
   {
     ERS_INFO("Stopping evloop");
     m_ev_should_stop = true;
     fire_internal_signal(m_stop_fd_signal);
   }