Program Listing for File EpollEventLoop.cpp
↰ Return to documentation for file (EventLoop/EpollEventLoop.cpp)
#include "netio3-backend/EventLoop/EpollEventLoop.hpp"
#include <chrono>
#include <mutex>
#include <tracy/Tracy.hpp>
#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/EventLoop/Utility.hpp"
/// Construct the event loop.
///
/// Creates the epoll instance with epoll_create1(0), stores the init callback that run() invokes
/// before it starts polling, and creates three internal signals:
///  - one whose handler drains the queue of pending registrations,
///  - one whose handler drains the queue of pending unregistrations,
///  - one with an empty handler that do_stop() fires.
///
/// NOTE(review): the result of epoll_create1 is not checked here, and create_internal_signal()
/// already uses m_epollfd, so this relies on the member declaration order — confirm in the header.
///
/// @param cb_init Callback executed at the beginning of run(); may be empty.
netio3::EpollEventLoop::EpollEventLoop(std::function<void()> cb_init) :
  m_epollfd{epoll_create1(0)},
  m_cb_init{std::move(cb_init)},
  m_register_fd_signal{
    create_internal_signal([this](int /*signal_fd*/) { this->register_queued_fd(); })},
  m_unregister_fd_signal{
    create_internal_signal([this](int /*signal_fd*/) { this->unregister_queued_fd(); })},
  m_stop_fd_signal{create_internal_signal([](int /*signal_fd*/) {})}
{}
/// Request a stop and release the internal signal descriptors.
///
/// The epoll descriptor itself is not closed here; in the code visible in this file it is closed
/// at the end of run().
netio3::EpollEventLoop::~EpollEventLoop()
{
  do_stop();
  // Same order as the signals were created in the constructor.
  for (const int signal_fd : {m_register_fd_signal, m_unregister_fd_signal, m_stop_fd_signal}) {
    close(signal_fd);
  }
}
/// Invoke the callback stored in an event context, passing the context's file descriptor.
///
/// @param evc Context of the file descriptor that became ready. Nothing happens if it carries no
///            callback.
void netio3::EpollEventLoop::process_event(const EventContext& evc)
{
  if (evc.cb == nullptr) {
    return;
  }
  evc.cb(evc.fd);
}
/// Register a file descriptor with the event loop.
///
/// When called from the thread recorded as the loop thread the descriptor is registered
/// immediately. From any other thread the context is queued and the register signal is fired so
/// that the registration is carried out by the signal's handler.
///
/// @param ctx Event context (file descriptor plus callbacks) to register.
void netio3::EpollEventLoop::register_fd(const EventContext& ctx)
{
  ZoneScoped;
  ERS_DEBUG(2, std::format("Register fd {}", ctx.fd));
  const bool on_loop_thread = (get_thread_id() == std::this_thread::get_id());
  if (on_loop_thread) {
    do_register_fd(ctx);
    return;
  }
  // Hand the request over to the loop thread.
  m_register_queue.emplace(ctx);
  fire_internal_signal(m_register_fd_signal);
}
/// Remove a file descriptor from the event loop.
///
/// From a thread other than the loop thread the request is queued and the unregister signal is
/// fired. On the loop thread the request is handled directly: with @p wait set the descriptor is
/// only recorded in m_fds_to_close (run() unregisters it later, after an event on it, once
/// check_fd() reports false); otherwise it is unregistered right away.
///
/// @param fd       File descriptor to remove.
/// @param close_fd Whether the descriptor should also be closed.
/// @param wait     Defer the removal instead of unregistering immediately.
void netio3::EpollEventLoop::remove_fd(const int fd, bool close_fd, bool wait)
{
  ZoneScoped;
  ERS_DEBUG(2, std::format("Removing fd {}", fd));
  const bool on_loop_thread = (get_thread_id() == std::this_thread::get_id());
  if (not on_loop_thread) {
    // Hand the request over to the loop thread.
    m_unregister_queue.emplace(fd, close_fd, wait);
    fire_internal_signal(m_unregister_fd_signal);
    return;
  }
  if (not wait) {
    do_unregister_fd(fd, close_fd);
    return;
  }
  m_fds_to_close.try_emplace(fd, close_fd);
}
/// Stream an event context as its file descriptor wrapped in braces, e.g. "{7}".
std::ostream& operator <<(std::ostream& stream, const netio3::EventContext& val) {
  stream << "{";
  stream << val.fd;
  stream << "}";
  return stream;
}
/// Run the event loop on the calling thread until a stop has been requested.
///
/// Records the calling thread as the loop thread, invokes the init callback (if set) and then
/// repeatedly waits on the epoll instance and dispatches the callbacks of the ready descriptors.
/// Descriptors unregistered during one iteration are dropped from the bookkeeping at the start of
/// the next one. Once a stop was requested the loop ends as soon as an iteration yields no events
/// (or epoll_wait fails), or after more than MAX_ITERATIONS_AFTER_STOP further iterations. On a
/// regular exit the epoll descriptor and all tracked descriptors are closed.
///
/// @throws EpollWaitError if epoll_wait fails with an error other than EINTR while no stop is
///         pending.
///
/// NOTE(review): when the exception propagates, the cleanup after the loop is skipped, i.e.
/// m_is_running stays true and no descriptor is closed.
void netio3::EpollEventLoop::run()
{
  set_thread_id(std::this_thread::get_id());
  if (m_cb_init) {
    m_cb_init();
  }
  constexpr static auto MAX_ITERATIONS_AFTER_STOP{5};
  auto iterations_after_stop{0};
  bool running = true;
  m_is_running = true;
  // Reused across iterations: only the first `nevents` entries written by epoll_wait are read,
  // so there is no need to zero the buffer again on every pass.
  std::array<struct epoll_event, MAX_EPOLL_EVENTS> active_events{};
  while (running) {
    // don't want to block or wait too long if we're shutting down
    const auto timeout = m_ev_should_stop ? MIN_POLL_TIMEOUT : EPOLL_TIMEOUT;
    const int nevents = epoll_wait(m_epollfd, active_events.data(), MAX_EPOLL_EVENTS, timeout);
    // Save errno right away: logging, formatting, locking and the callbacks executed below may
    // overwrite it before the error check at the end of the iteration is reached.
    const auto wait_errno = errno;
    TracyPlot("Number of Events", static_cast<std::int64_t>(nevents));
    ZoneScoped;
    ERS_DEBUG(3, std::format("{} events to process", nevents));
    // Forget the descriptors that were unregistered during the previous iteration.
    for (const auto fd : m_fds_to_remove) {
      m_ev_contexts_by_fd.erase(fd);
      std::lock_guard lck(m_mutex);
      std::erase(m_open_fds, fd);
    }
    m_fds_to_remove.clear();
    for (int event = 0; event < nevents; ++event) {
      ZoneScoped;
      const auto& active = active_events.at(event);
      const auto ctx_iter = m_ev_contexts_by_fd.find(active.data.fd);
      // Skip descriptors without a context and descriptors that an earlier callback of this very
      // iteration has already unregistered.
      if (ctx_iter == m_ev_contexts_by_fd.end() or std::ranges::find(m_fds_to_remove, active.data.fd) != m_fds_to_remove.end()) {
        ERS_DEBUG(1,
                  std::format("Skipping invalid event context for fd {} events {}",
                              int{active.data.fd},
                              std::uint32_t{active.events}));
        continue;
      }
      const auto& evctx = ctx_iter->second;
      process_event(evctx);
      // Unregister at most once per event: the peer hung up, or a deferred removal was requested
      // and check_fd() reports false. Doing both would run the on-closed callback twice and issue
      // a second EPOLL_CTL_DEL for the same descriptor.
      const bool peer_hung_up = (active.events & EPOLLRDHUP) != 0U;
      if (peer_hung_up or (m_fds_to_close.contains(evctx.fd) and not check_fd(evctx.fd))) {
        do_unregister_fd(evctx.fd, false);
      }
    }
    if (m_ev_should_stop.load() && (nevents < 1 || iterations_after_stop > MAX_ITERATIONS_AFTER_STOP)) {
      running = false;
      if (nevents > 0) {
        ERS_LOG(std::format("Stopping event loop, but still had events {} to process", nevents));
      }
      continue;
    }
    if (m_ev_should_stop.load()) {
      ERS_DEBUG(1, std::format("Stopping event loop, num events {}, num iterations after stop {}", nevents, iterations_after_stop));
      ++iterations_after_stop;
    }
    if (nevents == -1) {
      // An interrupted wait is not an error, simply poll again.
      if (wait_errno == EINTR) {
        continue;
      }
      throw EpollWaitError(ERS_HERE, utility::error_message(wait_errno));
    }
  } // end of while running
  close(m_epollfd);
  std::lock_guard lck(m_mutex);
  std::for_each(m_open_fds.begin(), m_open_fds.end(), [](auto& fd) { close(fd); });
  m_is_running = false;
  reset_thread_id();
  ERS_INFO("Event loop stopped");
}
void netio3::EpollEventLoop::run_for(const std::uint64_t sec)
{
auto timer = create_timer([this](int) { stop(); });
timer.start(std::chrono::seconds{sec});
run();
remove_timer(timer);
}
void netio3::EpollEventLoop::stop()
{
do_stop();
}
/// Report whether run() is currently executing.
///
/// @return The current value of the running flag, which run() sets on entry and clears after its
///         cleanup.
bool netio3::EpollEventLoop::is_running() const {
  const bool running = m_is_running.load();
  return running;
}
void netio3::EpollEventLoop::register_queued_fd() {
ZoneScoped;
EventContext ctx{};
while (m_register_queue.try_pop(ctx)) {
do_register_fd(ctx);
}
}
void netio3::EpollEventLoop::unregister_queued_fd() {
ZoneScoped;
CloseFdQueueItem item{};
while (m_unregister_queue.try_pop(item)) {
if (item.wait) {
m_fds_to_close.try_emplace(item.fd, item.close_fd);
} else {
do_unregister_fd(item.fd, item.close_fd);
}
}
}
/// Add a file descriptor to the epoll instance and store its event context.
///
/// The descriptor is switched to non-blocking mode and added for EPOLLIN and EPOLLRDHUP. If adding
/// it to epoll fails, an error is reported and a context inserted by this call is removed again,
/// so that no stale entry shadows a later registration that reuses the same descriptor number. A
/// context that existed before the call is left untouched.
///
/// @param ctx      Event context (file descriptor plus callbacks) to register.
/// @param track_fd If true, the descriptor is recorded in m_open_fds, the list of descriptors that
///                 run() closes on exit.
void netio3::EpollEventLoop::do_register_fd(const EventContext& ctx, const bool track_fd) {
  ZoneScoped;
  ERS_DEBUG(2, std::format("Actually registering fd {}", ctx.fd));
  // Remember whether this call created the entry so that only our own insertion is rolled back.
  const bool inserted = m_ev_contexts_by_fd.try_emplace(ctx.fd, ctx).second;
  epoll_event ev{.events = (EPOLLIN | EPOLLRDHUP), .data = {.fd = ctx.fd}};
  // Set the file descriptor to non-blocking mode.
  update_flags(ctx.fd, O_NONBLOCK);
  ERS_DEBUG(2, std::format("Adding {} to epoll {}", ctx.fd, m_epollfd));
  if (epoll_ctl(m_epollfd, EPOLL_CTL_ADD, ctx.fd, &ev) != 0) {
    // Save errno before the map operation and the error reporting can overwrite it.
    const auto errsv = errno;
    if (inserted) {
      m_ev_contexts_by_fd.erase(ctx.fd);
    }
    ers::error(FailedAddingFd(ERS_HERE, ctx.fd, utility::error_message(errsv)));
    return;
  }
  if (track_fd) {
    std::lock_guard lck(m_mutex);
    m_open_fds.push_back(ctx.fd);
  }
}
/// Remove a file descriptor from the epoll instance and optionally close it.
///
/// Only descriptors present in m_open_fds are removed from epoll; for those the descriptor is
/// queued in m_fds_to_remove (run() drops the bookkeeping on its next iteration) and the
/// on-closed callback of the context is invoked. For any other descriptor a warning is issued.
/// Closing happens independently of whether the descriptor was tracked.
///
/// @param fd       File descriptor to unregister.
/// @param close_fd Whether to close the descriptor afterwards.
void netio3::EpollEventLoop::do_unregister_fd(const int fd, const bool close_fd) {
  ZoneScoped;
  ERS_DEBUG(2, std::format("Actually unregistering fd {}", fd));
  bool is_tracked = false;
  {
    // m_open_fds is guarded by m_mutex; hold the lock only for the lookup.
    std::lock_guard lck(m_mutex);
    is_tracked = std::ranges::find(m_open_fds, fd) != m_open_fds.end();
  }
  if (not is_tracked) {
    ers::warning(FailedRemovingFd(ERS_HERE, fd));
  } else {
    const auto rc = epoll_ctl(m_epollfd, EPOLL_CTL_DEL, fd, nullptr);
    // EBADF means the file descriptor is already closed. This may happen and the fd is
    // automatically removed from the epoll instance
    if (rc != 0 && errno != EBADF) {
      ERS_INFO(std::format("Error {} ({}) removing fd {} from m_epollfd", errno, utility::error_message(errno), fd));
    }
    m_fds_to_remove.push_back(fd);
    // Work on a copy of the context while running the user callback.
    const auto closed_ctx = m_ev_contexts_by_fd.at(fd);
    if (closed_ctx.on_closed_cb != nullptr) {
      closed_ctx.on_closed_cb(closed_ctx.fd);
    }
  }
  if (close_fd) {
    if (close(fd) != 0) {
      ers::error(FailedClosingFd(ERS_HERE, fd, utility::error_message(errno)));
    }
  }
}
/// Create an eventfd-backed internal signal and register it with the event loop.
///
/// The registered handler first reads the 8-byte eventfd counter and then invokes @p cb (if set)
/// with the signal's file descriptor. The descriptor is registered without tracking, so run()
/// does not close it on exit; the destructor closes the signals instead.
///
/// NOTE(review): the result of eventfd() is not checked; if it fails, -1 is passed on to
/// do_register_fd() and returned to the caller.
///
/// @param cb Callback executed whenever the signal fires; may be empty.
/// @return The file descriptor of the new eventfd.
int netio3::EpollEventLoop::create_internal_signal(const std::function<void(int)>& cb)
{
  const auto flags = EFD_NONBLOCK;
  const auto fd = eventfd(0, flags);
  const auto etx = EventContext{
    .fd = fd,
    .cb =
      [cb](int fd_cb) {
        std::uint64_t buf{};
        constexpr static auto NBYTES = 8;
        const auto nread = read(fd_cb, &buf, sizeof(buf));
        if (nread != NBYTES) {
          // Report what read() returned; the buffer content says nothing about the failure.
          ERS_DEBUG(1, std::format("Did not read 8 bytes. Instead: {}", nread));
        }
        if (cb) {
          cb(fd_cb);
        }
      },
  };
  // Do not close when event loop is closed
  do_register_fd(etx, false);
  return etx.fd;
}
void netio3::EpollEventLoop::fire_internal_signal(const int fd)
{
const auto buf = std::uint64_t{1};
const auto ret = write(fd, &buf, sizeof(buf));
constexpr static auto NBYTES = 8;
if (ret != NBYTES) {
ers::error(FailedFireSignal(ERS_HERE, fd, ret, utility::error_message(errno)));
}
}
/// Flag the event loop for shutdown and fire the stop signal.
///
/// run() checks the flag after every epoll_wait; the signal presumably serves to wake up a wait
/// that is currently blocking.
void netio3::EpollEventLoop::do_stop()
{
  ERS_INFO("Stopping evloop");
  m_ev_should_stop.store(true);
  fire_internal_signal(m_stop_fd_signal);
}