Program Listing for File SendBufferManager.hpp

Return to documentation for file (BackendLibfabric/SendBufferManager.hpp)

#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_SENDBUFFERMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_SENDBUFFERMANAGER_HPP

#include <tracy/Tracy.hpp>

#include "Buffer.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/Netio3Backend.hpp"

namespace netio3::libfabric {
  class SendBufferManager
  {
  public:
    SendBufferManager(ConnectionParametersSendBuffered connection_params, DomainManager& domain_manager);
    SendBufferManager(const SendBufferManager&) = delete;
    SendBufferManager(SendBufferManager&&) = delete;
    SendBufferManager& operator=(const SendBufferManager&) = delete;
    SendBufferManager& operator=(SendBufferManager&&) = delete;

    ~SendBufferManager();

    NetworkBuffer* get_buffer()
    {
      ZoneScoped;
      if (m_buffers.empty()) {
        throw NoBuffersAllocated(ERS_HERE, "", 0);
      }
      ERS_DEBUG(1,
                std::format("m_buffers.size={} m_available_buffers.size={} size of buffers={}",
                            m_buffers.size(),
                            m_available_buffers.unsafe_size(),
                            m_buffers[0].size()));
      if (m_available_buffers.empty()) {
        ERS_DEBUG(2, "No buffers available");
        return nullptr;
      }
      std::uint64_t key{};
      const auto success = m_available_buffers.try_pop(key);
      if (not success) {
        ERS_DEBUG(2, "No buffers available");
        return nullptr;
      }
      m_min_num_available_buffers.store(
        std::min(m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1,
                 m_min_num_available_buffers.load(std::memory_order_relaxed)),
        std::memory_order_relaxed);
      return &m_buffers.at(key);
    }

    [[nodiscard]] std::uint64_t release_buffer(std::uint64_t bufnum)
    {
      ERS_DEBUG(2, std::format("Releasing buffer {} pos was {}", bufnum, m_buffers[bufnum].pos()));
      m_buffers[bufnum].reset();
      m_available_buffers.push(bufnum);
      m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
      return bufnum;
    }

    [[nodiscard]] std::size_t get_num_available_buffers();

  private:
    void init_buffers(DomainManager& domain_manager);

    ConnectionParametersSendBuffered m_conn_parameters{};
    std::vector<Buffer> m_buffers;
    tbb::concurrent_queue<std::uint64_t> m_available_buffers;
    std::atomic<std::size_t> m_num_available_buffers{0};
    std::atomic<std::size_t> m_min_num_available_buffers{0};
  };
}  // namespace netio3::libfabric

#endif  // NETIO3BACKEND_BACKENDLIBFABRIC_SENDBUFFERMANAGER_HPP