// Program listing for file NetioSender.cpp
// (Return to the documentation for file NetioSender.cpp)
#include "netio3/NetioSender.hpp"
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <cstddef>
#include <cstring>
#include <exception>
#include <memory>
#include <numeric>
#include <set>
#include <span>
#include <utility>
#include <vector>
#include <ers/ers.h>
#include <netio3-backend/EventLoop/BaseEventLoop.hpp>
#include <netio3-backend/Issues.hpp>
#include <netio3-backend/Netio3Backend.hpp>
#include <netio3-backend/NetworkBuffer.hpp>
#include "netio3/MonitoringStats.hpp"
#include "netio3/Utility.hpp"
#include "netio3/BufferFormatter.hpp"
#include "netio3/NetioIssues.hpp"
using namespace netio3;
// Builds the per-endpoint connection state used by a NetioSender.
//
// Parameters:
//   sender      - the NetioSender this connection belongs to; kept as a raw
//                 pointer and used by flush_callback() to reach the backend.
//   ep          - address of the remote endpoint (taken by value, moved in).
//   thread_safe - stored and later handed to the conditional lock guards
//                 together with this connection's mutex.
//   evloop      - event loop used to create the flush timer; it is
//                 dereferenced unconditionally, so it must not be null.
NetioSender::Connection::Connection(
NetioSender* sender,
EndPointAddress ep,
const bool thread_safe,
BaseEventLoop* evloop) :
m_sender(sender),
// The timer callback ignores its int argument and discards the status
// returned by flush_callback().
// NOTE(review): the lambda captures `this`, so this object presumably has to
// stay at the same address for as long as the timer can fire - confirm.
m_flush_timer(evloop->create_timer([this](int){ std::ignore = flush_callback(); })),
m_ep(std::move(ep)),
m_thread_safe{thread_safe} {
ERS_DEBUG(6, std::format("Flush timer fd {}", m_flush_timer.get_fd()));
}
// Sends the connection's pending buffer, if there is one, and stops the flush
// timer once nothing is pending any more.
//
// Returns the status reported by the backend's send_buffer(). NetioStatus::OK
// is returned when there was no buffer to send, and also when the backend
// throws UnknownActiveEndpoint (that issue is reported through ers::error and
// the buffer is dropped). When the backend answers NO_RESOURCES the buffer is
// kept and the timer is left untouched.
NetioStatus NetioSender::Connection::flush_callback() {
  ERS_DEBUG(6, "Entered");
  utility::ConditionalLockGuard guard(m_mutex, m_thread_safe);

  // Nothing pending: only the timer needs to be stopped.
  if (m_current_buffer == nullptr) {
    m_flush_timer.stop();
    return NetioStatus::OK;
  }

  ERS_DEBUG(6, "sending buffer " << std::hex << m_current_buffer
  << std::dec << " to " << m_ep);

  auto result = NetioStatus::OK;
  // Stays true when the send throws UnknownActiveEndpoint, so the buffer is
  // dropped in that case as well.
  bool release_buffer = true;
  try {
    result = m_sender->m_backend->send_buffer(m_ep, m_current_buffer);
    release_buffer = (result != NetioStatus::NO_RESOURCES);
  }
  catch (const UnknownActiveEndpoint& except) {
    ers::error(except);
  }

  if (release_buffer) {
    m_current_buffer = nullptr;
    m_flush_timer.stop();
  }
  return result;
}
// Constructs a sender and creates its network backend.
//
// Parameters:
//   config   - sender configuration; supplies the connection parameters, the
//              flush interval (interpreted as microseconds), the backend type
//              and mode, and the thread-safety model.
//   evloop   - event loop shared with the backend and used for flush timers.
//   local_ip - optional local IP, forwarded unchanged to the backend config.
NetioSender::NetioSender(const NetioSenderConfig& config,
std::shared_ptr<BaseEventLoop> evloop,
const std::optional<std::string>& local_ip) :
m_evloop(std::move(evloop)),
m_conn_params(convert_connection_parameters(config)),
m_flush_interval(std::chrono::microseconds (config.flush_interval)),
// A flush interval of 0 disables the flush timer.
m_enable_timer(config.flush_interval != 0),
m_backend_type{config.backend_type},
m_thread_safe{config.thread_safety == ThreadSafetyModel::SAFE} {
ERS_DEBUG(5, "Entered.");
// Every backend callback forwards to the matching member handler.
// NOTE(review): the callbacks capture `this`; presumably the backend never
// outlives this object because it is held in m_backend - confirm.
const NetworkConfig backend_config{
.mode=config.backend_mode,
.thread_safety = config.thread_safety,
.callbacks{
// The second address and the capabilities argument are ignored.
.on_connection_established_cb = [this](const EndPointAddress& ep, const EndPointAddress&, EndpointCapabilities) {
on_connection_established(ep);},
.on_connection_closed_cb = [this](const EndPointAddress& ep, const std::vector<std::uint64_t>& keys) {
on_connection_closed(ep, keys);},
.on_connection_refused_cb = [this](const EndPointAddress& ep) {
on_connection_refused(ep);},
.on_send_completed_cb = [this](const EndPointAddress& ep,
uint64_t key) {
on_send_completed(ep, key);}
},
.local_ip = local_ip,
};
m_backend = NetworkBackend::create(config.backend_type,
backend_config, m_evloop);
ERS_DEBUG(5, "done");
}
// Opens an active endpoint towards `ep` on the backend and registers the
// matching Connection object.
//
// Returns a reference to the Connection stored for `ep` (the existing one if
// an entry was already present).
// Throws whatever the backend's open_active_endpoint() throws; for
// FailedOpenActiveEndpoint the connection-refused handler runs before the
// exception is rethrown.
//
// NOTE(review): the endpoint is opened on the backend before the Connection
// is inserted into the map; on_connection_established() uses at(ep), so this
// presumably relies on that callback not running before the insertion -
// confirm.
NetioSender::Connection& NetioSender::open_connection(const EndPointAddress& ep) {
  ERS_DEBUG(5, "ep=" << ep);
  try {
    m_backend->open_active_endpoint(ep, m_conn_params);
  }
  catch (const FailedOpenActiveEndpoint&) {
    ERS_DEBUG(2, "Caught FailedOpenActiveEndpoint calling internal callback");
    on_connection_refused(ep);
    throw;
  }
  catch (const std::exception& except) {
    ERS_DEBUG(2, "Caught exception '" << except.what()
    << "' from open_send_endpoint");
    throw;
  }

  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);
  // try_emplace leaves an already existing entry untouched and reports its
  // position, so the iterator always refers to the entry for `ep`.
  const auto entry =
    m_connections.try_emplace(ep, this, ep, m_thread_safe, m_evloop.get()).first;

  utility::ConditionalLockGuard endpoints_guard(m_mutex_endpoints, m_thread_safe);
  m_endpoints.emplace(ep);

  ERS_DEBUG(5, "done");
  return entry->second;
}
// Forgets everything stored for `ep`: its Connection object and its entry in
// the endpoint set.
//
// The connection map is modified without taking m_mutex_connection here; the
// caller visible in this file (on_connection_closed) takes that lock before
// calling. The endpoint set is protected by its own lock, taken here.
void NetioSender::clear_connection(const EndPointAddress& ep) {
  ERS_DEBUG(5, "clearing connection to " << ep);

  m_connections.erase(ep);

  utility::ConditionalLockGuard endpoints_guard(m_mutex_endpoints, m_thread_safe);
  m_endpoints.erase(ep);

  ERS_DEBUG(5, "done");
}
void NetioSender::close_connection(const EndPointAddress& ep) {
ERS_DEBUG(5, "closing connection to " << ep);
utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
try {
ERS_DEBUG(0, "Closing ep " << ep);
m_backend->close_active_endpoint(ep);
}
catch (const UnknownActiveEndpoint& exc) {
// Should we throw this rather than just report error?
ers::error(exc);
}
ERS_DEBUG(5, "done");
}
// Reports whether the connection to `ep` is ready for sending.
//
// Returns the connection's m_ready flag.
// Throws NoConnection when no connection is registered for `ep`.
//
// NOTE(review): unlike the send functions, this reads m_connections without
// taking m_mutex_connection - confirm that this is intended.
bool NetioSender::check_connection(const EndPointAddress& ep) {
  const auto found = m_connections.find(ep);
  if (found == m_connections.end()) {
    throw (NoConnection(ERS_HERE, ep.address(), ep.port()));
  }
  return found->second.m_ready;
}
// Builds a stand-alone message header in a freshly created buffer of
// ZERO_COPY_SIZE_HEADER bytes.
//
// Parameters:
//   tag          - tag written into the header.
//   user_status  - user status written into the header.
//   payload_size - payload size written into the header.
// Returns the buffer filled in by BufferFormatter::fill_header().
NetworkBuffer NetioSender::create_header(std::uint64_t tag, std::uint8_t user_status, std::uint32_t payload_size) {
  // The header buffer has to be able to hold the largest header the formatter can emit.
  static_assert(ZERO_COPY_SIZE_HEADER >= BufferFormatter::HDR_MAX_SIZE, "Zero-copy header larger than header buffer");

  NetworkBuffer header_buffer(ZERO_COPY_SIZE_HEADER);
  BufferFormatter header_formatter{};
  header_formatter.fill_header(&header_buffer, user_status, tag, payload_size);
  return header_buffer;
}
// Sends one contiguous payload to `ep`.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const uint8_t> data,
                                   uint8_t user_status) {
  ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return send_data(found->second, tag, data, user_status);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Sends a scattered (iovec) payload to `ep`.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const iovec> iov,
                                   uint8_t user_status) {
  ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return send_data(found->second, tag, iov, user_status);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Sends one contiguous payload over an already looked-up connection.
//
// Returns:
//   NO_RESOURCES - the connection is not ready, or (LIBFABRIC) the backend
//                  had no buffer to hand out.
//   FAILED       - the backend type is neither ASYNCMSG nor LIBFABRIC, or the
//                  formatter could not write the message into the buffer.
//   otherwise    - the status returned by the backend send call.
NetioStatus NetioSender::send_data(Connection& con, uint64_t tag,
                                   const std::span<const uint8_t> data,
                                   uint8_t user_status) {
  ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
  if (!con.m_ready) {
    return NetioStatus::NO_RESOURCES;
  }

  // ASYNCMSG: header and payload are handed to the backend's copying send.
  if (m_backend_type == NetworkType::ASYNCMSG) {
    const auto header = create_header(tag, user_status, data.size());
    return m_backend->send_data_copy(con.m_ep, data, header.data(), tag);
  }

  if (m_backend_type != NetworkType::LIBFABRIC) {
    return NetioStatus::FAILED;
  }

  // LIBFABRIC: format the message into a backend buffer and send that buffer.
  utility::ConditionalLockGuard connection_guard(con.m_mutex, m_thread_safe);
  if (con.m_current_buffer == nullptr) {
    con.m_current_buffer = m_backend->get_buffer(con.m_ep);
    if (con.m_current_buffer == nullptr) {
      return NetioStatus::NO_RESOURCES;
    }
    con.m_formatter.reset_buffer(con.m_current_buffer);
  }

  const auto fmt_status = con.m_formatter.write(con.m_current_buffer, user_status, tag, data);
  if (fmt_status != FormatterStatus::BUFFER_OK) {
    // The buffer stays attached to the connection in this case.
    ERS_DEBUG(5,
    std::format("Error {} returned from buffer formatter",
    formatter_status_string(fmt_status)));
    ERS_DEBUG(6, "done");
    return NetioStatus::FAILED;
  }

  const NetioStatus send_status = m_backend->send_buffer(con.m_ep, con.m_current_buffer);
  con.m_current_buffer = nullptr;
  ERS_DEBUG(6, "done");
  return send_status;
}
// Sends a scattered (iovec) payload over an already looked-up connection.
//
// Returns:
//   NO_RESOURCES - the connection is not ready, or (LIBFABRIC) the backend
//                  had no buffer to hand out.
//   FAILED       - the backend type is neither ASYNCMSG nor LIBFABRIC, or the
//                  formatter could not write the message into the buffer.
//   otherwise    - the status returned by the backend send call.
NetioStatus NetioSender::send_data(Connection& con, uint64_t tag,
                                   const std::span<const iovec> iov,
                                   uint8_t user_status) {
  ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
  if (!con.m_ready) {
    return NetioStatus::NO_RESOURCES;
  }

  // ASYNCMSG: header and payload are handed to the backend's copying send.
  if (m_backend_type == NetworkType::ASYNCMSG) {
    // The total is kept in 32 bits, matching the header's payload-size field.
    std::uint32_t payload_size = 0;
    for (const iovec& entry : iov) {
      payload_size = static_cast<std::uint32_t>(payload_size + entry.iov_len);
    }
    const auto header = create_header(tag, user_status, payload_size);
    return m_backend->send_data_copy(con.m_ep, iov, header.data(), tag);
  }

  if (m_backend_type != NetworkType::LIBFABRIC) {
    return NetioStatus::FAILED;
  }

  // LIBFABRIC: format the message into a backend buffer and send that buffer.
  utility::ConditionalLockGuard connection_guard(con.m_mutex, m_thread_safe);
  if (con.m_current_buffer == nullptr) {
    con.m_current_buffer = m_backend->get_buffer(con.m_ep);
    if (con.m_current_buffer == nullptr) {
      return NetioStatus::NO_RESOURCES;
    }
    con.m_formatter.reset_buffer(con.m_current_buffer);
  }

  const auto fmt_status = con.m_formatter.write(con.m_current_buffer,
                                                user_status, tag, iov);
  if (fmt_status != FormatterStatus::BUFFER_OK) {
    // The buffer stays attached to the connection in this case.
    return NetioStatus::FAILED;
  }

  const NetioStatus send_status = m_backend->send_buffer(con.m_ep, con.m_current_buffer);
  con.m_current_buffer = nullptr;
  return send_status;
}
// Sends a batch of payloads to `ep` in one transfer.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::send_data(const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const std::span<const uint8_t>> data,
                                   uint8_t user_status) {
  ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return send_data(found->second, ep, tag, data, user_status);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Sends a batch of payloads over an already looked-up connection as a single
// transfer. Every payload is passed to the formatter separately, all with the
// same tag and user status.
//
// Parameters:
//   con         - connection state for the destination.
//   ep          - destination address handed to the backend calls.
//   tag         - tag written for every payload.
//   data        - the payloads to send.
//   user_status - user status written for every payload.
// Returns:
//   NO_RESOURCES - the connection is not ready, or (LIBFABRIC) the backend
//                  had no buffer to hand out.
//   FAILED       - the backend type is neither ASYNCMSG nor LIBFABRIC, or the
//                  formatter rejected one of the payloads.
//   otherwise    - the status returned by the backend send call; in the
//                  LIBFABRIC case this may be the non-OK status of flushing a
//                  previously pending buffer.
NetioStatus NetioSender::send_data(Connection& con, const EndPointAddress& ep, uint64_t tag,
                                   const std::span<const std::span<const uint8_t>> data,
                                   uint8_t user_status) {
  ERS_DEBUG(6, std::format("NetioSender::send_data tag={:#x}", tag));
  if (!con.m_ready) {
    return NetioStatus::NO_RESOURCES;
  }
  switch (m_backend_type) {
    case NetworkType::ASYNCMSG: {
      // Size a temporary buffer for every payload plus a worst-case header each.
      const auto total_size = std::accumulate(
        data.begin(), data.end(), size_t{0},
        [](size_t total, const std::span<const uint8_t> entry) {
          return total + entry.size() + BufferFormatter::HDR_MAX_SIZE;
        });
      auto current_buffer = NetworkBuffer(total_size);
      // The formatter works on a buffer pointer, hence the address-of.
      con.m_formatter.reset_buffer(&current_buffer);
      for (const auto& entry : data) {
        const auto fmt_status =
          con.m_formatter.write(&current_buffer, user_status, tag, entry);
        if (fmt_status != FormatterStatus::BUFFER_OK) {
          return NetioStatus::FAILED;
        }
      }
      // The headers are already inside the formatted buffer, so the separate
      // header handed to the backend is empty.
      const auto header = std::span<const unsigned char>();
      return m_backend->send_data_copy(ep, current_buffer.data(), header, tag);
    }
    case NetworkType::LIBFABRIC: {
      // The pending buffer and the formatter are shared with the buffered send
      // path and the flush timer callback, which both take the connection
      // mutex; take it here too, as the other send_data overloads do.
      utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
      // Flush whatever is pending first so the batch starts in a fresh buffer.
      if (con.m_current_buffer != nullptr) {
        const auto status = m_backend->send_buffer(ep,
                                                   con.m_current_buffer);
        if (status != NetioStatus::OK) {
          return status;
        }
      }
      con.m_current_buffer = m_backend->get_buffer(ep);
      if (con.m_current_buffer == nullptr) {
        return NetioStatus::NO_RESOURCES;
      }
      con.m_formatter.reset_buffer(con.m_current_buffer);
      for (const auto& entry : data) {
        const auto fmt_status =
          con.m_formatter.write(con.m_current_buffer, user_status, tag, entry);
        if (fmt_status != FormatterStatus::BUFFER_OK) {
          // NOTE(review): the partially filled buffer stays attached to the
          // connection here and would be flushed by the next call - confirm
          // that this is intended.
          return NetioStatus::FAILED;
        }
      }
      const auto status = m_backend->send_buffer(ep, con.m_current_buffer);
      con.m_formatter.reset_buffer(con.m_current_buffer);
      con.m_current_buffer = nullptr;
      return status;
    }
    default:
      return NetioStatus::FAILED;
  }
}
// Queues a scattered (iovec) payload for buffered sending to `ep`.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::buffered_send_data(const EndPointAddress& ep,
                                            uint64_t tag,
                                            const std::span<const iovec> iov,
                                            uint8_t user_status) {
  ERS_DEBUG(7, "Entered buffered sending to " << ep);
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return buffered_send_data(found->second, tag, iov, user_status);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Queues one contiguous payload for buffered sending to `ep`.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::buffered_send_data(const EndPointAddress& ep,
                                            uint64_t tag,
                                            const std::span<const uint8_t> data,
                                            uint8_t user_status) {
  ERS_DEBUG(7, "Entered buffered sending to " << ep);
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return buffered_send_data(found->second, tag, data, user_status);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Makes sure the connection has a current buffer for a message of `size`
// payload bytes (plus a worst-case header), sending the partially filled
// buffer and fetching a new one when the message does not fit.
//
// Does not take the connection mutex itself; the callers visible in this file
// take it before calling.
//
// Returns:
//   OK           - con.m_current_buffer is set and the caller may write.
//   NO_RESOURCES - the connection is not ready or the backend had no buffer.
//   FAILED       - the message does not fit into a still empty buffer.
//   otherwise    - the non-OK status of sending the partially filled buffer.
inline NetioStatus NetioSender::check_buffer(Connection& con, size_t size) {
  if (!con.m_ready) [[unlikely]] {
    return NetioStatus::NO_RESOURCES;
  }

  // First message after a flush: fetch a buffer from the backend.
  if (con.m_current_buffer == nullptr) [[unlikely]] {
    con.m_current_buffer = m_backend->get_buffer(con.m_ep);
    if (con.m_current_buffer == nullptr) [[unlikely]] {
      return NetioStatus::NO_RESOURCES;
    }
    con.m_formatter.reset_buffer(con.m_current_buffer);
  }

  const size_t space_required = size + BufferFormatter::HDR_MAX_SIZE;
  const auto space_available =
    con.m_current_buffer->size() - con.m_current_buffer->pos();
  ERS_DEBUG(7, "buffer size " << con.m_current_buffer->size()
  << " pos " << con.m_current_buffer->pos()
  << " space_available " << space_available
  << " space required " << space_required);

  if (space_available < space_required) [[unlikely]] {
    // An empty buffer that is still too small can never hold the message.
    if (!(con.m_current_buffer->pos() > 0)) [[unlikely]] {
      return NetioStatus::FAILED;
    }

    // Partially filled buffer: send it, then continue with a new one.
    const auto send_status = m_backend->send_buffer(con.m_ep,
                                                    con.m_current_buffer);
    if (send_status != NetioStatus::OK) {
      return send_status;
    }
    con.m_current_buffer = m_backend->get_buffer(con.m_ep);
    if (con.m_current_buffer == nullptr) {
      return NetioStatus::NO_RESOURCES;
    }
    con.m_formatter.reset_buffer(con.m_current_buffer);
  }

  // A usable buffer is in place; the caller writes the message into it.
  ERS_DEBUG(7, "Adding message of length " << space_required
  << " at position " << con.m_current_buffer->pos()
  << " in network_buffer at " << con.m_current_buffer);
  return NetioStatus::OK;
}
// Appends a scattered (iovec) payload to the connection's current buffer so
// that it is sent together with other buffered messages.
//
// Parameters:
//   con         - connection state for the destination.
//   tag         - tag written with the message.
//   iov         - the payload fragments.
//   user_status - user status written with the message.
// Returns:
//   OK        - the message was written into the buffer; the flush timer is
//               started if it is enabled and not already running.
//   FAILED    - the formatter could not write the message.
//   otherwise - the non-OK status reported by check_buffer().
NetioStatus NetioSender::buffered_send_data(Connection& con,
                                            uint64_t tag,
                                            const std::span<const iovec> iov,
                                            uint8_t user_status) {
  ERS_DEBUG(7, "Entered buffered sending to " << con.m_ep);
  // Accumulate in size_t throughout: an int running total would be narrowed
  // on every step and break for payloads that do not fit into an int.
  const size_t size = std::accumulate(
    iov.begin(), iov.end(), size_t{0},
    [](size_t tot, const iovec& entry) { return tot + entry.iov_len; });
  utility::ConditionalLockGuard lock_con(con.m_mutex, m_thread_safe);
  NetioStatus status = check_buffer(con, size);
  if (status == NetioStatus::OK) {
    const auto fmt_status = con.m_formatter.write(
      con.m_current_buffer, user_status, tag, iov);
    if (fmt_status == FormatterStatus::BUFFER_OK) [[likely]] {
      // Arm the flush timer so the buffered data is eventually sent.
      if (!con.m_flush_timer.is_running() && m_enable_timer) [[unlikely]] {
        con.m_flush_timer.start(m_flush_interval);
      }
      status = NetioStatus::OK;
    }
    else {
      status = NetioStatus::FAILED;
    }
  }
  return status;
}
// Appends one contiguous payload to the connection's current buffer so that
// it is sent together with other buffered messages.
//
// Returns:
//   OK        - the message was written into the buffer; the flush timer is
//               started if it is enabled and not already running.
//   FAILED    - the formatter could not write the message.
//   otherwise - the non-OK status reported by check_buffer().
NetioStatus NetioSender::buffered_send_data(Connection& con,
                                            uint64_t tag,
                                            const std::span<const uint8_t> data,
                                            uint8_t user_status) {
  ERS_DEBUG(7, "Entered buffered sending to " << con.m_ep);
  utility::ConditionalLockGuard connection_guard(con.m_mutex, m_thread_safe);

  const NetioStatus buffer_status = check_buffer(con, data.size());
  if (buffer_status != NetioStatus::OK) {
    return buffer_status;
  }

  const auto fmt_status = con.m_formatter.write(
    con.m_current_buffer, user_status, tag, data);
  if (fmt_status != FormatterStatus::BUFFER_OK) [[unlikely]] {
    return NetioStatus::FAILED;
  }

  // Arm the flush timer so the buffered data is eventually sent.
  if (!con.m_flush_timer.is_running() && m_enable_timer) [[unlikely]] {
    con.m_flush_timer.start(m_flush_interval);
  }
  return NetioStatus::OK;
}
// Flushes the pending buffer of the connection to `ep` right away.
//
// Returns the result of the connection's flush_callback(), or
// NetioStatus::FAILED when no connection is registered for `ep`.
NetioStatus NetioSender::flush_buffer(const EndPointAddress& ep) {
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found == m_connections.end()) {
    return NetioStatus::FAILED;
  }
  return found->second.flush_callback();
}
std::vector<netio3::BufferStats> NetioSender::get_num_available_buffers() {
utility::ConditionalLockGuard lock_ep(m_mutex_endpoints, m_thread_safe);
std::vector<netio3::BufferStats> stats;
std::ranges::transform(
m_endpoints, std::back_inserter(stats), [this](const auto& ep) -> decltype(stats)::value_type {
return {.ip = ep.address(), .port = ep.port(), .available_buffers = m_backend->get_num_available_buffers(ep)};
});
return stats;
}
// Sends a scattered (iovec) payload to `ep` through the zero-copy path.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::zero_copy_send_data(const EndPointAddress& ep,
                                             std::uint64_t tag,
                                             const std::span<const iovec> iov,
                                             uint8_t user_status,
                                             std::uint64_t key) {
  ERS_DEBUG(7, "Entered zero-copy sending to " << ep);
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return zero_copy_send_data(found->second, tag, iov, user_status, key);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Sends one contiguous payload to `ep` through the zero-copy path.
//
// Looks up the connection for `ep` while holding the connection-map lock and
// forwards to the Connection overload. Returns NetioStatus::FAILED (after
// logging) when no connection is registered for `ep`.
NetioStatus NetioSender::zero_copy_send_data(const EndPointAddress& ep,
                                             std::uint64_t tag,
                                             const std::span<std::uint8_t> data,
                                             uint8_t user_status,
                                             std::uint64_t key) {
  ERS_DEBUG(7, "Entered zero-copy sending to " << ep);
  utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);

  const auto found = m_connections.find(ep);
  if (found != m_connections.end()) {
    return zero_copy_send_data(found->second, tag, data, user_status, key);
  }

  ERS_LOG("Failed to send data because no connection exists");
  return NetioStatus::FAILED;
}
// Sends a scattered (iovec) payload over an already looked-up connection
// through the backend's send_data(), preceded by a separately built header.
//
// Parameters:
//   con         - connection state for the destination.
//   tag         - tag written into the header.
//   iov         - the payload fragments.
//   user_status - user status written into the header.
//   key         - passed through to the backend's send_data().
// Returns:
//   NO_RESOURCES - the connection is not ready.
//   FAILED       - the total payload exceeds BufferFormatter::MSG_MAX_SIZE or
//                  cannot be represented in the header's 32-bit size field.
//   otherwise    - the status returned by the backend.
//
// NOTE(review): the contiguous overload also counts ZERO_COPY_SIZE_HEADER
// towards the MSG_MAX_SIZE limit, this one does not - confirm which rule is
// intended.
NetioStatus NetioSender::zero_copy_send_data(Connection& con,
                                             std::uint64_t tag,
                                             const std::span<const iovec> iov,
                                             uint8_t user_status,
                                             std::uint64_t key) {
  ERS_DEBUG(7, "Entered zero-copy sending to " << con.m_ep);
  if (!con.m_ready) {
    return NetioStatus::NO_RESOURCES;
  }
  // Sum in size_t: a 32-bit running total could wrap around and slip an
  // oversized payload past the limit check below.
  const auto total_size = std::accumulate(
    iov.begin(), iov.end(), std::size_t{0},
    [](std::size_t tot, const iovec& entry) { return tot + entry.iov_len; });
  if (total_size > BufferFormatter::MSG_MAX_SIZE || total_size > UINT32_MAX) {
    return NetioStatus::FAILED;
  }
  // Lossless after the range check above.
  const auto payload_size = static_cast<std::uint32_t>(total_size);
  const auto header = create_header(tag, user_status, payload_size);
  NetioStatus status = m_backend->send_data(con.m_ep, iov, header.data(), key);
  return status;
}
// Sends one contiguous payload over an already looked-up connection through
// the backend's send_data(), preceded by a separately built header.
//
// Returns:
//   NO_RESOURCES - the connection is not ready.
//   FAILED       - payload size plus ZERO_COPY_SIZE_HEADER exceeds
//                  BufferFormatter::MSG_MAX_SIZE.
//   otherwise    - the status returned by the backend.
NetioStatus NetioSender::zero_copy_send_data(Connection& con,
                                             std::uint64_t tag,
                                             const std::span<std::uint8_t> data,
                                             uint8_t user_status,
                                             std::uint64_t key) {
  ERS_DEBUG(7, "Entered zero-copy sending to " << con.m_ep);
  if (!con.m_ready) {
    return NetioStatus::NO_RESOURCES;
  }

  const auto size_with_header = data.size() + ZERO_COPY_SIZE_HEADER;
  if (size_with_header > BufferFormatter::MSG_MAX_SIZE) {
    return NetioStatus::FAILED;
  }

  const auto header = create_header(tag, user_status, data.size());
  return m_backend->send_data(con.m_ep, data, header.data(), key);
}
// Backend callback: the connection to `ep` has been closed.
//
// Order of work: the internal callback (which also receives `keys`), then
// removal of the local bookkeeping for `ep` under the connection-map lock,
// then the user callback. Both callbacks are skipped when they have not been
// set, matching the established/refused handlers; otherwise invoking an empty
// callback would abort the handler before the bookkeeping is cleaned up.
//
// Parameters:
//   ep   - address of the endpoint whose connection was closed.
//   keys - keys reported by the backend, forwarded to the internal callback.
void NetioSender::on_connection_closed(const EndPointAddress& ep, const std::vector<std::uint64_t>& keys) {
  ERS_DEBUG(5, "Entered: ep=" << ep);
  if (m_on_connection_closed_internal_cb) {
    m_on_connection_closed_internal_cb(ep, keys);
  }
  {
    utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
    clear_connection(ep);
  }
  // Invoked after the lock has been released.
  if (m_on_connection_closed_cb) {
    m_on_connection_closed_cb(ep);
  }
}
void NetioSender::on_connection_established(const EndPointAddress& ep) {
ERS_DEBUG(5, "Entered with ep " << ep);
{
utility::ConditionalLockGuard lock(m_mutex_connection, m_thread_safe);
m_connections.at(ep).m_ready = true;
}
if (m_on_connection_established_cb) {
m_on_connection_established_cb(ep);
}
}
// Backend callback (also called from open_connection): the connection to `ep`
// was refused.
//
// Removes the local bookkeeping for `ep` (connection object and endpoint-set
// entry) under both locks and then invokes the user callback, if one is set,
// after the locks have been released.
void NetioSender::on_connection_refused(const EndPointAddress& ep) {
  ERS_DEBUG(5, "Entered: ep=" << ep);
  {
    utility::ConditionalLockGuard connections_guard(m_mutex_connection, m_thread_safe);
    m_connections.erase(ep);

    utility::ConditionalLockGuard endpoints_guard(m_mutex_endpoints, m_thread_safe);
    m_endpoints.erase(ep);
  }

  if (!m_on_connection_refused_cb) {
    return;
  }
  m_on_connection_refused_cb(ep);
}
// Backend callback: the send identified by `key` towards `ep` has completed.
//
// Forwards to the user callback. The callback is skipped when it has not been
// set, matching the established/refused handlers.
void NetioSender::on_send_completed(const EndPointAddress& ep, uint64_t key) {
  if (m_on_send_completed_cb) {
    m_on_send_completed_cb(ep, key);
  }
}
// Sets the callback invoked by on_connection_established() after a connection
// has been marked ready. Replaces any callback set earlier.
void NetioSender::set_on_connection_established(const OnConnectionEstablishedSimpleCb& cb) {
m_on_connection_established_cb = cb;
}
// Sets the callback invoked by on_connection_refused() after the bookkeeping
// for the refused endpoint has been removed. Replaces any callback set earlier.
void NetioSender::set_on_connection_refused(const OnConnectionRefusedCb& cb) {
m_on_connection_refused_cb = cb;
}
// Sets the callback invoked by on_connection_closed() after the bookkeeping
// for the closed endpoint has been removed. Replaces any callback set earlier.
void NetioSender::set_on_connection_closed(const OnConnectionClosedCb& cb) {
m_on_connection_closed_cb = cb;
}
// Sets the callback invoked first by on_connection_closed(); it receives the
// endpoint together with the keys reported by the backend. Replaces any
// callback set earlier.
void NetioSender::set_on_connection_closed_internal(const OnConnectionClosedKeysCb& cb) {
m_on_connection_closed_internal_cb = cb;
}
// Sets the callback invoked by on_send_completed() with the endpoint and the
// key of the completed send. Replaces any callback set earlier.
void NetioSender::set_on_send_completed(const OnSendCompleted& cb){
m_on_send_completed_cb = cb;
}
// Translates the sender configuration into backend connection parameters.
//
// Without a memory-region start pointer (config.mr_start == nullptr) the
// buffered-send parameters are filled in (buffer size and buffer count);
// otherwise the zero-copy parameters are filled in (buffer size and the
// memory-region start).
ConnectionParameters NetioSender::convert_connection_parameters(const NetioSenderConfig& config) {
  if (config.mr_start == nullptr) {
    return {.send_buffered_params{.buf_size = config.buffersize, .num_buf = config.nbuffers}};
  }
  return {.send_zero_copy_params{.buf_size = config.buffersize, .mr_start = config.mr_start}};
}