Program Listing for File Buffer.hpp

Return to documentation for file (BackendLibfabric/Buffer.hpp)

#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP

#include <algorithm>

#include <rdma/fabric.h>

#include <sys/uio.h>

#include <tbb/concurrent_queue.h>
#include <tbb/concurrent_vector.h>


#include "netio3-backend/Netio3Backend.hpp"

namespace netio3::libfabric {

  class DomainContext;

  class Buffer : public NetworkBuffer
  {
  public:
    Buffer(DomainContext& domain, const std::size_t size, const std::uint64_t key) :
      NetworkBuffer(size), m_domain{domain}, m_key{key}
    {}

    Buffer(DomainContext& domain, size_t size, std::uint8_t* buf) :
      NetworkBuffer({buf, size}), m_domain{domain}
    {}

    [[nodiscard]] const std::uint8_t* get_buffer() { return data().data(); }

    [[nodiscard]] std::size_t get_size() const { return size(); }

    [[nodiscard]] DomainContext& get_domain() { return m_domain; }

    [[nodiscard]] std::uint64_t get_key() const { return m_key; }

    // Set by libfabric not the constructor
    fid_mr* mr{nullptr};

  private:
    std::reference_wrapper<DomainContext> m_domain;
    std::uint64_t m_key{};
  };

  using Header = std::array<std::uint8_t, ZERO_COPY_SIZE_HEADER>;

  class HeaderBuffer
  {
  public:
    struct HeaderWrapper {
      NetioStatus status{NetioStatus::OK};
      std::size_t bufnum{};
      iovec data{};
    };

    explicit HeaderBuffer(DomainContext& domain) :
      m_domain{domain},
      m_available_header_buffers{init_available_header_buffers()},
      m_keys(ZERO_COPY_NUM_HEADER_SLOTS)
    {}

    [[nodiscard]] HeaderWrapper get_header(const std::span<const std::uint8_t>& data,
                                           const std::uint64_t key)
    {
      if (data.size() > ZERO_COPY_SIZE_HEADER) {
        return {.status = NetioStatus::FAILED};
      }
      std::size_t idx{};
      if (not m_available_header_buffers.try_pop(idx)) {
        return {.status = NetioStatus::NO_RESOURCES};
      }
      std::ranges::copy(data, m_header_buffer.at(idx).begin());
      m_keys.at(idx) = key;
      m_min_num_available_buffers.store(
        std::min(m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1,
                 m_min_num_available_buffers.load(std::memory_order_relaxed)),
        std::memory_order_relaxed);
      return {.bufnum = idx, .data = {&m_header_buffer.at(idx), data.size()}};
    }

    [[nodiscard]] std::uint64_t return_header(const std::size_t idx)
    {
      m_available_header_buffers.push(idx);
      m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
      return m_keys.at(idx);
    }

    [[nodiscard]] char* get_buffer() { return reinterpret_cast<char*>(m_header_buffer.data()); }

    [[nodiscard]] std::size_t get_size() const { return sizeof(m_header_buffer); }

    [[nodiscard]] DomainContext& get_domain() { return m_domain; }

    [[nodiscard]] std::size_t get_num_available_buffers()
    {
      return m_min_num_available_buffers.exchange(
        m_num_available_buffers.load(std::memory_order_relaxed), std::memory_order_relaxed);
    };

    [[nodiscard]] std::vector<std::uint64_t> get_pending_sends();

  private:
    [[nodiscard]] static tbb::concurrent_queue<std::size_t> init_available_header_buffers()
    {
      tbb::concurrent_queue<std::size_t> available_header_buffers;
      for (std::size_t i = 0; i < ZERO_COPY_NUM_HEADER_SLOTS; ++i) {
        available_header_buffers.push(i);
      }
      return available_header_buffers;
    }

  public:
    // Set by libfabric not the constructor
    fid_mr* mr{nullptr};

  private:
    std::reference_wrapper<DomainContext> m_domain;
    std::array<Header, ZERO_COPY_NUM_HEADER_SLOTS> m_header_buffer{};
    tbb::concurrent_queue<std::size_t> m_available_header_buffers;
    tbb::concurrent_vector<std::uint64_t> m_keys{};
    std::atomic<std::size_t> m_num_available_buffers{ZERO_COPY_NUM_HEADER_SLOTS};
    std::atomic<std::size_t> m_min_num_available_buffers{ZERO_COPY_NUM_HEADER_SLOTS};
  };

  template<typename BufferType>
  concept BufferConcept =
    std::is_same_v<BufferType, Buffer> || std::is_same_v<BufferType, HeaderBuffer>;
}  // namespace netio3::libfabric

#endif  // NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP