Program Listing for File Server.cpp
↰ Return to documentation for file (BackendAsyncmsg/Server.cpp)
#include "Server.hpp"
#include "Session.hpp"
#include "netio3-backend/Issues.hpp"
/// @brief Construct the asyncmsg listening server.
///
/// @param ioService        Boost.Asio I/O service; forwarded to the felix::asyncmsg::Server
///                         base and kept by reference for the sessions created in startAccept().
/// @param config           Network configuration; taken by value and moved into the server.
///                         Passed to every new session and consulted for
///                         on_connection_refused_cb in onAcceptError().
/// @param onShutdownCb     Called from closeSession() once shutdown() has been requested and
///                         the last tracked session has been removed. May be empty.
/// @param addEventCb       Sink for ConnectionEvent objects (session events and REFUSED events).
/// @param eventloop        Event loop pointer handed to every new session. The pointer is
///                         stored as-is; this class does not take ownership of it here.
/// @param useAsioEventLoop Forwarded to sessions; together with @p mode it selects how a
///                         failed accept is reported in onAcceptError().
/// @param mode             Forwarded to sessions; see @p useAsioEventLoop.
netio3::asyncmsg::Server::Server(boost::asio::io_service& ioService,
                                 NetworkConfig config,
                                 std::function<void(const EndPointAddress&)> onShutdownCb,
                                 std::function<void(const ConnectionEvent&)> addEventCb,
                                 BaseEventLoop* eventloop,
                                 const bool useAsioEventLoop,
                                 const Mode mode) :
  felix::asyncmsg::Server(ioService),
  m_ioService{ioService},
  m_config{std::move(config)},
  m_onShutdownCb{std::move(onShutdownCb)},
  m_addEventCb{std::move(addEventCb)},
  // Raw pointer: moving it is just a copy, so no std::move here.
  m_eventloop{eventloop},
  m_useAsioEventLoop{useAsioEventLoop},
  m_mode{mode}
{}
void netio3::asyncmsg::Server::startAccept()
{
std::lock_guard lock(m_mutex);
asyncAccept(std::make_shared<Session>(
m_ioService,
m_config,
m_eventloop,
[this, weakSelf = weak_from_this()](const std::shared_ptr<felix::asyncmsg::Session>& session) {
if (auto self = weakSelf.lock()) {
closeSession(session);
}
},
[this](const ConnectionEvent& event) { m_addEventCb(event); },
m_useAsioEventLoop,
m_mode));
}
/// @brief Handle a successfully accepted connection.
///
/// Logs the peer address, records the session in m_sessions (under m_mutex) and
/// then arms the next accept. startAccept() is called after the lock has been
/// released because it acquires m_mutex itself.
///
/// @param session The session whose connection was just accepted.
void netio3::asyncmsg::Server::onAccept(std::shared_ptr<felix::asyncmsg::Session> session)
{
  {
    std::lock_guard lock(m_mutex);
    const auto& peer = session->remoteEndpoint();
    const auto message =
      std::format("Accepted connection from {}:{}", peer.address().to_string(), peer.port());
    ERS_INFO(message);
    m_sessions.emplace(session);
  }
  startAccept();
}
/// @brief Handle a failed accept.
///
/// Reports the failure through ERS, notifies the user that the connection was
/// refused and arms the next accept.
///
/// The notification goes through m_addEventCb when the asio event loop is not
/// used and the mode is Mode::DELEGATE; otherwise the configured
/// on_connection_refused_cb is called directly, if one is set.
///
/// Once shutdown() has been requested the error is ignored and no new accept is
/// armed: shutdown() closes the listener, so a failure arriving afterwards is
/// presumably the aborted pending accept (NOTE(review): confirm against
/// felix::asyncmsg::Server), and re-arming would start an accept on a closed
/// listener.
///
/// @param error   Error code describing why the accept failed.
/// @param session The session object that was waiting for the connection.
void netio3::asyncmsg::Server::onAcceptError(const boost::system::error_code& error,
                                             std::shared_ptr<felix::asyncmsg::Session> session)
{
  {
    std::lock_guard lock(m_mutex);
    if (m_shuttingDown) {
      // Listener is being torn down: nothing to report, nothing to re-arm.
      return;
    }
    ers::error(FailedOpenReceiveEndpoint(ERS_HERE,
                                         session->remoteEndpoint().address().to_string(),
                                         session->remoteEndpoint().port(),
                                         error.message()));
    const auto endpoint = EndPointAddress(session->remoteEndpoint().address().to_string(),
                                          session->remoteEndpoint().port());
    if (not m_useAsioEventLoop and m_mode == Mode::DELEGATE) {
      m_addEventCb({ConnectionEvent::Type::REFUSED, endpoint});
    } else if (m_config.on_connection_refused_cb) {
      m_config.on_connection_refused_cb(endpoint);
    }
  }
  // Lock released above: startAccept() takes m_mutex itself.
  startAccept();
}
void netio3::asyncmsg::Server::shutdown()
{
std::lock_guard lock(m_mutex);
m_shuttingDown = true;
for (const auto& session : m_sessions) {
session->asyncClose();
}
try {
close();
} catch (const boost::system::system_error& e) {
throw FailedCloseListenEndpoint(
ERS_HERE, localEndpoint().address().to_string(), localEndpoint().port(), e.what(), e);
}
}
/// @brief Stop tracking a session and, if it was the last one during shutdown,
///        signal that shutdown has completed.
///
/// The session is removed from m_sessions under m_mutex. If shutdown() has been
/// requested, no sessions remain and a shutdown callback is set, the lock is
/// released and the callback is invoked with the local listen address.
///
/// @param session The session to remove.
void netio3::asyncmsg::Server::closeSession(std::shared_ptr<felix::asyncmsg::Session> session)
{
  std::unique_lock guard(m_mutex);
  m_sessions.erase(session);

  const bool lastSessionGone = m_shuttingDown and m_sessions.empty();
  if (not lastSessionGone or not m_onShutdownCb) {
    return;
  }

  // Release the mutex before running user code.
  guard.unlock();
  m_onShutdownCb({localEndpoint().address().to_string(), localEndpoint().port()});
}