Program Listing for File Session.hpp

Return to documentation for file (BackendAsyncmsg/Session.hpp)

#ifndef NETIO3BACKEND_BACKENDASYNCMSG_SESSION_HPP
#define NETIO3BACKEND_BACKENDASYNCMSG_SESSION_HPP

#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>

#include <boost/asio/io_service.hpp>

#include <asyncmsg/Session.h>

#include "Buffer.hpp"
#include "Types.hpp"
#include "ReceiveMessage.hpp"
#include "SendMessageUnbuffered.hpp"

namespace netio3::asyncmsg {
  /**
   * @brief netio3 session type layered on felix::asyncmsg::Session.
   *
   * Overrides the open / receive / send / close hooks declared by the base
   * class and adds accessors for network buffers and for the list of pending
   * sends. Only declarations are visible in this header, so any remark below
   * that goes beyond what a signature states is marked as an assumption.
   *
   * NOTE(review): tbb::concurrent_queue is used as a member type, but no TBB
   * header is included directly by this file; presumably it arrives through
   * one of the project headers. Confirm.
   */
  class Session : public felix::asyncmsg::Session
  {
  public:
    /**
     * @brief Construct a session.
     *
     * @param ioService          Boost.Asio io_service, taken by reference
     *                           (presumably forwarded to the base class; confirm
     *                           in the implementation file).
     * @param config             Network configuration, taken by value.
     * @param eventloop          Raw pointer to an event loop. Ownership
     *                           presumably stays with the caller, and whether
     *                           nullptr is accepted is not visible here. TODO
     *                           confirm.
     * @param connectionClosedCb Callback taking a shared_ptr to a
     *                           felix::asyncmsg::Session (presumably invoked once
     *                           the connection has been closed; verify).
     * @param addEventCb         Callback taking a ConnectionEvent.
     * @param useAsioEventLoop   Flag; going by its name it selects the Asio
     *                           event loop over `eventloop`. Confirm.
     * @param mode               Session mode; semantics are defined by the Mode
     *                           type, which is declared elsewhere.
     * @param connectionParams   Optional connection parameters; defaults to a
     *                           value-initialised ConnectionParameters.
     */
    Session(boost::asio::io_service& ioService,
            NetworkConfig config,
            BaseEventLoop* eventloop,
            std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> connectionClosedCb,
            std::function<void(const ConnectionEvent&)> addEventCb,
            bool useAsioEventLoop,
            Mode mode,
            const ConnectionParameters& connectionParams = {});

    /**
     * @brief Hand a message to the session for sending.
     * @param message Message to send; ownership is transferred to the callee
     *                (unique_ptr passed by value).
     */
    void sendData(std::unique_ptr<SendMessageUnbuffered> message);

    /// @brief Base-class hook override; by its name, called when the session has been opened.
    void onOpen() override;

    /**
     * @brief Base-class hook override; by its name, called when opening failed.
     * @param error Error code describing the failure.
     */
    void onOpenError(const boost::system::error_code& error) override;

    /**
     * @brief Base-class hook override that creates the InputMessage object for
     *        an incoming message.
     *
     * @param typeId        Message type identifier.
     * @param transactionId Transaction identifier.
     * @param size          Message size (presumably the payload size in bytes;
     *                      confirm against the asyncmsg documentation).
     * @return Newly created input message, owned by the caller.
     */
    [[nodiscard]] std::unique_ptr<felix::asyncmsg::InputMessage>
    createMessage(std::uint32_t typeId, std::uint32_t transactionId, std::uint32_t size) override;

    /**
     * @brief Base-class hook override; by its name, called for a received message.
     * @param message The received message; ownership is passed in.
     */
    void onReceive(std::unique_ptr<felix::asyncmsg::InputMessage> message) override;

    /**
     * @brief Base-class hook override; by its name, called when receiving failed.
     * @param error   Error code describing the failure.
     * @param message The message associated with the failure; ownership is passed in.
     */
    void onReceiveError(const boost::system::error_code& error,
                        std::unique_ptr<felix::asyncmsg::InputMessage> message) override;

    /**
     * @brief Base-class hook override; by its name, called after a message was sent.
     * @param message The sent message; ownership is passed in.
     */
    void onSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message) override;

