Program Listing for File SendSocketBuffered.hpp
↰ Return to documentation for file (BackendLibfabric/SendSocketBuffered.hpp
)
#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP
#include <atomic>
#include <tbb/concurrent_queue.h>
#include <tracy/Tracy.hpp>
#include "SendSocket.hpp"
#include "DomainManager.hpp"
#include "Helpers.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/NetworkBuffer.hpp"
namespace netio3::libfabric {
class SendSocketBuffered : public SendSocket
{
public:
SendSocketBuffered(EndPointAddress address,
const ConnectionParameters& connection_params,
NetworkMode mode,
fid_fabric* fabric,
DomainContext& domain);
SendSocketBuffered(const SendSocketBuffered&) = delete;
SendSocketBuffered(SendSocketBuffered&&) = delete;
SendSocketBuffered& operator=(const SendSocketBuffered&) = delete;
SendSocketBuffered& operator=(SendSocketBuffered&&) = delete;
~SendSocketBuffered();
[[nodiscard]] auto operator<=>(const SendSocketBuffered& other) const
{
return get_endpoint().cqfd <=> other.get_endpoint().cqfd;
}
NetworkBuffer* get_buffer()
{
ZoneScoped;
if (m_buffers.empty()) {
throw NoBuffersAllocated(ERS_HERE, get_address().address(), get_address().port());
}
ERS_DEBUG(
1,
std::format(
"ssocket addr {}:{} m_buffers.size={} m_available_buffers.size={} size of buffers={}",
get_address().address(),
get_address().port(),
m_buffers.size(),
m_available_buffers.unsafe_size(),
m_buffers[0].size()));
if (m_available_buffers.empty()) {
ERS_DEBUG(2, "No buffers available");
return nullptr;
}
std::uint64_t key{};
const auto success = m_available_buffers.try_pop(key);
if (not success) {
ERS_DEBUG(2, "No buffers available");
return nullptr;
}
m_min_num_available_buffers.store(
std::min(m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1,
m_min_num_available_buffers.load(std::memory_order_relaxed)),
std::memory_order_relaxed);
return &m_buffers.at(key);
}
NetioStatus send_buffer(Buffer* buffer, std::size_t size) const
{
ZoneScoped;
iovec iov{};
iov.iov_base = buffer->data().data();
iov.iov_len = size;
return send_data(iov, buffer->mr, buffer->get_key());
}
[[nodiscard]] std::uint64_t release_buffer(std::uint64_t bufnum)
{
ERS_DEBUG(2, std::format("Releasing buffer {} pos was {}", bufnum, m_buffers[bufnum].pos()));
m_buffers[bufnum].reset();
m_available_buffers.push(bufnum);
m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
return bufnum;
}
[[nodiscard]] std::size_t get_num_available_buffers();
[[nodiscard]] std::vector<std::uint64_t> get_pending_sends() const { return {}; }
private:
void init_buffers(DomainContext& domain);
[[nodiscard]] ConnectionParameters prepare_connection_parameters(
const ConnectionParameters& requested) const;
ConnectionParameters m_conn_parameters{};
std::vector<Buffer> m_buffers;
tbb::concurrent_queue<std::uint64_t> m_available_buffers;
std::atomic<std::size_t> m_num_available_buffers{0};
std::atomic<std::size_t> m_min_num_available_buffers{0};
};
} // namespace netio3::libfabric
#endif // NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP