
.. _program_listing_file_BackendLibfabric_Buffer.hpp:

Program Listing for File Buffer.hpp
===================================

|exhale_lsh| :ref:`Return to documentation for file <file_BackendLibfabric_Buffer.hpp>` (``BackendLibfabric/Buffer.hpp``)

.. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS

.. code-block:: cpp

   #ifndef NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP
   #define NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP

   // NOTE(review): everything written between angle brackets (header names and
   // template arguments) was missing from this listing. The header set below and
   // the template arguments further down are reconstructed from how the code uses
   // them; places where the usage does not pin the type down carry their own
   // NOTE(review). Confirm against the real Buffer.hpp.
   #include <algorithm>
   #include <array>
   #include <atomic>
   #include <cstddef>
   #include <cstdint>
   #include <functional>
   #include <span>
   #include <type_traits>
   #include <vector>

   #include <rdma/fi_domain.h>
   #include <sys/uio.h>
   #include <tbb/concurrent_queue.h>
   #include <tbb/concurrent_vector.h>

   #include "netio3-backend/Netio3Backend.hpp"

   namespace netio3::libfabric {
     class DomainContext;

     /**
      * @brief NetworkBuffer that additionally remembers its DomainContext, a
      *        64-bit key and a memory-region handle.
      */
     class Buffer : public NetworkBuffer
     {
     public:
       /**
        * @brief Construct from a size; the size is forwarded to NetworkBuffer.
        * @param domain Domain context this buffer refers to
        * @param size Size forwarded to the NetworkBuffer constructor
        * @param key Key reported by get_key()
        */
       Buffer(DomainContext& domain, const std::size_t size, const std::uint64_t key) :
         NetworkBuffer(size), m_domain{domain}, m_key{key}
       {}

       /**
        * @brief Construct from an existing memory area; the key stays at its
        *        default value 0.
        * @param domain Domain context this buffer refers to
        * @param size Number of bytes starting at @p buf
        * @param buf Start of the memory area forwarded to NetworkBuffer
        */
       Buffer(DomainContext& domain, std::size_t size, std::uint8_t* buf) :
         NetworkBuffer({buf, size}), m_domain{domain}
       {}

       /// @return Pointer to the first byte of the underlying data
       [[nodiscard]] const std::uint8_t* get_buffer() { return data().data(); }

       /// @return Size as reported by NetworkBuffer::size()
       [[nodiscard]] std::size_t get_size() const { return size(); }

       /// @return The domain context passed to the constructor
       [[nodiscard]] DomainContext& get_domain() { return m_domain; }

       /// @return The key passed to the constructor (0 if none was given)
       [[nodiscard]] std::uint64_t get_key() const { return m_key; }

       // Set by libfabric not the constructor
       fid_mr* mr{nullptr};

     private:
       std::reference_wrapper<DomainContext> m_domain;
       std::uint64_t m_key{};
     };

     // One header slot. get_header() rejects payloads larger than
     // ZERO_COPY_SIZE_HEADER before copying into a slot, which fixes the extent.
     // NOTE(review): element type assumed to be std::uint8_t (as in Buffer) - confirm.
     using Header = std::array<std::uint8_t, ZERO_COPY_SIZE_HEADER>;

     /**
      * @brief Pool of ZERO_COPY_NUM_HEADER_SLOTS header slots.
      *
      * Indices of free slots are kept in a concurrent queue. While a slot is
      * handed out, the key supplied with it is stored and given back by
      * return_header(). Two counters track the current number of free slots and
      * the lowest number seen since the last call to get_num_available_buffers().
      */
     class HeaderBuffer
     {
     public:
       /// Result of get_header(): status, slot index and the memory to send.
       struct HeaderWrapper {
         NetioStatus status{NetioStatus::OK};
         std::size_t bufnum{};
         iovec data{};
       };

       /**
        * @brief Create the pool with every slot marked free.
        * @param domain Domain context this pool refers to
        */
       explicit HeaderBuffer(DomainContext& domain) :
         m_domain{domain},
         m_available_header_buffers{init_available_header_buffers()},
         m_keys(ZERO_COPY_NUM_HEADER_SLOTS)
       {}

       /**
        * @brief Take a free slot, copy @p data into it and remember @p key.
        * @param data Header bytes to copy; at most ZERO_COPY_SIZE_HEADER elements
        * @param key Value handed back by return_header() for this slot
        * @return status FAILED if @p data is too large, NO_RESOURCES if no slot
        *         is free; otherwise OK with the slot index in bufnum and data
        *         pointing at the slot with length data.size()
        */
       // NOTE(review): span element type assumed to be const std::uint8_t - confirm.
       [[nodiscard]] HeaderWrapper get_header(const std::span<const std::uint8_t>& data,
                                              const std::uint64_t key)
       {
         if (data.size() > ZERO_COPY_SIZE_HEADER) {
           return {.status = NetioStatus::FAILED};
         }
         std::size_t idx{};
         if (not m_available_header_buffers.try_pop(idx)) {
           return {.status = NetioStatus::NO_RESOURCES};
         }
         auto& slot = m_header_buffer.at(idx);
         std::ranges::copy(data, slot.begin());
         m_keys.at(idx) = key;

         // Lower the recorded minimum with a compare-exchange loop. A separate
         // load followed by a store lets a concurrent caller overwrite a smaller
         // minimum with a larger one.
         const auto remaining =
           m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1;
         auto low_water = m_min_num_available_buffers.load(std::memory_order_relaxed);
         while (remaining < low_water and
                not m_min_num_available_buffers.compare_exchange_weak(
                  low_water, remaining, std::memory_order_relaxed)) {
           // A failed exchange refreshed low_water; re-test and retry.
         }
         return {.bufnum = idx, .data = {&slot, data.size()}};
       }

       /**
        * @brief Give a slot back to the pool.
        * @param idx Slot index previously returned in HeaderWrapper::bufnum
        * @return The key that was stored for the slot by get_header()
        *
        * An out-of-range @p idx makes m_keys.at() throw before the pool is
        * modified.
        */
       [[nodiscard]] std::uint64_t return_header(const std::size_t idx)
       {
         // Read the key first: once idx is back in the queue another thread may
         // pop the slot and overwrite its key.
         const auto key = m_keys.at(idx);
         // Count the slot as free before it can be popped again, so that the
         // decrement in get_header() never runs ahead of this increment.
         m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
         m_available_header_buffers.push(idx);
         return key;
       }

       /// @return Start of the slot storage, viewed as char*
       [[nodiscard]] char* get_buffer() { return reinterpret_cast<char*>(m_header_buffer.data()); }

       /// @return Size in bytes of the whole slot storage
       [[nodiscard]] std::size_t get_size() const { return sizeof(m_header_buffer); }

       /// @return The domain context passed to the constructor
       [[nodiscard]] DomainContext& get_domain() { return m_domain; }

       /**
        * @brief Report the lowest number of free slots recorded since the previous
        *        call and restart the record from the current number of free slots.
        */
       [[nodiscard]] std::size_t get_num_available_buffers()
       {
         return m_min_num_available_buffers.exchange(
           m_num_available_buffers.load(std::memory_order_relaxed), std::memory_order_relaxed);
       }

       // Declared here, defined elsewhere.
       // NOTE(review): the element type was lost in this listing; std::uint64_t
       // (the key type) is an assumption - confirm against the real header.
       [[nodiscard]] std::vector<std::uint64_t> get_pending_sends();

     private:
       /// Build the initial free list containing every slot index once.
       [[nodiscard]] static tbb::concurrent_queue<std::size_t> init_available_header_buffers()
       {
         tbb::concurrent_queue<std::size_t> available_header_buffers;
         for (std::size_t i = 0; i < ZERO_COPY_NUM_HEADER_SLOTS; ++i) {
           available_header_buffers.push(i);
         }
         return available_header_buffers;
       }

     public:
       // Set by libfabric not the constructor
       fid_mr* mr{nullptr};

     private:
       std::reference_wrapper<DomainContext> m_domain;
       std::array<Header, ZERO_COPY_NUM_HEADER_SLOTS> m_header_buffer{};
       tbb::concurrent_queue<std::size_t> m_available_header_buffers;
       tbb::concurrent_vector<std::uint64_t> m_keys{};
       // NOTE(review): counter type assumed to be std::size_t (the type returned
       // by get_num_available_buffers()) - confirm.
       std::atomic<std::size_t> m_num_available_buffers{ZERO_COPY_NUM_HEADER_SLOTS};
       std::atomic<std::size_t> m_min_num_available_buffers{ZERO_COPY_NUM_HEADER_SLOTS};
     };

     /// Satisfied only by the two buffer types defined in this header.
     template<typename T>
     concept BufferConcept = std::is_same_v<T, Buffer> || std::is_same_v<T, HeaderBuffer>;
   } // namespace netio3::libfabric

   #endif // NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP