Program Listing for File BackendAsyncmsg.hpp
↰ Return to documentation for file (BackendAsyncmsg/BackendAsyncmsg.hpp)
#ifndef NETIO3BACKEND_BACKENDASYNCMSG_BACKENDASYNCMSG_HPP
#define NETIO3BACKEND_BACKENDASYNCMSG_BACKENDASYNCMSG_HPP
#include <chrono>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <span>
#include <string_view>
#include <thread>
#include <boost/functional/hash.hpp>
#include <tbb/concurrent_queue.h>
#include <tbb/concurrent_hash_map.h>
#include <asyncmsg/Session.h>
#include "Server.hpp"
#include "Session.hpp"
#include "Types.hpp"
#include "netio3-backend/Netio3Backend.hpp"
using namespace std::chrono_literals;
namespace netio3::asyncmsg {
class BackendAsyncmsg : public NetworkBackend
{
public:
BackendAsyncmsg(const NetworkConfig& config, std::shared_ptr<BaseEventLoop> evloop);
~BackendAsyncmsg() override;
BackendAsyncmsg(const BackendAsyncmsg&) = delete;
BackendAsyncmsg(BackendAsyncmsg&&) = delete;
BackendAsyncmsg& operator=(const BackendAsyncmsg&) = delete;
BackendAsyncmsg& operator=(BackendAsyncmsg&&) = delete;
void open_send_endpoint(const EndPointAddress& address,
const ConnectionParameters& connection_params) override;
[[nodiscard]] EndPointAddress open_listen_endpoint(
const EndPointAddress& address,
const ConnectionParametersRecv& connection_params) override;
void close_send_endpoint(const EndPointAddress& address) override;
void close_listen_endpoint(const EndPointAddress& address) override;
[[nodiscard]] NetioStatus send_data(const EndPointAddress& address,
std::span<std::uint8_t> data,
std::span<const std::uint8_t> header_data,
std::uint64_t key) override;
[[nodiscard]] NetioStatus send_data(const EndPointAddress& address,
std::span<const iovec> iov,
std::span<const std::uint8_t> header_data,
std::uint64_t key) override;
virtual NetioStatus send_data_copy(const EndPointAddress& address,
std::span<const std::uint8_t> data,
std::span<const std::uint8_t> header_data,
std::uint64_t key) override;
virtual NetioStatus send_data_copy(const EndPointAddress& address,
std::span<const iovec> iov,
std::span<const std::uint8_t> header_data,
std::uint64_t key) override;
[[nodiscard]] NetworkBuffer* get_buffer(const EndPointAddress& address) override;
[[nodiscard]] NetioStatus send_buffer(const EndPointAddress& address,
NetworkBuffer* buffer) override;
[[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address) override;
private:
boost::asio::io_service& initIoService();
void onServerShutdown(const EndPointAddress& address);
void addEvent(const ConnectionEvent& event);
void handleQueuedEvent();
void start();
void stop();
void closeSession(std::shared_ptr<felix::asyncmsg::Session> session);
void poll();
[[nodiscard]] static boost::asio::ip::tcp::endpoint getEndpoint(std::string_view address,
unsigned short port);
struct EndpointHash {
[[nodiscard]] static std::size_t hash(const boost::asio::ip::tcp::endpoint& endpoint)
{
std::size_t seed = 0;
boost::hash_combine(seed, endpoint.address().to_string());
boost::hash_combine(seed, endpoint.port());
return seed;
}
[[nodiscard]] static bool equal(const boost::asio::ip::tcp::endpoint& lhs,
const boost::asio::ip::tcp::endpoint& rhs)
{
return lhs == rhs;
}
};
using SenderMap = tbb::
concurrent_hash_map<boost::asio::ip::tcp::endpoint, std::shared_ptr<Session>, EndpointHash>;
mutable std::mutex m_mutex{};
boost::asio::io_service m_ioServiceResource{};
std::reference_wrapper<boost::asio::io_service> m_ioService;
std::map<boost::asio::ip::tcp::endpoint, std::shared_ptr<Server>> m_serversReceive{};
SenderMap m_sessionsSend{};
EventTimerHandle m_timer{m_evloop->create_timer([this](int) { poll(); })};
std::unique_ptr<boost::asio::io_service::work> m_work{};
std::jthread m_ioServiceThread{};
tbb::concurrent_queue<ConnectionEvent> m_eventQueue;
EventSignalHandle m_eventSignal;
Mode m_mode{Mode::DELEGATE};
bool m_useAsioEventLoop{};
constexpr static std::string_view MYSELF = "FELIX";
constexpr static std::string_view IP_ADDRESS_SEND = "0.0.0.0";
constexpr static unsigned short PORT_SEND = 5555;
constexpr static auto TIMER_INTERVAL = 100ms;
constexpr static int MAX_POLLS = 10;
};
} // namespace netio3::asyncmsg
#endif // NETIO3BACKEND_BACKENDASYNCMSG_BACKENDASYNCMSG_HPP