Program Listing for File CqReactor.cpp

Return to documentation for file (BackendLibfabric/CqReactor.cpp)

#include "CqReactor.hpp"

#include <utility>

#include <rdma/fi_eq.h>

#include <tracy/Tracy.hpp>

#include "Issues.hpp"
#include "SendSocketZeroCopy.hpp"
#include "SendSocketBuffered.hpp"

/**
 * @brief Create a completion-queue reactor.
 *
 * @param fabric      Fabric handle, stored as-is and later passed to fi_trywait.
 * @param on_data_cb  Callback invoked for every received packet; it is moved into
 *                    the reactor.
 */
netio3::libfabric::CqReactor::CqReactor(fid_fabric* fabric, OnDataCb on_data_cb) :
  m_fabric(fabric),
  m_on_data_cb(std::move(on_data_cb))
{}

/**
 * @brief Handle a completion-queue event of a send socket.
 *
 * Reads the pending completion entries from the socket's CQ. For every successful
 * completion the value stored in the operation context is interpreted as a buffer
 * index and passed to SocketType::release_buffer(); the key returned by that call is
 * collected.
 *
 * Error handling:
 *  - -FI_EAGAIN (nothing to read): fi_trywait is called on the CQ and an empty vector
 *    is returned.
 *  - -FI_EAVAIL (error entry available): the error entry is read and reported. FI_EIO
 *    and FI_ECANCELED are only logged, every other error code is additionally reported
 *    as unhandled.
 *  - any other negative return value is reported as an unhandled read error.
 *
 * @tparam SocketType  Send socket type satisfying SendSocketConcept.
 * @param ssocket      Socket whose completion queue is read.
 * @return Keys returned by release_buffer(), one per completed send; empty when no
 *         completion was read or an error occurred.
 */
template<netio3::libfabric::SendSocketConcept SocketType>
std::vector<std::uint64_t> netio3::libfabric::CqReactor::on_send_socket_cq_event(
  SocketType& ssocket) const
{
  ZoneScoped;
  ERS_DEBUG(2, "Entered");
  std::array<fi_cq_data_entry, MAX_CQ_EVENTS> completion_entry{};
  // Never request more entries than the local array can hold, otherwise fi_cq_read
  // would write past the end of the array.
  const auto requested = static_cast<std::size_t>(ssocket.get_endpoint().cq_size);
  const auto max_entries =
    requested < completion_entry.size() ? requested : completion_entry.size();
  const auto ret =
    fi_cq_read(ssocket.get_endpoint().cq.get(), completion_entry.data(), max_entries);
  ERS_DEBUG(
    3, std::format("send socket fd {} : {} completion events", ssocket.get_eq_ev_ctx().fd, ret));

  if (ret >= 0) {
    // Success: ret is the number of completion entries that were read.
    std::vector<std::uint64_t> keys;
    keys.reserve(ret);
    for (long i = 0; i < ret; ++i) {
      // The operation context carries the buffer index, not a real pointer.
      auto buffer_idx = reinterpret_cast<std::uint64_t>(completion_entry.at(i).op_context);
      const auto key = ssocket.release_buffer(buffer_idx);
      ERS_DEBUG(2, std::format("Send completed. Key was {}", key));
      keys.push_back(key);
    }
    return keys;
  }

  if (ret == -FI_EAGAIN) {
    // No completions are available to return from the CQ.
    fid* fp = &ssocket.get_endpoint().cq->fid;
    fi_trywait(m_fabric, &fp, 1);
    return {};
  }

  if (ret != -FI_EAVAIL) {
    ers::error(LibFabricCqError(
      ERS_HERE,
      std::format(
        "Send socket Completion Queue unhandled read error {} - {}", ret, fi_strerror(-ret))));
    return {};
  }

  // -FI_EAVAIL: an error entry is waiting on the CQ, fetch its details.
  fi_cq_err_entry err_entry{};
  const auto r = fi_cq_readerr(ssocket.get_endpoint().cq.get(), &err_entry, 0);
  if (r < 0) {
    ers::error(LibFabricCqError(
      ERS_HERE,
      std::format(
        "Failed to retrieve details on Completion Queue error of send socket, error {} - {}",
        r,
        fi_strerror(-r))));
    // err_entry was not filled in, so there is nothing meaningful left to report.
    return {};
  }

  ers::error(LibFabricCqError(ERS_HERE,
                              std::format("Completion Queue read error {} of send socket: {}",
                                          err_entry.err,
                                          fi_strerror(err_entry.err))));
  ers::error(LibFabricCqError(ERS_HERE,
                              std::format("Provider-specific error {}: {}",
                                          err_entry.prov_errno,
                                          fi_cq_strerror(ssocket.get_endpoint().cq.get(),
                                                         err_entry.prov_errno,
                                                         err_entry.err_data,
                                                         nullptr,
                                                         0))));
  if (err_entry.err == FI_EIO) {
    // I/O error, the CM event code can handle this
    ERS_LOG("Send socket CQ I/O error, connection possibly closed: ignored");
    return {};
  }
  if (err_entry.err == FI_ECANCELED) {
    // Operation Cancelled
    ERS_LOG("Send socket CQ operation cancelled");
    return {};
  }
  ers::error(LibFabricCqError(
    ERS_HERE,
    std::format("Send socket Completion Queue unhandled specific read error {}: {}",
                err_entry.err,
                fi_strerror(err_entry.err))));
  return {};
}

/**
 * @brief Handle a completion-queue event of a receive socket.
 *
 * Reads the pending completion entries from the socket's receive CQ (rcq). For every
 * successful completion the operation context is interpreted as a Buffer, the data
 * callback (if set) is invoked with the buffer's data pointer and the received length,
 * and the buffer is posted again on the socket.
 *
 * Error handling:
 *  - -FI_EAGAIN (nothing to read): fi_trywait is called on the CQ.
 *  - -FI_EAVAIL (error entry available): the error entry is read from the same CQ and
 *    reported; FI_EIO and FI_ECANCELED are additionally logged.
 *  - any other negative return value is reported as an unhandled error.
 *
 * @param rsocket  Socket whose receive completion queue is read.
 */
// cppcheck-suppress unusedFunction
void netio3::libfabric::CqReactor::on_recv_socket_cq_event(ReceiveSocket& rsocket) const
{
  ZoneScoped;
  ERS_DEBUG(2, "Entered");
  std::array<fi_cq_data_entry, MAX_CQ_EVENTS> completion_entry{};
  // Never request more entries than the local array can hold, otherwise fi_cq_read
  // would write past the end of the array.
  const auto requested = static_cast<std::size_t>(rsocket.get_endpoint().cq_size);
  const auto max_entries =
    requested < completion_entry.size() ? requested : completion_entry.size();
  const auto ret =
    fi_cq_read(rsocket.get_endpoint().rcq.get(), completion_entry.data(), max_entries);
  ERS_DEBUG(
    3, std::format("recv socket fd {}, {} completion events", rsocket.get_cq_ev_ctx().fd, ret));

  if (ret >= 0) {
    // Success: ret is the number of completion entries that were read.
    for (long i = 0; i < ret; ++i) {
      const auto& entry = completion_entry.at(i);
      auto* buffer = static_cast<Buffer*>(entry.op_context);
      const auto* data = buffer->get_buffer();
      const auto size = entry.len;
      ERS_DEBUG(3, std::format("Packet received, size={}", size));
      if ((entry.flags & FI_REMOTE_CQ_DATA) != 0U) {
        ERS_DEBUG(1, "Completion has remote CQ data");
        [[maybe_unused]] const auto imm = entry.data;
        ERS_DEBUG(1, std::format("recv completion immediate data: {:#x}", imm));
      } else {
        ERS_DEBUG(1, std::format("Completion has NO remote CQ data, size is: {}", size));
      }
      if (m_on_data_cb != nullptr) {
        m_on_data_cb({data, size});
      }
      // FLX-1194, posting buffers to be done by USER !! TODO: check
      // how to handle this now in netio3. For testing reasons, I am
      // adding it again to post the buffer right away
      rsocket.post_buffer(buffer);
    }
    return;
  }

  if (ret == -FI_EAGAIN) {
    // No completions are available to return from the CQ.
    fid* fp = &rsocket.get_endpoint().rcq->fid;
    fi_trywait(m_fabric, &fp, 1);
    return;
  }

  if (ret != -FI_EAVAIL) {
    ers::error(LibFabricError(ERS_HERE,
                              std::format("Recv socket unhandled Completion Queue error {} - {}",
                                          ret,
                                          fi_strerror(-ret))));
    return;
  }

  // -FI_EAVAIL: an error entry is waiting on the CQ that was just read (rcq), so the
  // details have to be fetched from that same queue.
  fi_cq_err_entry err_entry{};
  const auto r = fi_cq_readerr(rsocket.get_endpoint().rcq.get(), &err_entry, 0);
  if (r < 0) {
    ers::error(LibFabricError(
      ERS_HERE,
      std::format(
        "Failed to retrieve details on Completion Queue error of recv socket, error {} - {}",
        r,
        fi_strerror(-r))));
    // err_entry was not filled in, so there is nothing meaningful left to report.
    return;
  }

  ers::error(LibFabricError(ERS_HERE,
                            std::format("code {} reading Completion Queue of recv socket: {}",
                                        err_entry.err,
                                        fi_strerror(err_entry.err))));
  ers::error(LibFabricError(ERS_HERE,
                            std::format("Provider-specific error {} - {}",
                                        err_entry.prov_errno,
                                        fi_cq_strerror(rsocket.get_endpoint().rcq.get(),
                                                       err_entry.prov_errno,
                                                       err_entry.err_data,
                                                       nullptr,
                                                       0))));
  if (err_entry.err == FI_EIO) {
    // I/O error, the CM event code can handle this
    ERS_LOG("Recv socket CQ I/O error, connection possibly closed: ignored");
    return;
  }
  if (err_entry.err == FI_ECANCELED) {
    // Operation Cancelled
    ERS_LOG("Recv socket CQ operation cancelled");
    return;
  }
}

// Explicit instantiations: on_send_socket_cq_event is a member template whose
// definition lives in this translation unit, so it is instantiated here for the two
// send socket types it is used with (SendSocketBuffered and SendSocketZeroCopy).
template std::vector<std::uint64_t> netio3::libfabric::CqReactor::on_send_socket_cq_event<
  netio3::libfabric::SendSocketBuffered>(netio3::libfabric::SendSocketBuffered&) const;
template std::vector<std::uint64_t> netio3::libfabric::CqReactor::on_send_socket_cq_event<
  netio3::libfabric::SendSocketZeroCopy>(netio3::libfabric::SendSocketZeroCopy&) const;