Program Listing for File CqReactor.cpp
↰ Return to documentation for file (BackendLibfabric/CqReactor.cpp)
#include "CqReactor.hpp"
#include <algorithm>
#include <cstddef>
#include <utility>
#include <rdma/fi_eq.h>
#include <tracy/Tracy.hpp>
#include "Issues.hpp"
#include "SendSocketZeroCopy.hpp"
#include "SendSocketBuffered.hpp"
/**
 * @brief Construct a completion-queue reactor.
 *
 * @param fabric     Fabric handle later passed to fi_trywait() when a CQ has no completions.
 *                   Stored as a raw pointer (presumably not owned — confirm with the caller).
 * @param on_data_cb Callback invoked (when non-null) for every receive completion; it is moved
 *                   into the reactor.
 */
netio3::libfabric::CqReactor::CqReactor(fid_fabric* fabric, OnDataCb on_data_cb)
  : m_fabric(fabric)
  , m_on_data_cb(std::move(on_data_cb))
{
}
/**
 * @brief Drain the completion queue of a send socket and release the completed send buffers.
 *
 * Reads at most min(cq_size, MAX_CQ_EVENTS) completions from the socket's CQ. Each completion's
 * op_context is interpreted as a buffer index and handed back to the socket via release_buffer().
 *
 * Outcomes:
 * - -FI_EAGAIN (no completions): fi_trywait() is called on the CQ and nothing is returned.
 * - -FI_EAVAIL: the error entry is read and reported. FI_EIO and FI_ECANCELED are only logged,
 *   other error codes are reported as unhandled.
 * - Any other negative value is reported as an unhandled read error.
 *
 * @tparam SocketType A type satisfying SendSocketConcept.
 * @param ssocket The send socket whose CQ signalled activity.
 * @return The keys returned by release_buffer() for every completed send, in completion order.
 *         Empty when no completion was read or an error was read instead.
 */
template<netio3::libfabric::SendSocketConcept SocketType>
std::vector<std::uint64_t> netio3::libfabric::CqReactor::on_send_socket_cq_event(
  SocketType& ssocket) const
{
  ZoneScoped;
  ERS_DEBUG(2, "Entered");
  auto* const cq = ssocket.get_endpoint().cq.get();
  std::array<fi_cq_data_entry, MAX_CQ_EVENTS> completion_entry{};
  // Never let the provider write past the end of the local array, even if the endpoint's CQ
  // was created with more entries than MAX_CQ_EVENTS.
  const auto max_entries =
    std::min(static_cast<std::size_t>(ssocket.get_endpoint().cq_size), completion_entry.size());
  const auto ret = fi_cq_read(cq, completion_entry.data(), max_entries);
  ERS_DEBUG(
    3, std::format("send socket fd {} : {} completion events", ssocket.get_eq_ev_ctx().fd, ret));

  if (ret == -FI_EAGAIN) {
    // No completions are available. fi_trywait() is called before returning to the event loop
    // (presumably to re-arm the CQ's wait object); its result is intentionally not used here.
    fid* fp = &cq->fid;
    fi_trywait(m_fabric, &fp, 1);
    return {};
  }

  if (ret == -FI_EAVAIL) {
    // An error completion is queued on this CQ and must be consumed with fi_cq_readerr().
    fi_cq_err_entry err_entry{};
    const auto r = fi_cq_readerr(cq, &err_entry, 0);
    if (r < 0) {
      ers::error(LibFabricCqError(
        ERS_HERE,
        std::format(
          "Failed to retrieve details on Completion Queue error of send socket, error {} - {}",
          r,
          fi_strerror(-r))));
      // err_entry was not filled in: reporting its (zeroed) fields would be misleading.
      return {};
    }
    ers::error(LibFabricCqError(ERS_HERE,
                                std::format("Completion Queue read error {} of send socket: {}",
                                            err_entry.err,
                                            fi_strerror(err_entry.err))));
    ers::error(LibFabricCqError(ERS_HERE,
                                std::format("Provider-specific error {}: {}",
                                            err_entry.prov_errno,
                                            fi_cq_strerror(cq,
                                                           err_entry.prov_errno,
                                                           err_entry.err_data,
                                                           nullptr,
                                                           0))));
    if (err_entry.err == FI_EIO) {
      // I/O error, the CM event code can handle this
      ERS_LOG("Send socket CQ I/O error, connection possibly closed: ignored");
      return {};
    }
    if (err_entry.err == FI_ECANCELED) {
      // Operation Cancelled
      ERS_LOG("Send socket CQ operation cancelled");
      return {};
    }
    // Report the CQ error code itself (ret is always -FI_EAVAIL here).
    ers::error(LibFabricCqError(
      ERS_HERE,
      std::format("Send socket Completion Queue unhandled specific read error {}: {}",
                  err_entry.err,
                  fi_strerror(err_entry.err))));
    return {};
  }

  if (ret < 0) {
    ers::error(LibFabricCqError(
      ERS_HERE,
      std::format(
        "Send socket Completion Queue unhandled read error {} - {}", ret, fi_strerror(-ret))));
    return {};
  }

  const auto count = static_cast<std::size_t>(ret);
  std::vector<std::uint64_t> keys;
  keys.reserve(count);
  for (std::size_t i = 0; i < count; ++i) {
    const auto& entry = completion_entry.at(i);
    // op_context carries the buffer index that was attached when the send was posted.
    const auto buffer_idx = reinterpret_cast<std::uint64_t>(entry.op_context);
    const auto key = ssocket.release_buffer(buffer_idx);
    ERS_DEBUG(2, std::format("Send completed. Key was {}", key));
    keys.push_back(key);
  }
  return keys;
}
/**
 * @brief Drain the receive completion queue (rcq) of a receive socket and dispatch the data.
 *
 * Reads at most min(cq_size, MAX_CQ_EVENTS) completions from the socket's rcq. For each
 * completion, op_context is interpreted as the Buffer that was posted. Its data and the
 * completion length are passed to the data callback (if set), and then the buffer is re-posted
 * to the socket.
 *
 * Outcomes:
 * - -FI_EAGAIN (no completions): fi_trywait() is called on the rcq and the function returns.
 * - -FI_EAVAIL: the error entry is read from the same rcq and reported. FI_EIO and FI_ECANCELED
 *   are only logged, other error codes are reported as unhandled.
 * - Any other negative value is reported as an unhandled error.
 *
 * @param rsocket The receive socket whose rcq signalled activity.
 */
