.. _program_listing_file_NetioSender.cpp: Program Listing for File NetioSender.cpp ======================================== |exhale_lsh| :ref:`Return to documentation for file ` (``NetioSender.cpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #include "netio3/NetioSender.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "netio3/MonitoringStats.hpp" #include "netio3/Utility.hpp" #include "netio3/BufferFormatter.hpp" #include "netio3/NetioIssues.hpp" using namespace netio3; NetioSender::Connection::Connection( NetioSender* sender, EndPointAddress ep, const bool thread_safe, BaseEventLoop* evloop) : m_sender(sender), m_flush_timer(evloop->create_timer([this](int){ std::ignore = flush_callback(); })), m_ep(std::move(ep)), m_thread_safe{thread_safe} { ERS_DEBUG(6, std::format("Flush timer fd {}", m_flush_timer.get_fd())); } NetioStatus NetioSender::Connection::flush_callback() { ERS_DEBUG(6, "Entered"); auto status = NetioStatus::OK; utility::ConditionalLockGuard lock(m_mutex, m_thread_safe); if (m_current_buffer != nullptr) { ERS_DEBUG(6, "sending buffer " << std::hex << m_current_buffer << std::dec << " to " << m_ep); try { status = m_sender->m_backend->send_buffer(m_ep, m_current_buffer); if (status != NetioStatus::NO_RESOURCES) { m_current_buffer = nullptr; } } catch (const UnknownActiveEndpoint& except) { ers::error(except); m_current_buffer = nullptr; } } if (m_current_buffer == nullptr) { m_flush_timer.stop(); } return status; } NetioSender::NetioSender(const NetioSenderConfig& config, std::shared_ptr evloop, const std::optional& local_ip) : m_evloop(std::move(evloop)), m_conn_params(convert_connection_parameters(config)), m_flush_interval(std::chrono::microseconds (config.flush_interval)), m_enable_timer(config.flush_interval != 0), m_backend_type{config.backend_type}, m_thread_safe{config.thread_safety == ThreadSafetyModel::SAFE} { ERS_DEBUG(5, "Entered."); const NetworkConfig backend_config{ .mode=config.backend_mode, .thread_safety = config.thread_safety, .callbacks{ .on_connection_established_cb = [this](const EndPointAddress& ep, const EndPointAddress&, EndpointCapabilities) { on_connection_established(ep);}, .on_connection_closed_cb = [this](const EndPointAddress& ep, const std::vector& keys) { on_connection_closed(ep, keys);}, .on_connection_refused_cb = [this](const EndPointAddress& ep) { on_connection_refused(ep);}, .on_send_completed_cb = [this](const EndPointAddress& ep, uint64_t key) { on_send_completed(ep, key);} }, .local_ip = local_ip, }; m_backend = NetworkBackend::create(config.backend_type, backend_config, m_evloop); ERS_DEBUG(5, "done"); } NetioSender::Connection& NetioSender::open_connection(const EndPointAddress& ep) { ERS_DEBUG(5, "ep=" << ep); try { m_backend->open_active_endpoint(ep, m_conn_params); } catch (const FailedOpenActiveEndpoint& issue) { ERS_DEBUG(2, "Caught FailedOpenActiveEndpoint calling internal callback"); on_connection_refused(ep); throw; } catch (const std::exception& except) { ERS_DEBUG(2, "Caught exception '" << except.what() << "' from open_send_endpoint"); throw; } utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); m_connections.try_emplace(ep, this, ep, m_thread_safe, m_evloop.get()); utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe); m_endpoints.emplace(ep); ERS_DEBUG(5, "done"); return m_connections.at(ep); } void NetioSender::clear_connection(const EndPointAddress& ep) { ERS_DEBUG(5, "clearing connection to " << ep); m_connections.erase(ep); utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe); m_endpoints.erase(ep); ERS_DEBUG(5, "done"); } void NetioSender::close_connection(const EndPointAddress& ep) { ERS_DEBUG(5, "closing connection to " << ep); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); try { ERS_DEBUG(0, "Closing ep " << ep); m_backend->close_active_endpoint(ep); } catch (const UnknownActiveEndpoint& exc) { // Should we throw this rather than just report error? ers::error(exc); } ERS_DEBUG(5, "done"); } bool NetioSender::check_connection(const EndPointAddress& ep) { if (m_connections.contains(ep)) { return m_connections.at(ep).m_ready; } throw (NoConnection(ERS_HERE, ep.address(), ep.port())); } NetworkBuffer NetioSender::create_header(std::uint64_t tag, std::uint8_t user_status, std::uint32_t payload_size) { static_assert(ZERO_COPY_SIZE_HEADER >= BufferFormatter::HDR_MAX_SIZE, "Zero-copy header larger than header buffer"); auto header = NetworkBuffer(ZERO_COPY_SIZE_HEADER); auto formatter = BufferFormatter{}; formatter.fill_header(&header, user_status, tag, payload_size); return header; } NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag, const std::span data, uint8_t user_status) { ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag)); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return send_data(m_connections.at(ep), tag, data, user_status); } NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag, const std::span iov, uint8_t user_status) { ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag)); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return send_data(m_connections.at(ep), tag, iov, user_status); } NetioStatus NetioSender::send_data(Connection& con, uint64_t tag, const std::span data, uint8_t user_status) { ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag)); if (!con.m_ready) { return NetioStatus::NO_RESOURCES; } switch (m_backend_type) { case NetworkType::ASYNCMSG: { const auto header = create_header(tag, user_status, data.size()); return m_backend->send_data_copy(con.m_ep, data, header.data(), tag); } case NetworkType::LIBFABRIC: { utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe); if (con.m_current_buffer == nullptr) { con.m_current_buffer = m_backend->get_buffer(con.m_ep); if (con.m_current_buffer == nullptr) { return NetioStatus::NO_RESOURCES; } con.m_formatter.reset_buffer(con.m_current_buffer); } auto fmt_status = con.m_formatter.write(con.m_current_buffer, user_status, tag, data); NetioStatus status; if (fmt_status == FormatterStatus::BUFFER_OK) { status = m_backend->send_buffer(con.m_ep, con.m_current_buffer); con.m_current_buffer = nullptr; } else { ERS_DEBUG(5, std::format("Error {} returned from buffer formatter", formatter_status_string(fmt_status))); status = NetioStatus::FAILED; } ERS_DEBUG(6, "done"); return status; } default: return NetioStatus::FAILED; } } NetioStatus NetioSender::send_data(Connection& con, uint64_t tag, const std::span iov, uint8_t user_status) { ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag)); if (!con.m_ready) { return NetioStatus::NO_RESOURCES; } switch (m_backend_type) { case NetworkType::ASYNCMSG: { const auto payload_size = std::accumulate( iov.begin(), iov.end(), std::uint32_t{0}, [](std::uint32_t tot, iovec entry) { return tot + entry.iov_len; }); const auto header = create_header(tag, user_status, payload_size); return m_backend->send_data_copy(con.m_ep, iov, header.data(), tag); } case NetworkType::LIBFABRIC: { utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe); if (con.m_current_buffer == nullptr) { con.m_current_buffer = m_backend->get_buffer(con.m_ep); if (con.m_current_buffer == nullptr) { return NetioStatus::NO_RESOURCES; } con.m_formatter.reset_buffer(con.m_current_buffer); } auto fmt_status = con.m_formatter.write(con.m_current_buffer, user_status, tag, iov); NetioStatus status; if (fmt_status == FormatterStatus::BUFFER_OK) { status = m_backend->send_buffer(con.m_ep, con.m_current_buffer); con.m_current_buffer = nullptr; } else { status = NetioStatus::FAILED; } return status; } default: return NetioStatus::FAILED; } } NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag, const std::span> data, uint8_t user_status) { ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag)); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return send_data(m_connections.at(ep), ep, tag, data, user_status); } NetioStatus NetioSender::send_data(Connection& con, const EndPointAddress& ep, uint64_t tag, const std::span> data, uint8_t user_status) { ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag)); if (!con.m_ready) { return NetioStatus::NO_RESOURCES; } switch (m_backend_type) { case NetworkType::ASYNCMSG: { const auto total_size = std::accumulate( data.begin(), data.end(), size_t{0}, [](size_t total, const std::span entry) { return total + entry.size() + BufferFormatter::HDR_MAX_SIZE; }); auto current_buffer = NetworkBuffer(total_size); con.m_formatter.reset_buffer(¤t_buffer); for (const auto& entry : data) { const auto fmt_status = con.m_formatter.write(¤t_buffer, user_status, tag, entry); if (fmt_status != FormatterStatus::BUFFER_OK) { return NetioStatus::FAILED; } } //This is a blank header const auto header = std::span(); return m_backend->send_data_copy(ep, current_buffer.data(), header, tag); } case NetworkType::LIBFABRIC: { if (con.m_current_buffer != nullptr) { const auto status = m_backend->send_buffer(ep, con.m_current_buffer); if (status != NetioStatus::OK) { return status; } } con.m_current_buffer = m_backend->get_buffer(ep); if (con.m_current_buffer == nullptr) { return NetioStatus::NO_RESOURCES; } con.m_formatter.reset_buffer(con.m_current_buffer); for (const auto& entry : data) { const auto fmt_status = con.m_formatter.write(con.m_current_buffer, user_status, tag, entry); if (fmt_status != FormatterStatus::BUFFER_OK) { return NetioStatus::FAILED; } } const auto status = m_backend->send_buffer(ep, con.m_current_buffer); con.m_formatter.reset_buffer(con.m_current_buffer); con.m_current_buffer = nullptr; return status; } default: return NetioStatus::FAILED; } } NetioStatus NetioSender::buffered_send_data(const EndPointAddress& ep, uint64_t tag, const std::span iov, uint8_t user_status) { ERS_DEBUG(7, "Entered buffered sending to " << ep); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return buffered_send_data(m_connections.at(ep), tag, iov, user_status); } NetioStatus NetioSender::buffered_send_data(const EndPointAddress& ep, uint64_t tag, const std::span data, uint8_t user_status) { ERS_DEBUG(7, "Entered buffered sending to " << ep); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return buffered_send_data(m_connections.at(ep), tag, data, user_status); } inline NetioStatus NetioSender::check_buffer(Connection& con, size_t size) { if (!con.m_ready) [[unlikely]] { return NetioStatus::NO_RESOURCES; } if (con.m_current_buffer == nullptr) [[unlikely]] { con.m_current_buffer = m_backend->get_buffer(con.m_ep); if (con.m_current_buffer == nullptr) [[unlikely]] { return NetioStatus::NO_RESOURCES; } con.m_formatter.reset_buffer(con.m_current_buffer); } const size_t space_required = size + BufferFormatter::HDR_MAX_SIZE; auto space_available = con.m_current_buffer->size() - con.m_current_buffer->pos(); ERS_DEBUG(7, "buffer size " << con.m_current_buffer->size() << " pos " << con.m_current_buffer->pos() << " space_available " << space_available << " space required " << space_required); if (space_available < space_required) [[unlikely]] { if (con.m_current_buffer->pos() > 0) [[likely]] { // Partially filled buffer, try to send it then get a new one const auto status = m_backend->send_buffer(con.m_ep, con.m_current_buffer); if (status != NetioStatus::OK) { return status; } con.m_current_buffer = m_backend->get_buffer(con.m_ep); if (con.m_current_buffer == nullptr) { return NetioStatus::NO_RESOURCES; } con.m_formatter.reset_buffer(con.m_current_buffer); } else { // Space required won't fit in an empty buffer so give up return NetioStatus::FAILED; } } // OK so we have space, copy the data into the buffer ERS_DEBUG(7, "Adding message of length " << space_required << " at position " << con.m_current_buffer->pos() << " in network_buffer at " << con.m_current_buffer); return NetioStatus::OK; } NetioStatus NetioSender::buffered_send_data(Connection& con, uint64_t tag, const std::span iov, uint8_t user_status) { ERS_DEBUG(7, "Entered buffered sending to " << con.m_ep); const size_t size = std::accumulate( iov.begin(), iov.end(), size_t{0}, [](int tot, iovec entry) { return tot + entry.iov_len; }); utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe); NetioStatus status = check_buffer(con, size); if (status == NetioStatus::OK) { const auto fmt_status = con.m_formatter.write( con.m_current_buffer, user_status, tag, iov); if (fmt_status == FormatterStatus::BUFFER_OK) [[likely]] { if (!con.m_flush_timer.is_running() && m_enable_timer) [[unlikely]] { con.m_flush_timer.start(m_flush_interval); } status = NetioStatus::OK; } else { status = NetioStatus::FAILED; } } return status; } NetioStatus NetioSender::buffered_send_data(Connection& con, uint64_t tag, const std::span data, uint8_t user_status) { ERS_DEBUG(7, "Entered buffered sending to " << con.m_ep); utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe); NetioStatus status = check_buffer(con, data.size()); if (status == NetioStatus::OK) { const auto fmt_status = con.m_formatter.write( con.m_current_buffer, user_status, tag, data); if (fmt_status == FormatterStatus::BUFFER_OK) [[likely]] { if (!con.m_flush_timer.is_running() && m_enable_timer) [[unlikely]] { con.m_flush_timer.start(m_flush_interval); } return NetioStatus::OK; } else { return NetioStatus::FAILED; } } return status; } NetioStatus NetioSender::flush_buffer(const EndPointAddress& ep) { utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (m_connections.contains(ep)) { return m_connections.at(ep).flush_callback(); } return NetioStatus::FAILED; } std::vector NetioSender::get_num_available_buffers() { utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe); std::vector stats; std::ranges::transform( m_endpoints, std::back_inserter(stats), [this](const auto& ep) -> decltype(stats)::value_type { return {.ip = ep.address(), .port = ep.port(), .available_buffers = m_backend->get_num_available_buffers(ep)}; }); return stats; } NetioStatus NetioSender::zero_copy_send_data(const EndPointAddress& ep, std::uint64_t tag, const std::span iov, uint8_t user_status, std::uint64_t key) { ERS_DEBUG(7, "Entered zero-copy sending to " << ep); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return zero_copy_send_data(m_connections.at(ep), tag, iov, user_status, key); } NetioStatus NetioSender::zero_copy_send_data(const EndPointAddress& ep, std::uint64_t tag, const std::span data, uint8_t user_status, std::uint64_t key) { ERS_DEBUG(7, "Entered zero-copy sending to " << ep); utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); if (not m_connections.contains(ep)) { ERS_LOG("Failed to send data because no connection exists"); return NetioStatus::FAILED; } return zero_copy_send_data(m_connections.at(ep), tag, data, user_status, key); } NetioStatus NetioSender::zero_copy_send_data(Connection& con, std::uint64_t tag, const std::span iov, uint8_t user_status, std::uint64_t key) { ERS_DEBUG(7, "Entered zero-copy sending to " << con.m_ep); if (!con.m_ready) { return NetioStatus::NO_RESOURCES; } const auto payload_size = std::accumulate( iov.begin(), iov.end(), std::uint32_t{0}, [](std::uint32_t tot, iovec entry) { return tot + entry.iov_len; }); if (payload_size > BufferFormatter::MSG_MAX_SIZE) { return NetioStatus::FAILED; } const auto header = create_header(tag, user_status, payload_size); NetioStatus status = m_backend->send_data(con.m_ep, iov, header.data(), key); return status; } NetioStatus NetioSender::zero_copy_send_data(Connection& con, std::uint64_t tag, const std::span data, uint8_t user_status, std::uint64_t key) { ERS_DEBUG(7, "Entered zero-copy sending to " << con.m_ep); if (!con.m_ready) { return NetioStatus::NO_RESOURCES; } if (data.size() + ZERO_COPY_SIZE_HEADER > BufferFormatter::MSG_MAX_SIZE) { return NetioStatus::FAILED; } const auto header = create_header(tag, user_status, data.size()); NetioStatus status = m_backend->send_data(con.m_ep, data, header.data(), key); return status; } void NetioSender::on_connection_closed(const EndPointAddress& ep, const std::vector& keys) { ERS_DEBUG(5, "Entered: ep=" << ep); m_on_connection_closed_internal_cb(ep, keys); { utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); clear_connection(ep); } m_on_connection_closed_cb(ep); } void NetioSender::on_connection_established(const EndPointAddress& ep) { ERS_DEBUG(5, "Entered with ep " << ep); { utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); m_connections.at(ep).m_ready = true; } if (m_on_connection_established_cb) { m_on_connection_established_cb(ep); } } void NetioSender::on_connection_refused(const EndPointAddress& ep) { ERS_DEBUG(5, "Entered: ep=" << ep); { utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe); m_connections.erase(ep); utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe); m_endpoints.erase(ep); } if (m_on_connection_refused_cb) { m_on_connection_refused_cb(ep); } } void NetioSender::on_send_completed(const EndPointAddress& ep, uint64_t key) { m_on_send_completed_cb(ep, key); } void NetioSender::set_on_connection_established(const OnConnectionEstablishedSimpleCb& cb) { m_on_connection_established_cb = cb; } void NetioSender::set_on_connection_refused(const OnConnectionRefusedCb& cb) { m_on_connection_refused_cb = cb; } void NetioSender::set_on_connection_closed(const OnConnectionClosedCb& cb) { m_on_connection_closed_cb = cb; } void NetioSender::set_on_connection_closed_internal(const OnConnectionClosedKeysCb& cb) { m_on_connection_closed_internal_cb = cb; } void NetioSender::set_on_send_completed(const OnSendCompleted& cb){ m_on_send_completed_cb = cb; } ConnectionParameters NetioSender::convert_connection_parameters(const NetioSenderConfig& config) { if (config.mr_start != nullptr) { return {.send_zero_copy_params{.buf_size = config.buffersize, .mr_start = config.mr_start}}; } return {.send_buffered_params{.buf_size = config.buffersize, .num_buf = config.nbuffers}}; }