Program Listing for File Session.hpp
↰ Return to documentation for file (BackendAsyncmsg/Session.hpp)
#ifndef NETIO3BACKEND_BACKENDASYNCMSG_SESSION_HPP
#define NETIO3BACKEND_BACKENDASYNCMSG_SESSION_HPP
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <mutex>
#include <vector>
#include <boost/asio/io_service.hpp>
#include <asyncmsg/Session.h>
#include "Buffer.hpp"
#include "Types.hpp"
#include "ReceiveMessage.hpp"
#include "SendMessageUnbuffered.hpp"
namespace netio3::asyncmsg {
/**
 * @brief Session type of the netio3 asyncmsg backend.
 *
 * Derives from felix::asyncmsg::Session and overrides its open / receive /
 * send / close hooks. On top of that it exposes an unbuffered send entry
 * point and accessors for network buffers and pending sends.
 *
 * NOTE(review): the member definitions live outside this header; the
 * per-member notes below only describe what the declarations show.
 */
class Session : public felix::asyncmsg::Session
{
public:
    /**
     * @brief Construct a session.
     *
     * @param ioService          Boost.Asio io_service supplied by the creator.
     * @param config             Network configuration, taken by value
     *                           (presumably kept in m_config - confirm in the .cpp).
     * @param eventloop          Non-owning pointer to the event loop.
     * @param connectionClosedCb Callback receiving a shared pointer to a
     *                           felix::asyncmsg::Session (presumably kept in
     *                           m_connectionClosedCb).
     * @param addEventCb         Callback receiving a ConnectionEvent
     *                           (presumably kept in m_addEventCb).
     * @param useAsioEventLoop   Flag selecting the asio event loop
     *                           (exact effect not visible in this header).
     * @param mode               Mode of the session.
     * @param connectionParams   Optional connection parameters; defaults to a
     *                           value-initialised ConnectionParameters.
     */
    Session(boost::asio::io_service& ioService,
            NetworkConfig config,
            BaseEventLoop* eventloop,
            std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> connectionClosedCb,
            std::function<void(const ConnectionEvent&)> addEventCb,
            bool useAsioEventLoop,
            Mode mode,
            const ConnectionParameters& connectionParams = {});

    /**
     * @brief Send an unbuffered message.
     * @param message Message to send; ownership is transferred to the session.
     */
    void sendData(std::unique_ptr<SendMessageUnbuffered> message);

    /// Override of the felix::asyncmsg::Session open hook.
    void onOpen() override;

    /// Override of the felix::asyncmsg::Session open-error hook.
    /// @param error Error code describing the failure.
    void onOpenError(const boost::system::error_code& error) override;

    /**
     * @brief Override of the felix::asyncmsg::Session message factory hook.
     * @param typeId        Type identifier of the message.
     * @param transactionId Transaction identifier of the message.
     * @param size          Size of the message.
     * @return Owning pointer to the created input message.
     */
    [[nodiscard]] std::unique_ptr<felix::asyncmsg::InputMessage>
    createMessage(std::uint32_t typeId, std::uint32_t transactionId, std::uint32_t size) override;

    /// Override of the felix::asyncmsg::Session receive hook.
    /// @param message Received message; ownership is passed in.
    void onReceive(std::unique_ptr<felix::asyncmsg::InputMessage> message) override;

    /// Override of the felix::asyncmsg::Session receive-error hook.
    /// @param error   Error code describing the failure.
    /// @param message Message associated with the failure; ownership is passed in.
    void onReceiveError(const boost::system::error_code& error,
                        std::unique_ptr<felix::asyncmsg::InputMessage> message) override;

    /// Override of the felix::asyncmsg::Session send hook.
    /// @param message Output message; ownership is passed in.
    void onSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message) override;

    /// Override of the felix::asyncmsg::Session send-error hook.
    /// @param error   Error code describing the failure.
    /// @param message Message associated with the failure; ownership is passed in.
    void onSendError(const boost::system::error_code& error,
                     std::unique_ptr<const felix::asyncmsg::OutputMessage> message) override;

    /// Override of the felix::asyncmsg::Session close hook.
    void onClose() override;

    /// Override of the felix::asyncmsg::Session close-error hook.
    /// @param error Error code describing the failure.
    void onCloseError(const boost::system::error_code& error) override;

    /// @return Pointer to a NetworkBuffer.
    /// NOTE(review): null-ability and ownership are not visible here - confirm in the .cpp.
    [[nodiscard]] NetworkBuffer* getBuffer();

    /// @return Number of available buffers.
    [[nodiscard]] std::size_t getNumAvailableBuffers();

    /// @return The pending sends as a vector of 64-bit values, returned by value.
    [[nodiscard]] std::vector<std::uint64_t> getPendingSends() const;

private:
    /// Internal handler for a received message.
    void handleOnReceive(const ReceiveMessage::Message& message) const;

    /// Internal handler for a sent message; takes ownership of @p message.
    void handleOnSend(std::unique_ptr<const felix::asyncmsg::OutputMessage> message);

    /// Trigger the "connection established" callback.
    void trigger_on_connection_established_cb();

    /// Trigger the "connection refused" callback.
    void trigger_on_connection_refused_cb();

    /// Trigger the "connection closed" callback.
    void trigger_on_connection_closed_cb();

    /// Trigger a callback for the given connection event type.
    void trigger_cb(ConnectionEvent::Type eventType);

    /// Check the given output message and return its buffer.
    /// NOTE(review): the exact condition checked is defined in the .cpp.
    void checkAndReturnBuffer(const felix::asyncmsg::OutputMessage* message);

    // Data members. Declaration order is kept as-is because it fixes the
    // initialisation order.
    NetworkConfig m_config{};                                 ///< Network configuration.
    BufferManager m_bufferManager;                            ///< Buffer manager of this session.
    std::vector<std::uint64_t> m_pendingSends{};              ///< Pending sends.
    tbb::concurrent_queue<ReceiveMessage::Message> m_messageQueue;  ///< Concurrent queue of received messages.
    EventSignalHandle m_messageSignal;                        ///< Signal handle associated with messages.
    std::function<void(std::shared_ptr<felix::asyncmsg::Session>)> m_connectionClosedCb{};  ///< Connection-closed callback.
    std::function<void(const ConnectionEvent&)> m_addEventCb{};  ///< Add-event callback.
    bool m_useAsioEventLoop{};                                ///< Whether the asio event loop is used.
    Mode m_mode{};                                            ///< Mode of the session.
    int m_numCloseAttempts{};                                 ///< Close attempt counter, starts at zero.
    mutable std::mutex m_mutex_pending_sends;                 ///< Presumably guards m_pendingSends; mutable, so lockable in const members.

    static constexpr int MAX_CLOSE_ATTEMPTS{5};               ///< Presumably the upper bound for m_numCloseAttempts - confirm in the .cpp.
};
} // namespace netio3::asyncmsg
#endif // NETIO3BACKEND_BACKENDASYNCMSG_SESSION_HPP