Program Listing for File BackendAsyncmsg.hpp

Return to documentation for file (BackendAsyncmsg/BackendAsyncmsg.hpp)

#ifndef NETIO3BACKEND_BACKENDASYNCMSG_BACKENDASYNCMSG_HPP
#define NETIO3BACKEND_BACKENDASYNCMSG_BACKENDASYNCMSG_HPP

#include <cstdint>
#include <map>
#include <memory>
#include <span>
#include <string_view>
#include <thread>

#include <boost/functional/hash.hpp>

#include <tbb/concurrent_queue.h>
#include <tbb/concurrent_hash_map.h>

#include <asyncmsg/Session.h>

#include "Server.hpp"
#include "Session.hpp"
#include "Types.hpp"
#include "netio3-backend/Netio3Backend.hpp"

using namespace std::chrono_literals;

namespace netio3::asyncmsg {
  class BackendAsyncmsg : public NetworkBackend
  {
  public:
    BackendAsyncmsg(const NetworkConfig& config, std::shared_ptr<BaseEventLoop> evloop);

    ~BackendAsyncmsg() override;
    BackendAsyncmsg(const BackendAsyncmsg&) = delete;
    BackendAsyncmsg(BackendAsyncmsg&&) = delete;
    BackendAsyncmsg& operator=(const BackendAsyncmsg&) = delete;
    BackendAsyncmsg& operator=(BackendAsyncmsg&&) = delete;

    void open_send_endpoint(const EndPointAddress& address,
                            const ConnectionParameters& connection_params) override;

    [[nodiscard]] EndPointAddress open_listen_endpoint(
      const EndPointAddress& address,
      const ConnectionParametersRecv& connection_params) override;

    void close_send_endpoint(const EndPointAddress& address) override;

    void close_listen_endpoint(const EndPointAddress& address) override;

    [[nodiscard]] NetioStatus send_data(const EndPointAddress& address,
                                        std::span<std::uint8_t> data,
                                        std::span<const std::uint8_t> header_data,
                                        std::uint64_t key) override;

    [[nodiscard]] NetioStatus send_data(const EndPointAddress& address,
                                        std::span<const iovec> iov,
                                        std::span<const std::uint8_t> header_data,
                                        std::uint64_t key) override;

    virtual NetioStatus send_data_copy(const EndPointAddress& address,
                                        std::span<const std::uint8_t> data,
                                        std::span<const std::uint8_t> header_data,
                                        std::uint64_t key) override;

    virtual NetioStatus send_data_copy(const EndPointAddress& address,
                                        std::span<const iovec> iov,
                                        std::span<const std::uint8_t> header_data,
                                        std::uint64_t key) override;

    [[nodiscard]] NetworkBuffer* get_buffer(const EndPointAddress& address) override;

    [[nodiscard]] NetioStatus send_buffer(const EndPointAddress& address,
                                          NetworkBuffer* buffer) override;

    [[nodiscard]] std::size_t get_num_available_buffers(const EndPointAddress& address) override;

  private:
    boost::asio::io_service& initIoService();

    void onServerShutdown(const EndPointAddress& address);

    void addEvent(const ConnectionEvent& event);

    void handleQueuedEvent();

    void start();

    void stop();

    void closeSession(std::shared_ptr<felix::asyncmsg::Session> session);

    void poll();

    [[nodiscard]] static boost::asio::ip::tcp::endpoint getEndpoint(std::string_view address,
                                                                    unsigned short port);

    struct EndpointHash {
      [[nodiscard]] static std::size_t hash(const boost::asio::ip::tcp::endpoint& endpoint)
      {
        std::size_t seed = 0;
        boost::hash_combine(seed, endpoint.address().to_string());
        boost::hash_combine(seed, endpoint.port());
        return seed;
      }

      [[nodiscard]] static bool equal(const boost::asio::ip::tcp::endpoint& lhs,
                                      const boost::asio::ip::tcp::endpoint& rhs)
      {
        return lhs == rhs;
      }
    };

    using SenderMap = tbb::
      concurrent_hash_map<boost::asio::ip::tcp::endpoint, std::shared_ptr<Session>, EndpointHash>;

    mutable std::mutex m_mutex{};
    boost::asio::io_service m_ioServiceResource{};
    std::reference_wrapper<boost::asio::io_service> m_ioService;
    std::map<boost::asio::ip::tcp::endpoint, std::shared_ptr<Server>> m_serversReceive{};
    SenderMap m_sessionsSend{};
    EventTimerHandle m_timer{m_evloop->create_timer([this](int) { poll(); })};
    std::unique_ptr<boost::asio::io_service::work> m_work{};
    std::jthread m_ioServiceThread{};
    tbb::concurrent_queue<ConnectionEvent> m_eventQueue;
    EventSignalHandle m_eventSignal;
    Mode m_mode{Mode::DELEGATE};
    bool m_useAsioEventLoop{};
    constexpr static std::string_view MYSELF = "FELIX";
    constexpr static std::string_view IP_ADDRESS_SEND = "0.0.0.0";
    constexpr static unsigned short PORT_SEND = 5555;
    constexpr static auto TIMER_INTERVAL = 100ms;
    constexpr static int MAX_POLLS = 10;
  };
}  // namespace netio3::asyncmsg

#endif  // NETIO3BACKEND_BACKENDASYNCMSG_BACKENDASYNCMSG_HPP