Program Listing for File SendSocketBuffered.hpp

Return to documentation for file (BackendLibfabric/SendSocketBuffered.hpp)

#ifndef NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP
#define NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP

#include <atomic>
#include <compare>
#include <cstddef>
#include <cstdint>
#include <format>
#include <vector>

#include <tbb/concurrent_queue.h>

#include <tracy/Tracy.hpp>

#include "SendSocket.hpp"
#include "DomainManager.hpp"
#include "Helpers.hpp"
#include "netio3-backend/Netio3Backend.hpp"
#include "netio3-backend/NetworkBuffer.hpp"

namespace netio3::libfabric {
  /**
   * @brief Send socket that hands out buffers from a pool it owns.
   *
   * The pool lives in m_buffers. The indices of the buffers that may currently
   * be handed out are kept in a tbb::concurrent_queue (m_available_buffers).
   * Two atomic counters mirror the number of available buffers and the lowest
   * value that number reached in get_buffer().
   *
   * The type is neither copyable nor movable.
   *
   * NOTE(review): the constructor, destructor, get_num_available_buffers(),
   * init_buffers() and prepare_connection_parameters() are only declared here;
   * their behaviour has to be checked in the implementation file.
   */
  class SendSocketBuffered : public SendSocket
  {
  public:
    /**
     * @brief Construct a buffered send socket (defined out of line).
     *
     * @param address Remote endpoint address.
     * @param connection_params Requested connection parameters.
     * @param mode Network mode.
     * @param fabric Libfabric fabric handle.
     * @param domain Domain context; presumably forwarded to init_buffers()
     *               (TODO confirm in the implementation).
     */
    SendSocketBuffered(EndPointAddress address,
                       const ConnectionParameters& connection_params,
                       NetworkMode mode,
                       fid_fabric* fabric,
                       DomainContext& domain);
    SendSocketBuffered(const SendSocketBuffered&) = delete;
    SendSocketBuffered(SendSocketBuffered&&) = delete;
    SendSocketBuffered& operator=(const SendSocketBuffered&) = delete;
    SendSocketBuffered& operator=(SendSocketBuffered&&) = delete;

    ~SendSocketBuffered();

    /**
     * @brief Order sockets by the `cqfd` member of their endpoints.
     *
     * @param other Socket to compare against.
     * @return Result of `get_endpoint().cqfd <=> other.get_endpoint().cqfd`.
     */
    [[nodiscard]] auto operator<=>(const SendSocketBuffered& other) const
    {
      return get_endpoint().cqfd <=> other.get_endpoint().cqfd;
    }

    /**
     * @brief Take one buffer out of the pool.
     *
     * Pops an index from m_available_buffers, decrements
     * m_num_available_buffers and lowers m_min_num_available_buffers if the new
     * count is below the recorded minimum.
     *
     * @return Pointer to the buffer in m_buffers, or nullptr if no index could
     *         be popped from the queue.
     * @throws NoBuffersAllocated if m_buffers is empty.
     * @throws std::out_of_range if the popped index is not a valid position in
     *         m_buffers (raised by std::vector::at).
     */
    NetworkBuffer* get_buffer()
    {
      ZoneScoped;
      if (m_buffers.empty()) {
        throw NoBuffersAllocated(ERS_HERE, get_address().address(), get_address().port());
      }
      // m_buffers[0] is safe here: the pool was just checked to be non-empty.
      ERS_DEBUG(
        1,
        std::format(
          "ssocket addr {}:{} m_buffers.size={} m_available_buffers.size={} size of buffers={}",
          get_address().address(),
          get_address().port(),
          m_buffers.size(),
          m_available_buffers.unsafe_size(),
          m_buffers[0].size()));

      // try_pop alone decides whether a buffer is available. A separate
      // empty() check beforehand adds nothing and can be stale by the time
      // try_pop runs.
      std::uint64_t key{};
      if (not m_available_buffers.try_pop(key)) {
        ERS_DEBUG(2, "No buffers available");
        return nullptr;
      }

      const std::size_t remaining =
        m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1;

      // Lower the recorded minimum with a CAS loop. A plain load/min/store
      // sequence lets a concurrent caller overwrite a lower value with a
      // higher one. On failure compare_exchange_weak reloads `recorded_min`,
      // so the loop ends as soon as the stored minimum is <= `remaining`.
      std::size_t recorded_min = m_min_num_available_buffers.load(std::memory_order_relaxed);
      while (remaining < recorded_min and
             not m_min_num_available_buffers.compare_exchange_weak(
               recorded_min, remaining, std::memory_order_relaxed)) {
      }

      return &m_buffers.at(key);
    }

