Program Listing for File NetioSender.cpp

Return to documentation for file (NetioSender.cpp)

#include "netio3/NetioSender.hpp"

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstddef>
#include <cstring>
#include <exception>
#include <memory>
#include <numeric>
#include <set>
#include <span>
#include <utility>
#include <vector>

#include <ers/ers.h>

#include <netio3-backend/EventLoop/BaseEventLoop.hpp>
#include <netio3-backend/Issues.hpp>
#include <netio3-backend/Netio3Backend.hpp>
#include <netio3-backend/NetworkBuffer.hpp>

#include "netio3/MonitoringStats.hpp"
#include "netio3/Utility.hpp"
#include "netio3/BufferFormatter.hpp"
#include "netio3/NetioIssues.hpp"

using namespace netio3;

// Per-endpoint state. The flush timer is created on the given event loop and,
// when it fires, pushes out whatever is pending in the current buffer.
NetioSender::Connection::Connection(
    NetioSender* sender,
    EndPointAddress ep,
    const bool thread_safe,
    BaseEventLoop* evloop)
    : m_sender(sender)
    , m_flush_timer(evloop->create_timer([this](int /*unused*/) {
          // Timer-driven flushes have nobody to report a status to.
          static_cast<void>(flush_callback());
      }))
    , m_ep(std::move(ep))
    , m_thread_safe(thread_safe) {
    ERS_DEBUG(6, std::format("Flush timer fd {}", m_flush_timer.get_fd()));
}

// Send the connection's pending buffer (if any) to the backend.
//
// The buffer is kept for a later retry only when the backend answers
// NO_RESOURCES. On any other status, or when the backend no longer knows the
// endpoint (UnknownActiveEndpoint, reported via ers::error), it is dropped.
// The flush timer is stopped once nothing is pending.
// Returns the backend status, or OK if nothing was sent.
NetioStatus NetioSender::Connection::flush_callback() {
    ERS_DEBUG(6, "Entered");
    NetioStatus result = NetioStatus::OK;
    utility::ConditionalLockGuard guard(m_mutex, m_thread_safe);
    if (m_current_buffer != nullptr) {
        ERS_DEBUG(6, "sending buffer " << std::hex << m_current_buffer
                  << std::dec << " to " << m_ep);
        bool drop_buffer = true;
        try {
            result = m_sender->m_backend->send_buffer(m_ep, m_current_buffer);
            // Out of backend resources: hold on to the data and retry later.
            drop_buffer = (result != NetioStatus::NO_RESOURCES);
        }
        catch (const UnknownActiveEndpoint& issue) {
            ers::error(issue);
        }
        if (drop_buffer) {
            m_current_buffer = nullptr;
        }
    }
    // Nothing pending any more, so the timer has no work to do.
    if (m_current_buffer == nullptr) {
        m_flush_timer.stop();
    }
    return result;
}

// Build a sender from its configuration. A flush_interval of 0 disables the
// flush timer. The backend is created last, once every member its callbacks
// touch has been initialised.
NetioSender::NetioSender(const NetioSenderConfig& config,
                         std::shared_ptr<BaseEventLoop> evloop,
                         const std::optional<std::string>& local_ip)
    : m_evloop(std::move(evloop))
    , m_conn_params(convert_connection_parameters(config))
    , m_flush_interval(std::chrono::microseconds(config.flush_interval))
    , m_enable_timer(config.flush_interval != 0)
    , m_backend_type{config.backend_type}
    , m_thread_safe{config.thread_safety == ThreadSafetyModel::SAFE} {
    ERS_DEBUG(5, "Entered.");
    // Every backend event is forwarded to the matching NetioSender handler.
    // The handlers capture `this`; they must not run after this sender is gone.
    const NetworkConfig network_config{
        .mode = config.backend_mode,
        .thread_safety = config.thread_safety,
        .callbacks{
            .on_connection_established_cb =
                [this](const EndPointAddress& ep, const EndPointAddress& /*unused*/,
                       EndpointCapabilities /*unused*/) { on_connection_established(ep); },
            .on_connection_closed_cb =
                [this](const EndPointAddress& ep, const std::vector<std::uint64_t>& keys) {
                    on_connection_closed(ep, keys);
                },
            .on_connection_refused_cb =
                [this](const EndPointAddress& ep) { on_connection_refused(ep); },
            .on_send_completed_cb =
                [this](const EndPointAddress& ep, uint64_t key) { on_send_completed(ep, key); },
        },
        .local_ip = local_ip,
    };
    m_backend = NetworkBackend::create(config.backend_type, network_config, m_evloop);
    ERS_DEBUG(5, "done");
}

// Open an active endpoint to `ep` and return its Connection.
//
// The Connection entry is registered *before* the backend is asked to open the
// endpoint. The backend reports success via on_connection_established(), which
// looks the entry up with m_connections.at(ep). If that callback ran before
// the entry existed (event-loop thread, or synchronously inside
// open_active_endpoint) it would throw and the connection would never become
// ready. No sender lock is held across the backend call, so backend callbacks
// that take m_mutex_connection cannot deadlock against us.
//
// On FailedOpenActiveEndpoint the refused handler cleans up and the exception
// is rethrown. On any other std::exception, an entry created by this call is
// removed again and the exception is rethrown.
// The endpoint is added to m_endpoints (buffer statistics) only after a
// successful open.
NetioSender::Connection& NetioSender::open_connection(const EndPointAddress& ep) {
    ERS_DEBUG(5, "ep=" << ep);
    bool inserted = false;
    {
        utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
        inserted = m_connections.try_emplace(ep, this, ep, m_thread_safe, m_evloop.get()).second;
    }

    try {
        m_backend->open_active_endpoint(ep, m_conn_params);
    }
    catch (const FailedOpenActiveEndpoint&) {
        ERS_DEBUG(2, "Caught FailedOpenActiveEndpoint  calling internal callback");
        // Erases the connection/endpoint entries and notifies the user.
        on_connection_refused(ep);
        throw;
    }
    catch (const std::exception& except) {
        ERS_DEBUG(2, "Caught exception '" << except.what()
                  << "' from open_send_endpoint");
        if (inserted) {
            // Only undo what this call added; a pre-existing entry is left alone.
            utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
            m_connections.erase(ep);
        }
        throw;
    }

    utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
    utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe);
    m_endpoints.emplace(ep);
    ERS_DEBUG(5, "done");
    return m_connections.at(ep);
}

// Drop all bookkeeping for `ep`: the Connection entry and its entry in the
// endpoint set. Takes only m_mutex_endpoints itself. The caller visible here
// (on_connection_closed) already holds m_mutex_connection.
void NetioSender::clear_connection(const EndPointAddress& ep) {
    ERS_DEBUG(5, "clearing connection to " << ep);
    m_connections.erase(ep);
    {
        utility::ConditionalLockGuard endpoints_guard(m_mutex_endpoints, m_thread_safe);
        m_endpoints.erase(ep);
    }
    ERS_DEBUG(5, "done");
}

void NetioSender::close_connection(const EndPointAddress& ep) {
    ERS_DEBUG(5, "closing connection to " << ep);
    utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
    try {
        ERS_DEBUG(0, "Closing ep " << ep);
        m_backend->close_active_endpoint(ep);
    }
    catch (const UnknownActiveEndpoint& exc) {
        // Should we throw this rather than just report error?
        ers::error(exc);
    }
    ERS_DEBUG(5, "done");
}

bool NetioSender::check_connection(const EndPointAddress& ep) {
    if (m_connections.contains(ep)) {
        return m_connections.at(ep).m_ready;
    }
    throw (NoConnection(ERS_HERE, ep.address(), ep.port()));
}

// Build a standalone message header (tag, user status, payload size) in a
// buffer of ZERO_COPY_SIZE_HEADER bytes. It is sent in front of payloads that
// are not copied into a formatted buffer.
NetworkBuffer NetioSender::create_header(std::uint64_t tag, std::uint8_t user_status, std::uint32_t payload_size) {
    static_assert(ZERO_COPY_SIZE_HEADER >= BufferFormatter::HDR_MAX_SIZE, "Zero-copy header larger than header buffer");
    NetworkBuffer header_buffer(ZERO_COPY_SIZE_HEADER);
    BufferFormatter header_formatter{};
    header_formatter.fill_header(&header_buffer, user_status, tag, payload_size);
    return header_buffer;
}

// Look up the connection for `ep` and forward a contiguous payload to the
// Connection& overload. Returns FAILED if no connection to `ep` exists.
NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const uint8_t> data,
                                   uint8_t user_status) {
    ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));

    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    const auto found = m_connections.find(ep);
    if (found == m_connections.end()) {
        ERS_LOG("Failed to send data because no connection exists");
        return NetioStatus::FAILED;
    }
    return send_data(found->second, tag, data, user_status);
}

// Look up the connection for `ep` and forward a scatter/gather payload to the
// Connection& overload. Returns FAILED if no connection to `ep` exists.
NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const iovec> iov,
                                   uint8_t user_status) {
    ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));

    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    if (const auto found = m_connections.find(ep); found != m_connections.end()) {
        return send_data(found->second, tag, iov, user_status);
    }
    ERS_LOG("Failed to send data because no connection exists");
    return NetioStatus::FAILED;
}

// Send one contiguous message on `con` immediately.
//
// ASYNCMSG: a separate header is built and the backend copies header + data.
// LIBFABRIC: the message is appended to the connection's current buffer,
// fetching one if none is pending, and that buffer is handed to the backend
// at once.
// Returns NO_RESOURCES if the connection is not ready or no buffer is
// available, FAILED for formatter errors or unsupported backends, otherwise
// the backend status.
NetioStatus NetioSender::send_data(Connection& con, uint64_t tag,
                                   const std::span<const uint8_t> data,
                                   uint8_t user_status) {
    ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
    if (!con.m_ready) {
        return NetioStatus::NO_RESOURCES;
    }

    if (m_backend_type == NetworkType::ASYNCMSG) {
        const auto header = create_header(tag, user_status, data.size());
        return m_backend->send_data_copy(con.m_ep, data, header.data(), tag);
    }
    if (m_backend_type != NetworkType::LIBFABRIC) {
        return NetioStatus::FAILED;
    }

    // LIBFABRIC: serialise against the flush timer, which also uses m_current_buffer.
    utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
    if (con.m_current_buffer == nullptr) {
        con.m_current_buffer = m_backend->get_buffer(con.m_ep);
        if (con.m_current_buffer == nullptr) {
            return NetioStatus::NO_RESOURCES;
        }
        con.m_formatter.reset_buffer(con.m_current_buffer);
    }

    const auto fmt_status = con.m_formatter.write(con.m_current_buffer, user_status, tag, data);
    if (fmt_status != FormatterStatus::BUFFER_OK) {
        ERS_DEBUG(5,
                  std::format("Error {} returned from buffer formatter",
                              formatter_status_string(fmt_status)));
        ERS_DEBUG(6, "done");
        return NetioStatus::FAILED;
    }
    const NetioStatus sent = m_backend->send_buffer(con.m_ep, con.m_current_buffer);
    con.m_current_buffer = nullptr;
    ERS_DEBUG(6, "done");
    return sent;
}

// Send one scatter/gather message on `con` immediately.
//
// ASYNCMSG: a separate header carrying the summed segment lengths is built and
// the backend copies header + segments.
// LIBFABRIC: the segments are appended to the connection's current buffer,
// fetching one if none is pending, and that buffer is handed to the backend
// at once.
// Returns NO_RESOURCES if the connection is not ready or no buffer is
// available, FAILED for formatter errors or unsupported backends, otherwise
// the backend status.
NetioStatus NetioSender::send_data(Connection& con, uint64_t tag,
                                   const std::span<const iovec> iov,
                                   uint8_t user_status) {
    ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));

    if (!con.m_ready) {
        return NetioStatus::NO_RESOURCES;
    }

    switch (m_backend_type) {
    case NetworkType::ASYNCMSG: {
        const auto total = std::accumulate(
            iov.begin(), iov.end(), std::size_t{0},
            [](std::size_t tot, const iovec& entry) { return tot + entry.iov_len; });
        // The header's payload-size field is 32-bit (create_header takes
        // uint32_t); the narrowing is made explicit here.
        const auto payload_size = static_cast<std::uint32_t>(total);
        const auto header = create_header(tag, user_status, payload_size);
        return m_backend->send_data_copy(con.m_ep, iov, header.data(), tag);
    }
    case NetworkType::LIBFABRIC: {
        // Serialise against the flush timer, which also uses m_current_buffer.
        utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
        if (con.m_current_buffer == nullptr) {
            con.m_current_buffer = m_backend->get_buffer(con.m_ep);
            if (con.m_current_buffer == nullptr) {
                return NetioStatus::NO_RESOURCES;
            }
            con.m_formatter.reset_buffer(con.m_current_buffer);
        }

        auto fmt_status = con.m_formatter.write(con.m_current_buffer,
                                                user_status, tag, iov);
        NetioStatus status;
        if (fmt_status == FormatterStatus::BUFFER_OK) {
            status = m_backend->send_buffer(con.m_ep, con.m_current_buffer);
            con.m_current_buffer = nullptr;
        }
        else {
            // Same diagnostics as the contiguous-span overload.
            ERS_DEBUG(5,
                      std::format("Error {} returned from buffer formatter",
                                  formatter_status_string(fmt_status)));
            status = NetioStatus::FAILED;
        }
        ERS_DEBUG(6, "done");
        return status;
    }
    default:
        return NetioStatus::FAILED;
    }
}

// Look up the connection for `ep` and forward a batch of messages (one per
// inner span) to the Connection& overload. Returns FAILED if no connection to
// `ep` exists.
NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const std::span<const uint8_t>> data,
                                   uint8_t user_status) {
    ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));

    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    const auto found = m_connections.find(ep);
    if (found == m_connections.end()) {
        ERS_LOG("Failed to send data because no connection exists");
        return NetioStatus::FAILED;
    }
    return send_data(found->second, ep, tag, data, user_status);
}

// Send a batch of messages (one per inner span, all with the same tag and
// user status) to `ep` immediately.
//
// ASYNCMSG: all messages are formatted into a call-local scratch buffer, which
// the backend copies. No separate header is sent.
// LIBFABRIC: any buffer already pending on the connection is sent first. Then
// a fresh buffer is filled with the batch and sent.
// Returns NO_RESOURCES if the connection is not ready or no buffer is
// available, FAILED for formatter errors or unsupported backends, otherwise
// the backend status.
NetioStatus NetioSender::send_data(Connection& con, const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const std::span<const uint8_t>> data,
                                   uint8_t user_status) {
    ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));

    if (!con.m_ready) {
        return NetioStatus::NO_RESOURCES;
    }

    switch (m_backend_type) {
    case NetworkType::ASYNCMSG: {
        // Size for the worst case: every message carries a maximum-size header.
        const auto total_size = std::accumulate(
            data.begin(), data.end(), size_t{0},
            [](size_t total, const std::span<const uint8_t> entry) {
                return total + entry.size() + BufferFormatter::HDR_MAX_SIZE;
            });
        auto current_buffer = NetworkBuffer(total_size);
        // Use a call-local formatter. The scratch buffer is destroyed when this
        // scope ends. Binding the connection's shared m_formatter to it would
        // leave that formatter pointing at a dead buffer while
        // con.m_current_buffer may still be in use by buffered_send_data().
        auto formatter = BufferFormatter{};
        formatter.reset_buffer(&current_buffer);
        for (const auto& entry : data) {
            const auto fmt_status =
                formatter.write(&current_buffer, user_status, tag, entry);
            if (fmt_status != FormatterStatus::BUFFER_OK) {
                return NetioStatus::FAILED;
            }
        }
        // Headers are embedded in the buffer, so the separate header is empty.
        const auto header = std::span<const unsigned char>();
        // NOTE(review): if NetworkBuffer::data() spans the whole allocation
        // rather than the first pos() bytes, unused header slack is sent as
        // well; confirm against NetworkBuffer.
        return m_backend->send_data_copy(ep, current_buffer.data(), header, tag);
    }
    case NetworkType::LIBFABRIC: {
        // Same lock as the other LIBFABRIC send paths and flush_callback():
        // the flush timer reads and clears con.m_current_buffer concurrently.
        utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
        if (con.m_current_buffer != nullptr) {
            // Preserve ordering: previously buffered messages go out first.
            const auto status = m_backend->send_buffer(ep,
                                                       con.m_current_buffer);
            if (status != NetioStatus::OK) {
                return status;
            }
        }
        con.m_current_buffer = m_backend->get_buffer(ep);
        if (con.m_current_buffer == nullptr) {
            return NetioStatus::NO_RESOURCES;
        }
        con.m_formatter.reset_buffer(con.m_current_buffer);
        for (const auto& entry : data) {
            const auto fmt_status =
                con.m_formatter.write(con.m_current_buffer, user_status, tag, entry);
            if (fmt_status != FormatterStatus::BUFFER_OK) {
                return NetioStatus::FAILED;
            }
        }
        const auto status = m_backend->send_buffer(ep, con.m_current_buffer);
        con.m_formatter.reset_buffer(con.m_current_buffer);
        con.m_current_buffer = nullptr;
        return status;
    }
    default:
        return NetioStatus::FAILED;
    }
}

// Look up the connection for `ep` and append a scatter/gather message to its
// send buffer via the Connection& overload. Returns FAILED if no connection to
// `ep` exists.
NetioStatus NetioSender::buffered_send_data(const EndPointAddress& ep,
                                            uint64_t tag,
                                            const std::span<const iovec> iov,
                                            uint8_t user_status) {
    ERS_DEBUG(7, "Entered buffered sending to " << ep);
    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    const auto found = m_connections.find(ep);
    if (found == m_connections.end()) {
        ERS_LOG("Failed to send data because no connection exists");
        return NetioStatus::FAILED;
    }
    return buffered_send_data(found->second, tag, iov, user_status);
}


// Look up the connection for `ep` and append a contiguous message to its send
// buffer via the Connection& overload. Returns FAILED if no connection to
// `ep` exists.
NetioStatus NetioSender::buffered_send_data(const EndPointAddress& ep,
                                            uint64_t tag,
                                            const std::span<const uint8_t> data,
                                            uint8_t user_status) {
    ERS_DEBUG(7, "Entered buffered sending to " << ep);
    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    if (const auto found = m_connections.find(ep); found != m_connections.end()) {
        return buffered_send_data(found->second, tag, data, user_status);
    }
    ERS_LOG("Failed to send data because no connection exists");
    return NetioStatus::FAILED;
}

// Make sure `con` has a current buffer for a message of `size` payload bytes.
// Space is reserved as size + BufferFormatter::HDR_MAX_SIZE to allow for the
// header.
//
// - Returns NO_RESOURCES if the connection is not ready or the backend cannot
//   supply a buffer.
// - If the current buffer is partially filled and too small, it is sent and
//   replaced with a fresh one. A non-OK send status is returned as-is and the
//   unsent buffer stays in con.m_current_buffer.
// - Returns FAILED if the message does not fit even in an empty buffer.
//
// Both callers in this file hold con.m_mutex while calling this.
inline NetioStatus NetioSender::check_buffer(Connection& con, size_t size) {
    if (!con.m_ready) [[unlikely]] {
        return NetioStatus::NO_RESOURCES;
    }

    // No pending buffer: take a new one from the backend and reset the formatter onto it.
    if (con.m_current_buffer == nullptr) [[unlikely]] {
        con.m_current_buffer = m_backend->get_buffer(con.m_ep);
        if (con.m_current_buffer == nullptr) [[unlikely]] {
            return NetioStatus::NO_RESOURCES;
        }
        con.m_formatter.reset_buffer(con.m_current_buffer);
    }

    const size_t space_required = size + BufferFormatter::HDR_MAX_SIZE;
    auto space_available =
        con.m_current_buffer->size() - con.m_current_buffer->pos();
    ERS_DEBUG(7, "buffer size " << con.m_current_buffer->size()
              << " pos " << con.m_current_buffer->pos()
              << " space_available " << space_available
              << " space required " << space_required);
    if (space_available < space_required) [[unlikely]] {
        if (con.m_current_buffer->pos() > 0) [[likely]] {
            // Partially filled buffer, try to send it then get a new one
            const auto status = m_backend->send_buffer(con.m_ep,
                                                       con.m_current_buffer);
            if (status != NetioStatus::OK) {
                return status;
            }
            con.m_current_buffer = m_backend->get_buffer(con.m_ep);
            if (con.m_current_buffer == nullptr) {
                return NetioStatus::NO_RESOURCES;
            }
            con.m_formatter.reset_buffer(con.m_current_buffer);
            // NOTE(review): the fresh buffer is not re-checked against
            // space_required; if the message cannot fit even there, the
            // caller's formatter write presumably reports the failure.
        }
        else {
            // Space required won't fit in an empty buffer so give up
            return NetioStatus::FAILED;
        }
    }

    // A buffer with (presumed) room is ready; the caller performs the actual
    // write through con.m_formatter.
    ERS_DEBUG(7, "Adding message of length " << space_required
              << " at position " << con.m_current_buffer->pos()
              << " in network_buffer at " << con.m_current_buffer);

    return NetioStatus::OK;
}

// Append a scatter/gather message to `con`'s current send buffer.
//
// The buffer is sent when full (see check_buffer) or when the flush timer
// fires. The timer is armed here if enabled and not already running.
// Returns check_buffer's status if no suitable buffer is available, FAILED if
// the formatter rejects the write, otherwise OK.
NetioStatus NetioSender::buffered_send_data(Connection& con,
                                            uint64_t tag,
                                            const std::span<const iovec> iov,
                                            uint8_t user_status) {
    ERS_DEBUG(7, "Entered buffered sending to " << con.m_ep);
    // The running total must stay size_t. An `int` accumulator parameter
    // truncates the total and overflows (going negative) beyond INT_MAX, which
    // would corrupt the space check in check_buffer().
    const size_t size = std::accumulate(
        iov.begin(), iov.end(), size_t{0},
        [](size_t tot, const iovec& entry) { return tot + entry.iov_len; });

    utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
    const NetioStatus status = check_buffer(con, size);
    if (status != NetioStatus::OK) {
        return status;
    }
    const auto fmt_status = con.m_formatter.write(
        con.m_current_buffer, user_status, tag, iov);
    if (fmt_status != FormatterStatus::BUFFER_OK) [[unlikely]] {
        return NetioStatus::FAILED;
    }
    // Ensure a partially filled buffer is eventually pushed out.
    if (!con.m_flush_timer.is_running() && m_enable_timer) [[unlikely]] {
        con.m_flush_timer.start(m_flush_interval);
    }
    return NetioStatus::OK;
}


// Append a contiguous message to `con`'s current send buffer.
//
// The buffer is sent when full (see check_buffer) or when the flush timer
// fires. The timer is armed here if enabled and not already running.
// Returns check_buffer's status if no suitable buffer is available, FAILED if
// the formatter rejects the write, otherwise OK.
NetioStatus NetioSender::buffered_send_data(Connection& con,
                                            uint64_t tag,
                                            const std::span<const uint8_t> data,
                                            uint8_t user_status) {
    ERS_DEBUG(7, "Entered buffered sending to " << con.m_ep);

    utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
    const NetioStatus buffer_status = check_buffer(con, data.size());
    if (buffer_status != NetioStatus::OK) {
        return buffer_status;
    }
    const auto written = con.m_formatter.write(
        con.m_current_buffer, user_status, tag, data);
    if (written != FormatterStatus::BUFFER_OK) [[unlikely]] {
        return NetioStatus::FAILED;
    }
    // Ensure a partially filled buffer is eventually pushed out.
    if (!con.m_flush_timer.is_running() && m_enable_timer) [[unlikely]] {
        con.m_flush_timer.start(m_flush_interval);
    }
    return NetioStatus::OK;
}

// Immediately send whatever is pending on the connection to `ep`.
// Returns the flush status, or FAILED if no connection to `ep` exists.
NetioStatus NetioSender::flush_buffer(const EndPointAddress& ep) {
    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    const auto found = m_connections.find(ep);
    return found != m_connections.end() ? found->second.flush_callback()
                                        : NetioStatus::FAILED;
}

std::vector<netio3::BufferStats> NetioSender::get_num_available_buffers() {
    utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe);
    std::vector<netio3::BufferStats> stats;
    std::ranges::transform(
      m_endpoints, std::back_inserter(stats), [this](const auto& ep) -> decltype(stats)::value_type {
          return {.ip = ep.address(), .port = ep.port(), .available_buffers = m_backend->get_num_available_buffers(ep)};
      });
    return stats;
}


// Look up the connection for `ep` and forward a zero-copy scatter/gather send
// to the Connection& overload. Returns FAILED if no connection to `ep` exists.
NetioStatus NetioSender::zero_copy_send_data(const EndPointAddress& ep,
                                             std::uint64_t tag,
                                             const std::span<const iovec> iov,
                                             uint8_t user_status,
                                             std::uint64_t key) {
    ERS_DEBUG(7, "Entered zero-copy sending to " << ep);
    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    if (const auto found = m_connections.find(ep); found != m_connections.end()) {
        return zero_copy_send_data(found->second, tag, iov, user_status, key);
    }
    ERS_LOG("Failed to send data because no connection exists");
    return NetioStatus::FAILED;
}


// Look up the connection for `ep` and forward a zero-copy contiguous send to
// the Connection& overload. Returns FAILED if no connection to `ep` exists.
NetioStatus NetioSender::zero_copy_send_data(const EndPointAddress& ep,
                                             std::uint64_t tag,
                                             const std::span<std::uint8_t> data,
                                             uint8_t user_status,
                                             std::uint64_t key) {
    ERS_DEBUG(7, "Entered zero-copy sending to " << ep);
    utility::ConditionalLockGuard guard(m_mutex_connection, m_thread_safe);
    const auto found = m_connections.find(ep);
    if (found == m_connections.end()) {
        ERS_LOG("Failed to send data because no connection exists");
        return NetioStatus::FAILED;
    }
    return zero_copy_send_data(found->second, tag, data, user_status, key);
}


// Zero-copy send of a scatter/gather payload on `con`: a separate header is
// built and the backend sends header + segments, with `key` passed to the
// backend alongside.
// Returns NO_RESOURCES if the connection is not ready, FAILED if the summed
// payload exceeds BufferFormatter::MSG_MAX_SIZE, otherwise the backend status.
NetioStatus NetioSender::zero_copy_send_data(Connection& con,
                                             std::uint64_t tag,
                                             const std::span<const iovec> iov,
                                             uint8_t user_status,
                                             std::uint64_t key) {
    ERS_DEBUG(7, "Entered zero-copy sending to " << con.m_ep);
    if (!con.m_ready) {
        return NetioStatus::NO_RESOURCES;
    }

    // Sum in std::size_t. A 32-bit accumulator wraps for very large totals,
    // and the wrapped value could slip under the MSG_MAX_SIZE check below.
    const auto payload_size = std::accumulate(
        iov.begin(), iov.end(), std::size_t{0},
        [](std::size_t tot, const iovec& entry) { return tot + entry.iov_len; });
    // NOTE(review): the std::span overload also counts ZERO_COPY_SIZE_HEADER
    // against MSG_MAX_SIZE, while this one counts only the payload; confirm
    // which limit is intended.
    if (payload_size > BufferFormatter::MSG_MAX_SIZE) {
        return NetioStatus::FAILED;
    }
    // create_header's payload-size field is 32-bit; assumes MSG_MAX_SIZE fits in it.
    const auto header = create_header(tag, user_status, static_cast<std::uint32_t>(payload_size));

    return m_backend->send_data(con.m_ep, iov, header.data(), key);
}


// Zero-copy send of a contiguous payload on `con`: a separate header is built
// and the backend sends header + data, with `key` passed to the backend
// alongside.
// Returns NO_RESOURCES if the connection is not ready, FAILED if payload plus
// ZERO_COPY_SIZE_HEADER exceeds BufferFormatter::MSG_MAX_SIZE, otherwise the
// backend status.
NetioStatus NetioSender::zero_copy_send_data(Connection& con,
                                             std::uint64_t tag,
                                             const std::span<std::uint8_t> data,
                                             uint8_t user_status,
                                             std::uint64_t key) {
    ERS_DEBUG(7, "Entered zero-copy sending to " << con.m_ep);
    if (!con.m_ready) {
        return NetioStatus::NO_RESOURCES;
    }

    const bool too_large = data.size() + ZERO_COPY_SIZE_HEADER > BufferFormatter::MSG_MAX_SIZE;
    if (too_large) {
        return NetioStatus::FAILED;
    }
    const auto msg_header = create_header(tag, user_status, data.size());
    return m_backend->send_data(con.m_ep, data, msg_header.data(), key);
}

// Backend callback: the connection to `ep` was closed.
//
// Order: the internal hook gets the endpoint and the keys reported by the
// backend, then local bookkeeping is cleared under m_mutex_connection, then
// the user is notified.
// Either callback may be unset. Calling an empty std::function throws
// std::bad_function_call, so both are guarded like the established/refused
// handlers.
void NetioSender::on_connection_closed(const EndPointAddress& ep, const std::vector<std::uint64_t>& keys) {
    ERS_DEBUG(5, "Entered: ep=" << ep);
    if (m_on_connection_closed_internal_cb) {
        m_on_connection_closed_internal_cb(ep, keys);
    }
    {
        utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
        clear_connection(ep);
    }
    if (m_on_connection_closed_cb) {
        m_on_connection_closed_cb(ep);
    }
}

void NetioSender::on_connection_established(const EndPointAddress& ep) {
    ERS_DEBUG(5, "Entered with ep " << ep);
    {
        utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
        m_connections.at(ep).m_ready = true;
    }
    if (m_on_connection_established_cb) {
        m_on_connection_established_cb(ep);
    }
}

// Backend callback: the connection to `ep` was refused. Its bookkeeping is
// removed (connection lock first, then endpoints lock), then the user callback
// (if set) runs outside the locks.
void NetioSender::on_connection_refused(const EndPointAddress& ep) {
    ERS_DEBUG(5, "Entered: ep=" << ep);
    {
        utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);
        m_connections.erase(ep);
        utility::ConditionalLockGuard endpoints_guard(m_mutex_endpoints, m_thread_safe);
        m_endpoints.erase(ep);
    }
    if (!m_on_connection_refused_cb) {
        return;
    }
    m_on_connection_refused_cb(ep);
}

// Backend callback: the send identified by `key` to `ep` completed.
// Forwarded to the user callback if one is set. An unset std::function would
// otherwise throw std::bad_function_call from the backend's completion path.
void NetioSender::on_send_completed(const EndPointAddress& ep, uint64_t key) {
    if (m_on_send_completed_cb) {
        m_on_send_completed_cb(ep, key);
    }
}

void NetioSender::set_on_connection_established(const OnConnectionEstablishedSimpleCb& cb) {
    m_on_connection_established_cb = cb;
}

void NetioSender::set_on_connection_refused(const OnConnectionRefusedCb& cb) {
    m_on_connection_refused_cb = cb;
}

void NetioSender::set_on_connection_closed(const OnConnectionClosedCb& cb) {
    m_on_connection_closed_cb = cb;
}

void NetioSender::set_on_connection_closed_internal(const OnConnectionClosedKeysCb& cb) {
    m_on_connection_closed_internal_cb = cb;
}

void NetioSender::set_on_send_completed(const OnSendCompleted& cb){
    m_on_send_completed_cb = cb;
}

// Translate the sender config into backend connection parameters.
// Without a registered memory region (mr_start == nullptr) the buffered mode
// (buffer size + buffer count) is used; otherwise the zero-copy mode (buffer
// size + region start) is used.
ConnectionParameters NetioSender::convert_connection_parameters(const NetioSenderConfig& config) {
    if (config.mr_start == nullptr) {
        return {.send_buffered_params{.buf_size = config.buffersize, .num_buf = config.nbuffers}};
    }
    return {.send_zero_copy_params{.buf_size = config.buffersize, .mr_start = config.mr_start}};
}