
.. _program_listing_file_EventLoop_AsioEventLoop.cpp:

Program Listing for File AsioEventLoop.cpp
==========================================

|exhale_lsh| :ref:`Return to documentation for file <file_EventLoop_AsioEventLoop.cpp>` (``EventLoop/AsioEventLoop.cpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #include "netio3-backend/EventLoop/AsioEventLoop.hpp"

   // NOTE(review): the four angle-bracket include targets were stripped from
   // this listing. The headers below are reconstructed from what the code
   // visibly uses (O_NONBLOCK, std::format, boost::asio, ERS macros) -- confirm
   // against the original source file.
   #include <fcntl.h>
   #include <format>
   #include <boost/asio.hpp>
   #include <ers/ers.h>

   #include "netio3-backend/EventLoop/BaseEventLoop.hpp"

   // NOTE(review): every template argument list in this listing was stripped as
   // well. The arguments used below (std::function<void()>,
   // boost::asio::posix::stream_descriptor, boost::asio::io_service::work,
   // std::function<void(const boost::system::error_code&)>) are inferred from
   // how the values are used in this file -- confirm against the header.

   /// Construct the event loop and store the callback that run() invokes before
   /// it starts processing events.
   netio3::AsioEventLoop::AsioEventLoop(std::function<void()> cb_init) :
     m_cb_init{std::move(cb_init)}
   {}

   /// Stop the event loop on destruction (calls do_stop() without taking m_mutex).
   netio3::AsioEventLoop::~AsioEventLoop()
   {
     do_stop();
   }

   /// Invoke the callback of an event context with its file descriptor.
   /// Does nothing when the context has no callback set.
   void netio3::AsioEventLoop::process_event(const EventContext& evc)
   {
     if (evc.cb != nullptr) {
       evc.cb(evc.fd);
     }
   }

   /// Register a file descriptor with the event loop.
   ///
   /// Wraps ctx.fd in a stream descriptor, switches the fd to non-blocking mode,
   /// records the descriptor and the context in the per-fd maps and arms the
   /// first asynchronous read. Both maps are filled with try_emplace, so an
   /// already-present entry for the same fd is left untouched.
   void netio3::AsioEventLoop::register_fd(const EventContext& ctx)
   {
     // NOTE(review): start_async_read() below locks m_mutex again while this
     // guard is still held; presumably m_mutex is a recursive mutex -- confirm.
     std::lock_guard lck(m_mutex);

     // Create a stream_descriptor for the file descriptor.
     auto streamDescriptor =
       std::make_shared<boost::asio::posix::stream_descriptor>(m_ioService, ctx.fd);

     // Set the file descriptor to non-blocking mode.
     update_flags(ctx.fd, O_NONBLOCK);

     // Store the stream_descriptor in a map.
     m_streamDescriptors.try_emplace(ctx.fd, streamDescriptor);
     m_ev_contexts_by_fd.try_emplace(ctx.fd, ctx);

     // Start an asynchronous read operation.
     start_async_read(
       streamDescriptor,
       // Success: run the user callback on a copy of the context.
       [ctx] { process_event(ctx); },
       // Error: on EOF the fd is removed from the loop; every error is logged.
       [this, fd = ctx.fd](const boost::system::error_code& error) {
         if (error == boost::asio::error::eof) {
           remove_fd(fd);
         }
         ERS_LOG(std::format("Failed executing callback for {} due to {}", fd, error.message()));
       });

     ERS_DEBUG(2, std::format("Added file descriptor {} to Boost.Asio event loop", ctx.fd));
   }

   /// Remove a file descriptor from the event loop.
   ///
   /// @param fd    File descriptor to remove.
   /// @param wait  When true, finish_callbacks() is given up to 100 ms for this
   ///              fd before it is removed.
   ///
   /// The second parameter (close_fd) is unused here: the stream descriptor is
   /// always closed. Emits a FailedRemovingFd warning when the fd is not
   /// registered.
   void netio3::AsioEventLoop::remove_fd(const int fd, bool /*close_fd*/, bool wait)
   {
     // Could use some cleanup regarding locks and call order and so on
     if (wait) {
       constexpr static auto TIMEOUT = std::chrono::milliseconds(100);
       finish_callbacks(fd, TIMEOUT);
     }

     std::lock_guard lck(m_mutex);
     const auto it = m_streamDescriptors.find(fd);
     if (it == m_streamDescriptors.end()) {
       ers::warning(FailedRemovingFd(ERS_HERE, fd));
       return;
     }

     it->second->close();
     m_streamDescriptors.erase(it);

     // Drop the stored context together with the descriptor. register_fd() uses
     // try_emplace, so a stale entry left behind here would shadow the context
     // of a later registration that reuses the same fd number. find() instead
     // of at() so a missing entry cannot throw while the lock is held.
     const auto ctx_it = m_ev_contexts_by_fd.find(fd);
     if (ctx_it != m_ev_contexts_by_fd.end()) {
       // Copy first: the callback must outlive the erased map entry.
       const auto evctx = ctx_it->second;
       m_ev_contexts_by_fd.erase(ctx_it);
       if (evctx.on_closed_cb != nullptr) {
         evctx.on_closed_cb(evctx.fd);
       }
     }

     ERS_DEBUG(2, std::format("Removed file descriptor {} from Boost.Asio event loop", fd));
   }

   /// Call wait_for_fd() for a registered fd, passing it a callback that runs the
   /// fd's event callback via process_event().
   ///
   /// Does nothing when the fd has no stream descriptor or no stored context.
   void netio3::AsioEventLoop::finish_callbacks(const int fd, std::chrono::milliseconds timeout)
   {
     std::unique_lock lck(m_mutex);

     const auto it = m_streamDescriptors.find(fd);
     if (it == m_streamDescriptors.end()) {
       return;
     }
     const auto ctx_it = m_ev_contexts_by_fd.find(fd);
     if (ctx_it == m_ev_contexts_by_fd.end()) {
       return;
     }

     // Snapshot everything needed from the maps while the lock is held; the
     // wait below runs unlocked and must not touch the maps.
     const auto native_fd = it->second->native_handle();
     const auto ctx = ctx_it->second;
     lck.unlock();

     wait_for_fd(native_fd, timeout, [ctx]() { process_event(ctx); });
   }

   /// Run the event loop on the calling thread until the io service returns.
   ///
   /// Under the lock: records the calling thread id, invokes the init callback
   /// (if any) and creates the work object. m_is_running is true only while
   /// m_ioService.run() is executing.
   void netio3::AsioEventLoop::run()
   {
     {
       std::lock_guard lck(m_mutex);
       set_thread_id(std::this_thread::get_id());
       if (m_cb_init) {
         m_cb_init();
       }
       // NOTE(review): presumably the work object keeps run() from returning
       // while no handlers are pending; do_stop() resets it -- confirm type.
       m_work = std::make_unique<boost::asio::io_service::work>(m_ioService);
     }
     m_is_running = true;
     m_ioService.run();
     m_is_running = false;
     ERS_INFO("Event loop stopped");
   }

   /// Run the event loop with a timer whose callback calls stop().
   ///
   /// @param sec  Timer duration in seconds.
   void netio3::AsioEventLoop::run_for(const std::uint64_t sec)
   {
     auto timer = create_timer([this](int) { stop(); });
     timer.start(std::chrono::seconds{sec});
     run();
   }

   /// Stop the event loop while holding m_mutex.
   void netio3::AsioEventLoop::stop()
   {
     std::lock_guard lck(m_mutex);
     do_stop();
   }

   /// Report whether run() is currently executing the io service.
   bool netio3::AsioEventLoop::is_running() const
   {
     return m_is_running.load();
   }

   /// Arm one asynchronous readiness wait on a stream descriptor and re-arm it
   /// after every completion.
   ///
   /// @param stream_descriptor  Descriptor to wait on; nothing is armed when it
   ///                           is no longer open.
   /// @param success_handler    Called when the wait completes without error.
   /// @param error_handler      Called with the error code otherwise.
   ///
   /// The completion handler holds copies of both handlers and of the shared
   /// pointer to the descriptor.
   void netio3::AsioEventLoop::start_async_read(
     const std::shared_ptr<boost::asio::posix::stream_descriptor>& stream_descriptor,
     const std::function<void()>& success_handler,
     const std::function<void(const boost::system::error_code&)>& error_handler) const
   {
     std::lock_guard lck(m_mutex);
     if (not stream_descriptor->is_open()) {
       return;
     }

     // null_buffers: no data is read here, the handlers do the actual I/O.
     stream_descriptor->async_read_some(
       boost::asio::null_buffers(),
       [this, success_handler, error_handler, stream_descriptor](
         const boost::system::error_code& error, std::size_t /*bytes_transferred*/) {
         if (!error) {
           success_handler();
         } else {
           error_handler(error);
         }
         // Re-arm; this returns immediately once the descriptor is closed.
         start_async_read(stream_descriptor, success_handler, error_handler);
       });
   }

   /// Release the work object, stop the io service and clear the recorded
   /// thread id. Takes no lock itself; stop() calls it under m_mutex.
   void netio3::AsioEventLoop::do_stop()
   {
     ERS_INFO("Stopping evloop");
     m_work.reset();
     m_ioService.stop();
     reset_thread_id();
   }