Program Listing for File AsioEventLoop.cpp
↰ Return to documentation for file (EventLoop/AsioEventLoop.cpp
)
#include "netio3-backend/EventLoop/AsioEventLoop.hpp"
#include <chrono>
#include <boost/asio.hpp>
#include <unistd.h>
#include <ers/ers.h>
#include "netio3-backend/EventLoop/BaseEventLoop.hpp"
netio3::AsioEventLoop::AsioEventLoop(std::function<void()> cb_init) : m_cb_init{std::move(cb_init)}
{}
netio3::AsioEventLoop::~AsioEventLoop()
{
do_stop();
}
void netio3::AsioEventLoop::process_event(const EventContext& evc)
{
if (evc.cb != nullptr) {
evc.cb(evc.fd);
}
}
void netio3::AsioEventLoop::register_fd(const EventContext& ctx)
{
std::lock_guard lck(m_mutex);
// Create a stream_descriptor for the file descriptor.
auto streamDescriptor =
std::make_shared<boost::asio::posix::stream_descriptor>(m_ioService, ctx.fd);
// Set the file descriptor to non-blocking mode.
update_flags(ctx.fd, O_NONBLOCK);
// Store the stream_descriptor in a map.
m_streamDescriptors.try_emplace(ctx.fd, streamDescriptor);
m_ev_contexts_by_fd.try_emplace(ctx.fd, ctx);
// Start an asynchronous read operation.
start_async_read(
streamDescriptor,
[ctx] { process_event(ctx); },
[this, fd = ctx.fd](const boost::system::error_code& error) {
if (error == boost::asio::error::eof) {
remove_fd(fd);
}
ERS_LOG(std::format("Failed executing callback for {} due to {}", fd, error.message()));
});
ERS_DEBUG(2, std::format("Added file descriptor {} to Boost.Asio event loop", ctx.fd));
}
void netio3::AsioEventLoop::remove_fd(const int fd, bool /*close_fd*/, bool wait)
{
// Could use some cleanup regarding locks and call order and so on
if (wait) {
constexpr static auto TIMEOUT = std::chrono::milliseconds(100);
finish_callbacks(fd, TIMEOUT);
}
std::lock_guard lck(m_mutex);
const auto it = m_streamDescriptors.find(fd);
if (it != m_streamDescriptors.end()) {
it->second->close();
m_streamDescriptors.erase(it);
const auto evctx = m_ev_contexts_by_fd.at(fd);
if (evctx.on_closed_cb != nullptr) {
evctx.on_closed_cb(evctx.fd);
}
ERS_DEBUG(2, std::format("Removed file descriptor {} from Boost.Asio event loop", fd));
} else {
ers::warning(FailedRemovingFd(ERS_HERE, fd));
}
}
void netio3::AsioEventLoop::finish_callbacks(const int fd, std::chrono::milliseconds timeout)
{
std::unique_lock lck(m_mutex);
const auto it = m_streamDescriptors.find(fd);
if (it != m_streamDescriptors.end()) {
auto& stream_descriptor = it->second;
const auto native_fd = stream_descriptor->native_handle();
lck.unlock();
wait_for_fd(native_fd, timeout, [this, &fd]() {
process_event(m_ev_contexts_by_fd.at(fd));
});
}
}
void netio3::AsioEventLoop::run()
{
{
std::lock_guard lck(m_mutex);
set_thread_id(std::this_thread::get_id());
if (m_cb_init) {
m_cb_init();
}
m_work = std::make_unique<boost::asio::io_service::work>(m_ioService);
}
m_is_running = true;
m_ioService.run();
m_is_running = false;
ERS_INFO("Event loop stopped");
}
void netio3::AsioEventLoop::run_for(const std::uint64_t sec)
{
auto timer = create_timer([this](int) { stop(); });
timer.start(std::chrono::seconds{sec});
run();
}
void netio3::AsioEventLoop::stop()
{
std::lock_guard lck(m_mutex);
do_stop();
}
bool netio3::AsioEventLoop::is_running() const {
return m_is_running.load();
}
void netio3::AsioEventLoop::start_async_read(
const std::shared_ptr<boost::asio::posix::stream_descriptor>& stream_descriptor,
const std::function<void()>& success_handler,
const std::function<void(const boost::system::error_code&)>& error_handler) const
{
std::lock_guard lck(m_mutex);
if (not stream_descriptor->is_open()) {
return;
}
stream_descriptor->async_read_some(
boost::asio::null_buffers(),
[this, success_handler, error_handler, stream_descriptor](
const boost::system::error_code& error, std::size_t /*bytes_transferred*/) {
if (!error) {
success_handler();
} else {
error_handler(error);
}
start_async_read(stream_descriptor, success_handler, error_handler);
});
}
void netio3::AsioEventLoop::do_stop()
{
ERS_INFO("Stopping evloop");
m_work.reset();
m_ioService.stop();
reset_thread_id();
}