// cppcheck-suppress unusedFunction
void netio3::libfabric::CqReactor::on_recv_socket_cq_event(ReceiveSocket& rsocket) const
{
  ZoneScoped;
  ERS_DEBUG(2, "Entered");
  auto* const rcq = rsocket.get_endpoint().rcq.get();
  std::array<fi_cq_data_entry, MAX_CQ_EVENTS> completion_entry{};
  // Never let the provider write past the end of the local array, even if the endpoint's CQ
  // was created with more entries than MAX_CQ_EVENTS.
  const auto max_entries =
    std::min(static_cast<std::size_t>(rsocket.get_endpoint().cq_size), completion_entry.size());
  const auto ret = fi_cq_read(rcq, completion_entry.data(), max_entries);
  ERS_DEBUG(
    3, std::format("recv socket fd {}, {} completion events", rsocket.get_cq_ev_ctx().fd, ret));

  if (ret == -FI_EAGAIN) {
    // No completions are available. fi_trywait() is called before returning to the event loop
    // (presumably to re-arm the rcq's wait object); its result is intentionally not used here.
    fid* fp = &rcq->fid;
    fi_trywait(m_fabric, &fp, 1);
    return;
  }

  if (ret == -FI_EAVAIL) {
    // The error entry is queued on the CQ that fi_cq_read() just reported it for, i.e. rcq.
    fi_cq_err_entry err_entry{};
    const auto r = fi_cq_readerr(rcq, &err_entry, 0);
    if (r < 0) {
      ers::error(LibFabricError(
        ERS_HERE,
        std::format(
          "Failed to retrieve details on Completion Queue error of recv socket, error {} - {}",
          r,
          fi_strerror(-r))));
      // err_entry was not filled in: reporting its (zeroed) fields would be misleading.
      return;
    }
    ers::error(LibFabricError(ERS_HERE,
                              std::format("code {} reading Completion Queue of recv socket: {}",
                                          err_entry.err,
                                          fi_strerror(err_entry.err))));
    ers::error(LibFabricError(ERS_HERE,
                              std::format("Provider-specific error {} - {}",
                                          err_entry.prov_errno,
                                          fi_cq_strerror(rcq,
                                                         err_entry.prov_errno,
                                                         err_entry.err_data,
                                                         nullptr,
                                                         0))));
    if (err_entry.err == FI_EIO) {
      // I/O error, the CM event code can handle this
      ERS_LOG("Recv socket CQ I/O error, connection possibly closed: ignored");
      return;
    }
    if (err_entry.err == FI_ECANCELED) {
      // Operation Cancelled
      ERS_LOG("Recv socket CQ operation cancelled");
      return;
    }
    ers::error(LibFabricError(
      ERS_HERE,
      std::format("Recv socket Completion Queue unhandled specific read error {}: {}",
                  err_entry.err,
                  fi_strerror(err_entry.err))));
    return;
  }

  if (ret < 0) {
    ers::error(LibFabricError(ERS_HERE,
                              std::format("Recv socket unhandled Completion Queue error {} - {}",
                                          ret,
                                          fi_strerror(-ret))));
    return;
  }

  const auto count = static_cast<std::size_t>(ret);
  for (std::size_t i = 0; i < count; ++i) {
    const auto& entry = completion_entry.at(i);
    // op_context carries the Buffer that was attached when it was posted for receive.
    auto* buffer = static_cast<Buffer*>(entry.op_context);
    const auto* data = buffer->get_buffer();
    const auto size = entry.len;
    ERS_DEBUG(3, std::format("Packet received, size={}", size));
    if ((entry.flags & FI_REMOTE_CQ_DATA) != 0U) {
      ERS_DEBUG(1, "Completion has remote CQ data");
      [[maybe_unused]] const auto imm = entry.data;
      ERS_DEBUG(1, std::format("recv completion immediate data: {:#x}", imm));
    } else {
      ERS_DEBUG(1, std::format("Completion has NO remote CQ data, size is: {}", size));
    }
    if (m_on_data_cb != nullptr) {
      m_on_data_cb({data, size});
    }
    // FLX-1194, posting buffers to be done by USER !! TODO: check
    // how to handle this now in netio3. For testing reasons, I am
    // adding it again to post the buffer right away
    rsocket.post_buffer(buffer);
  }
}
template std::vector<std::uint64_t> netio3::libfabric::CqReactor::on_send_socket_cq_event<
netio3::libfabric::SendSocketBuffered>(netio3::libfabric::SendSocketBuffered&) const;
template std::vector<std::uint64_t> netio3::libfabric::CqReactor::on_send_socket_cq_event<
netio3::libfabric::SendSocketZeroCopy>(netio3::libfabric::SendSocketZeroCopy&) const;