Program Listing for File Buffer.hpp
↰ Return to documentation for file (BackendLibfabric/Buffer.hpp)
#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP
#include <algorithm>
#include <array>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <rdma/fabric.h>
#include <span>
#include <sys/uio.h>
#include <tbb/concurrent_queue.h>
#include <tbb/concurrent_vector.h>
#include <type_traits>
#include <vector>
#include "netio3-backend/Netio3Backend.hpp"
namespace netio3::libfabric {
class DomainContext;
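// A NetworkBuffer associated with a libfabric memory registration; stores the
// owning domain and the key used when registering the memory.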
class Buffer : public NetworkBuffer
{
public:
Buffer(DomainContext& domain, const std::size_t size, const std::uint64_t key) :
NetworkBuffer(size), m_domain{domain}, m_key{key}
{}
Buffer(DomainContext& domain, std::size_t size, std::uint8_t* buf) :
NetworkBuffer({buf, size}), m_domain{domain}
{}
[[nodiscard]] const std::uint8_t* get_buffer() { return data().data(); }
[[nodiscard]] std::size_t get_size() const { return size(); }
[[nodiscard]] DomainContext& get_domain() { return m_domain; }
[[nodiscard]] std::uint64_t get_key() const { return m_key; }
// Set by libfabric, not by the constructor
fid_mr* mr{nullptr};
private:
std::reference_wrapper<DomainContext> m_domain;
std::uint64_t m_key{};
};
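// Fixed-size byte array holding a single message header for zero-copy transfers.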
using Header = std::array<std::uint8_t, ZERO_COPY_SIZE_HEADER>;
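// Pool of ZERO_COPY_NUM_HEADER_SLOTS header slots backed by one contiguous,
// registrable memory block; slots are handed out and returned concurrently.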
class HeaderBuffer
{
public:
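// Result of get_header(): a status, the slot index, and an iovec over the
// header bytes copied into that slot.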
struct HeaderWrapper {
NetioStatus status{NetioStatus::OK};
std::size_t bufnum{};
iovec data{};
};
explicit HeaderBuffer(DomainContext& domain) :
m_domain{domain},
m_available_header_buffers{init_available_header_buffers()},
m_keys(ZERO_COPY_NUM_HEADER_SLOTS)
{}
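// Copies data into a free header slot, remembers the caller's key and returns
// the slot index plus an iovec describing it. Returns FAILED if the data does
// not fit in a header slot and NO_RESOURCES if no slot is free; also updates
// the low-water mark of available slots.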
[[nodiscard]] HeaderWrapper get_header(const std::span<const std::uint8_t>& data,
const std::uint64_t key)
{
if (data.size() > ZERO_COPY_SIZE_HEADER) {
return {.status = NetioStatus::FAILED};
}
std::size_t idx{};
if (not m_available_header_buffers.try_pop(idx)) {
return {.status = NetioStatus::NO_RESOURCES};
}
std::ranges::copy(data, m_header_buffer.at(idx).begin());
m_keys.at(idx) = key;
m_min_num_available_buffers.store(
std::min(m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1,
m_min_num_available_buffers.load(std::memory_order_relaxed)),
std::memory_order_relaxed);
return {.bufnum = idx, .data = {&m_header_buffer.at(idx), data.size()}};
}
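// Returns a slot to the free pool and hands back the key stored for it.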
[[nodiscard]] std::uint64_t return_header(const std::size_t idx)
{
m_available_header_buffers.push(idx);
m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
return m_keys.at(idx);
}
[[nodiscard]] char* get_buffer() { return reinterpret_cast<char*>(m_header_buffer.data()); }
[[nodiscard]] std::size_t get_size() const { return sizeof(m_header_buffer); }
[[nodiscard]] DomainContext& get_domain() { return m_domain; }
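// Returns the minimum number of free slots observed since the previous call
// and resets the low-water mark to the current count.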
[[nodiscard]] std::size_t get_num_available_buffers()
{
return m_min_num_available_buffers.exchange(
m_num_available_buffers.load(std::memory_order_relaxed), std::memory_order_relaxed);
}
[[nodiscard]] std::vector<std::uint64_t> get_pending_sends();
private:
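// Fills the free-slot queue with every slot index.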
[[nodiscard]] static tbb::concurrent_queue<std::size_t> init_available_header_buffers()
{
tbb::concurrent_queue<std::size_t> available_header_buffers;
for (std::size_t i = 0; i < ZERO_COPY_NUM_HEADER_SLOTS; ++i) {
available_header_buffers.push(i);
}
return available_header_buffers;
}
public:
// Set by libfabric, not by the constructor
fid_mr* mr{nullptr};
private:
std::reference_wrapper<DomainContext> m_domain;
std::array<Header, ZERO_COPY_NUM_HEADER_SLOTS> m_header_buffer{};
tbb::concurrent_queue<std::size_t> m_available_header_buffers;
tbb::concurrent_vector<std::uint64_t> m_keys{};
std::atomic<std::size_t> m_num_available_buffers{ZERO_COPY_NUM_HEADER_SLOTS};
std::atomic<std::size_t> m_min_num_available_buffers{ZERO_COPY_NUM_HEADER_SLOTS};
};
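// Restricts templates to the two buffer types defined in this header.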
template<typename BufferType>
concept BufferConcept =
std::is_same_v<BufferType, Buffer> || std::is_same_v<BufferType, HeaderBuffer>;
} // namespace netio3::libfabric
#endif // NETIO3BACKEND_BACKENDLIBFABRIC_BUFFER_HPP