    /**
     * @brief Base-class hook override; by its name, called when sending failed.
     * @param error   Error code describing the failure.
     * @param message The message that could not be sent; ownership is passed in.
     */
    void onSendError(const boost::system::error_code& error,
                     std::unique_ptr<const felix::asyncmsg::OutputMessage> message) override;

    /// @brief Base-class hook override; by its name, called when the session has been closed.
    void onClose() override;

    /**
     * @brief Base-class hook override; by its name, called when closing failed.
     * @param error Error code describing the failure.
     */
    void onCloseError(const boost::system::error_code& error) override;

    /**
     * @brief Obtain a network buffer.
     * @return Raw pointer to a NetworkBuffer. Presumably taken from
     *         m_bufferManager, and possibly nullptr when none is available.
     *         TODO confirm in the implementation.
     */
    [[nodiscard]] NetworkBuffer* getBuffer();

    /// @brief Number of buffers currently available (presumably as reported by m_bufferManager; confirm).
    [[nodiscard]] std::size_t getNumAvailableBuffers();

    /**
     * @brief Get the pending sends.
     * @return A vector of 64-bit values, returned by value (i.e. a copy).
     *         Presumably the contents of m_pendingSends; the meaning of each
     *         value is not visible in this header.
     */
    [[nodiscard]] std::vector<std::uint64_t> getPendingSends() const;
  private:
    /// @brief Internal handler for a received message (const; takes the message by const reference).
    void handleOnReceive(const ReceiveMessage::Message& message) const;

    /// @brief Internal handler for a sent message; takes ownership of the message.
    void handleOnSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message);

    /// @brief Trigger the "connection established" callback (presumably through trigger_cb; confirm).
    void trigger_on_connection_established_cb();

    /// @brief Trigger the "connection refused" callback (presumably through trigger_cb; confirm).
    void trigger_on_connection_refused_cb();

    /// @brief Trigger the "connection closed" callback (presumably through trigger_cb; confirm).
    void trigger_on_connection_closed_cb();

    /**
     * @brief Trigger a callback for the given connection event type.
     * @param eventType Type of the connection event.
     */
    void trigger_cb(ConnectionEvent::Type eventType);

    /**
     * @brief Check the given output message and return its buffer.
     * @param message Raw pointer to the message; the exact check performed and
     *                where the buffer is returned to are defined in the
     *                implementation file.
     */
    void checkAndReturnBuffer(const felix::asyncmsg::OutputMessage* message);

    /// Network configuration (presumably the constructor's `config` argument).
    NetworkConfig m_config{};
    /// Buffer manager; no default member initialiser, so it is set up by the constructor.
    BufferManager m_bufferManager;
    /// Pending sends as 64-bit values; exposed as a copy through getPendingSends().
    std::vector<std::uint64_t> m_pendingSends{};
    /// Concurrent queue of received messages.
    tbb::concurrent_queue<ReceiveMessage::Message> m_messageQueue;
    /// Signal handle (presumably used to announce entries in m_messageQueue; confirm).
    EventSignalHandle m_messageSignal;
    /// Callback taking a shared_ptr to a felix::asyncmsg::Session (see constructor).
    std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> m_connectionClosedCb{};
    /// Callback taking a ConnectionEvent (see constructor).
    std::function<void(const ConnectionEvent&)> m_addEventCb{};
    /// Value-initialised to false; presumably set from the constructor's `useAsioEventLoop`.
    bool m_useAsioEventLoop{};
    /// Session mode; value-initialised by default.
    Mode m_mode{};
    /// Close-attempt counter, starts at 0 (presumably compared against MAX_CLOSE_ATTEMPTS; confirm).
    int m_numCloseAttempts{};
    /// Mutex, mutable so it can be locked in const members; by its name it guards m_pendingSends (verify).
    mutable std::mutex m_mutex_pending_sends;
    /// Compile-time constant 5; by its name, the upper bound on close attempts.
    constexpr static int MAX_CLOSE_ATTEMPTS{5};
  };
}  // namespace netio3::asyncmsg

#endif  // NETIO3BACKEND_BACKENDASYNCMSG_SESSION_HPP