.. _program_listing_file_BackendLibfabric_SendSocketBuffered.hpp: Program Listing for File SendSocketBuffered.hpp =============================================== |exhale_lsh| :ref:`Return to documentation for file ` (``BackendLibfabric/SendSocketBuffered.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #ifndef NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP #define NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP #include #include #include #include "SendSocket.hpp" #include "DomainManager.hpp" #include "Helpers.hpp" #include "netio3-backend/Netio3Backend.hpp" #include "netio3-backend/NetworkBuffer.hpp" namespace netio3::libfabric { class SendSocketBuffered : public SendSocket { public: SendSocketBuffered(EndPointAddress address, const ConnectionParameters& connection_params, NetworkMode mode, fid_fabric* fabric, DomainContext& domain); SendSocketBuffered(const SendSocketBuffered&) = delete; SendSocketBuffered(SendSocketBuffered&&) = delete; SendSocketBuffered& operator=(const SendSocketBuffered&) = delete; SendSocketBuffered& operator=(SendSocketBuffered&&) = delete; ~SendSocketBuffered(); [[nodiscard]] auto operator<=>(const SendSocketBuffered& other) const { return get_endpoint().cqfd <=> other.get_endpoint().cqfd; } NetworkBuffer* get_buffer() { ZoneScoped; if (m_buffers.empty()) { throw NoBuffersAllocated(ERS_HERE, get_address().address(), get_address().port()); } ERS_DEBUG( 1, std::format( "ssocket addr {}:{} m_buffers.size={} m_available_buffers.size={} size of buffers={}", get_address().address(), get_address().port(), m_buffers.size(), m_available_buffers.unsafe_size(), m_buffers[0].size())); if (m_available_buffers.empty()) { ERS_DEBUG(2, "No buffers available"); return nullptr; } std::uint64_t key{}; const auto success = m_available_buffers.try_pop(key); if (not success) { ERS_DEBUG(2, "No buffers available"); return nullptr; } m_min_num_available_buffers.store( std::min(m_num_available_buffers.fetch_sub(1, std::memory_order_relaxed) - 1, m_min_num_available_buffers.load(std::memory_order_relaxed)), std::memory_order_relaxed); return &m_buffers.at(key); } NetioStatus send_buffer(Buffer* buffer, std::size_t size) const { ZoneScoped; iovec iov{}; iov.iov_base = buffer->data().data(); iov.iov_len = size; return send_data(iov, buffer->mr, buffer->get_key()); } [[nodiscard]] std::uint64_t release_buffer(std::uint64_t bufnum) { ERS_DEBUG(2, std::format("Releasing buffer {} pos was {}", bufnum, m_buffers[bufnum].pos())); m_buffers[bufnum].reset(); m_available_buffers.push(bufnum); m_num_available_buffers.fetch_add(1, std::memory_order_relaxed); return bufnum; } [[nodiscard]] std::size_t get_num_available_buffers(); [[nodiscard]] std::vector get_pending_sends() const { return {}; } private: void init_buffers(DomainContext& domain); [[nodiscard]] ConnectionParameters prepare_connection_parameters( const ConnectionParameters& requested) const; ConnectionParameters m_conn_parameters{}; std::vector m_buffers; tbb::concurrent_queue m_available_buffers; std::atomic m_num_available_buffers{0}; std::atomic m_min_num_available_buffers{0}; }; } // namespace netio3::libfabric #endif // NETIO3BACKEND_BACKENDLIBFABRIC_SENDSOCKETBUFFERED_HPP