// Program listing for file SendBufferManager.hpp
// (BackendLibfabric/SendBufferManager.hpp)
#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_SENDBUFFERMANAGER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_SENDBUFFERMANAGER_HPP
#include <tracy/Tracy.hpp>
#include "Buffer.hpp"
#include "netio3-backend/Issues.hpp"
#include "netio3-backend/Netio3Backend.hpp"
namespace netio3::libfabric {
/**
 * @brief Pool of send buffers handed out by index.
 *
 * The buffers live in @c m_buffers; the indices of the buffers that are
 * currently free are kept in the concurrent queue @c m_available_buffers.
 * get_buffer() pops an index and returns the matching buffer,
 * release_buffer() resets a buffer and puts its index back.
 *
 * The class is neither copyable nor movable.
 *
 * NOTE(review): the concurrent queue and the atomic counters suggest that
 * get_buffer() and release_buffer() are meant to be called from different
 * threads - confirm against the callers.
 */
class SendBufferManager
{
public:
  /**
   * @brief Construct the manager (defined out of line).
   *
   * @param connection_params Parameters of the buffered send connection
   * @param domain_manager Domain manager; presumably used to register the buffers via
   *                       init_buffers() - confirm in the implementation file
   */
  SendBufferManager(ConnectionParametersSendBuffered connection_params, DomainManager& domain_manager);
  SendBufferManager(const SendBufferManager&) = delete;
  SendBufferManager(SendBufferManager&&) = delete;
  SendBufferManager& operator=(const SendBufferManager&) = delete;
  SendBufferManager& operator=(SendBufferManager&&) = delete;
  ~SendBufferManager();

  /**
   * @brief Take a free buffer out of the pool.
   *
   * On success the available-buffer counter is decremented and the recorded
   * minimum of that counter is lowered if the new value is below it.
   *
   * @return Pointer to a buffer owned by this manager, or nullptr if no buffer is
   *         currently free. Hand the buffer back with release_buffer().
   * @throws NoBuffersAllocated if the manager holds no buffers at all
   */
  NetworkBuffer* get_buffer()
  {
    ZoneScoped;
    if (m_buffers.empty()) {
      throw NoBuffersAllocated(ERS_HERE, "", 0);
    }
    ERS_DEBUG(1,
              std::format("m_buffers.size={} m_available_buffers.size={} size of buffers={}",
                          m_buffers.size(),
                          m_available_buffers.unsafe_size(),
                          m_buffers[0].size()));

    // try_pop() already reports an empty queue, so no separate (and inherently racy)
    // empty() check is needed in front of it.
    std::uint64_t key{};
    if (not m_available_buffers.try_pop(key)) {
      ERS_DEBUG(2, "No buffers available");
      return nullptr;
    }

    const std::size_t remaining =
      m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1;

    // Lower the recorded minimum with a compare-and-swap loop. A plain load followed by
    // a store would let a concurrent caller overwrite a lower minimum with a higher one.
    auto low_water = m_min_num_available_buffers.load(std::memory_order_relaxed);
    while (remaining < low_water and
           not m_min_num_available_buffers.compare_exchange_weak(
             low_water, remaining, std::memory_order_relaxed)) {
      // compare_exchange_weak refreshed low_water; retry while our value is still lower.
    }

    return &m_buffers.at(key);
  }

  /**
   * @brief Reset a buffer and return it to the pool.
   *
   * @param bufnum Index of the buffer inside the pool
   * @return The index that was released (same value as @p bufnum)
   * @throws std::out_of_range if @p bufnum is not a valid buffer index
   */
  [[nodiscard]] std::uint64_t release_buffer(std::uint64_t bufnum)
  {
    // Bounds-checked access, consistent with get_buffer(); an invalid index must not be
    // allowed to reach the free list.
    auto& buffer = m_buffers.at(bufnum);
    ERS_DEBUG(2, std::format("Releasing buffer {} pos was {}", bufnum, buffer.pos()));
    buffer.reset();

    // Count the buffer before publishing its index: otherwise a concurrent get_buffer()
    // could pop the index and decrement the unsigned counter before it was incremented,
    // wrapping it around (assuming the counter starts out in sync with the queue).
    m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
    m_available_buffers.push(bufnum);
    return bufnum;
  }

  /**
   * @brief Number of available buffers (defined out of line).
   *
   * NOTE(review): presumably derived from the counters below - confirm in the
   * implementation file.
   */
  [[nodiscard]] std::size_t get_num_available_buffers();

private:
  /// Set up the buffers (defined out of line).
  void init_buffers(DomainManager& domain_manager);

  ConnectionParametersSendBuffered m_conn_parameters{};
  /// All buffers of the pool; indices into this vector are the keys in the queue.
  std::vector<Buffer> m_buffers;
  /// Indices of the buffers that are currently free.
  tbb::concurrent_queue<std::uint64_t> m_available_buffers;
  /// Counter tracking the number of free buffers (decremented on get, incremented on release).
  std::atomic<std::size_t> m_num_available_buffers{0};
  /// Lowest value of m_num_available_buffers recorded by get_buffer().
  std::atomic<std::size_t> m_min_num_available_buffers{0};
};
} // namespace netio3::libfabric
#endif // NETIO3BACKEND_BACKENDLIBFABRIC_SENDBUFFERMANAGER_HPP