    /**
     * @brief Send the first @p size bytes of a buffer.
     *
     * Builds an iovec whose base is `buffer->data().data()` and whose length is
     * @p size, then passes it to send_data() together with `buffer->mr` and
     * `buffer->get_key()`.
     *
     * @param buffer Buffer to send. Dereferenced unconditionally, so it must
     *               not be null.
     * @param size Number of bytes placed in the iovec. Not validated against
     *             the buffer here.
     * @return Whatever send_data() returns.
     */
    NetioStatus send_buffer(Buffer* buffer, std::size_t size) const
    {
      ZoneScoped;
      iovec iov{};
      iov.iov_base = buffer->data().data();
      iov.iov_len = size;
      return send_data(iov, buffer->mr, buffer->get_key());
    }

    /**
     * @brief Return a buffer to the pool.
     *
     * Resets the buffer, increments m_num_available_buffers and pushes the
     * index back onto m_available_buffers.
     *
     * @param bufnum Index of the buffer inside m_buffers.
     * @return @p bufnum, unchanged.
     * @throws std::out_of_range if @p bufnum is not a valid index into
     *         m_buffers (consistent with the checked access in get_buffer()).
     */
    [[nodiscard]] std::uint64_t release_buffer(std::uint64_t bufnum)
    {
      auto& buffer = m_buffers.at(bufnum);
      ERS_DEBUG(2, std::format("Releasing buffer {} pos was {}", bufnum, buffer.pos()));
      buffer.reset();
      // Increment the counter before the index becomes poppable. In the
      // opposite order a concurrent get_buffer() could pop the index and
      // decrement first, wrapping the unsigned counter around.
      m_num_available_buffers.fetch_add(1, std::memory_order_relaxed);
      m_available_buffers.push(bufnum);
      return bufnum;
    }

    /**
     * @brief Report a number of available buffers (defined out of line).
     *
     * NOTE(review): non-const, so it may update the counters; confirm in the
     * implementation which counter is reported.
     */
    [[nodiscard]] std::size_t get_num_available_buffers();

    /**
     * @brief Pending sends of this socket.
     * @return Always an empty vector.
     */
    [[nodiscard]] std::vector<std::uint64_t> get_pending_sends() const { return {}; }

  private:
    /// Set up the buffer pool (defined out of line).
    void init_buffers(DomainContext& domain);

    /// Derive the connection parameters to use from the requested ones
    /// (defined out of line).
    [[nodiscard]] ConnectionParameters prepare_connection_parameters(
      const ConnectionParameters& requested) const;

    /// Connection parameters stored by this socket.
    ConnectionParameters m_conn_parameters{};
    /// Buffer pool; queue entries and release_buffer() arguments index into it.
    std::vector<Buffer> m_buffers;
    /// Indices into m_buffers that get_buffer() may hand out.
    tbb::concurrent_queue<std::uint64_t> m_available_buffers;
    /// Incremented in release_buffer(), decremented in get_buffer().
    std::atomic<std::size_t> m_num_available_buffers{0};
    /// Only ever lowered in this header (by get_buffer()); any reset happens elsewhere.
    std::atomic<std::size_t> m_min_num_available_buffers{0};
  };
}  // namespace netio3::libfabric

#endif  // NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP