Program Listing for File BufferFormatter.cpp

Return to documentation for file (BufferFormatter.cpp)

#include "netio3/BufferFormatter.hpp"

#include <cmath>
#include <numeric>

#include <netio3-backend/Netio3Backend.hpp>

using namespace netio3;

std::string netio3::formatter_status_string(FormatterStatus status) {
    switch (status) {
    case FormatterStatus::BUFFER_OK:
        return "OK";
        break;
    case FormatterStatus::BUFFER_READY:
        return "READY";
    case FormatterStatus::MESSAGE_TOO_BIG:
        return "Message too big";
    default:
        return "unknown";
    }
}

void BufferFormatter::fill_header(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::size_t payload_size) {
    const bool has_status  = status != 0;
    const bool has_tag     = m_buffer_empty || tag != m_previous_tag;
    uint32_t msg_total_size = payload_size;

    if (has_status) [[unlikely]] {
        msg_total_size += sizeof(status);
    }
    if (has_tag) {
        msg_total_size += sizeof(tag);
    }

    const uint32_t header = msg_total_size | (has_status << HDR_STATUS_FLAG_POS) | (has_tag << HDR_TAG_FLAG_POS);

    buf->write(header);

    if (has_status) [[unlikely]] {
        buf->write(status);
    }
    if (has_tag) {
        buf->write(tag);
        m_previous_tag = tag;
        m_buffer_empty = false;
    }
}

bool BufferFormatter::check_size(NetworkBuffer* buf, std::size_t payload_size) {
    return (payload_size < MSG_MAX_SIZE) and (buf->pos() + payload_size + HDR_MAX_SIZE <= buf->size());
}

FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint8_t status, uint64_t tag, const std::span<const iovec> data){
    const auto payload_size = std::accumulate(
      data.begin(), data.end(), std::size_t{0}, [](std::size_t tot, iovec entry) {
          return tot + entry.iov_len;
      });

    if (!check_size(buf, payload_size)) {
        return FormatterStatus::MESSAGE_TOO_BIG;
    }
    fill_header(buf, status, tag, payload_size);

    // Fill buffer with the payload
    for (const auto& iov : data) {
        buf->write(iov);
    }
    return FormatterStatus::BUFFER_OK;
}

FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint8_t status, uint64_t tag, std::span<const std::uint8_t> data){
    const auto payload_size = data.size();

    if (!check_size(buf, payload_size)) {
        return FormatterStatus::MESSAGE_TOO_BIG;
    }
    fill_header(buf, status, tag, payload_size);

    buf->write(data);
    return FormatterStatus::BUFFER_OK;
}

FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint64_t tag, const std::span<const iovec> data){
    const auto payload_size = std::accumulate(
      data.begin(), data.end(), std::size_t{0}, [](std::size_t tot, iovec entry) {
          return tot + entry.iov_len;
      });
    if (!check_size(buf, payload_size)) {
        return FormatterStatus::MESSAGE_TOO_BIG;
    }
    /*               0 = no status   */
    fill_header(buf, 0, tag, payload_size);

    // Fill buffer with the payload
    for (const auto& iov : data) {
        buf->write(iov);
    }
    return FormatterStatus::BUFFER_OK;
}

FormatterStatus BufferFormatter::write(NetworkBuffer* buf, uint64_t tag, std::span<const std::uint8_t> data){
    const auto payload_size = data.size();
    /*                                               0 = no status   */
    if (!check_size(buf, payload_size)) {
        return FormatterStatus::MESSAGE_TOO_BIG;
    }
    /*               0 = no status   */
    fill_header(buf, 0, tag, payload_size);

    buf->write(data);
    return FormatterStatus::BUFFER_OK;
}

void BufferFormatter::decode(std::span<const std::uint8_t> buf, const std::function<void(std::uint64_t, std::span<const std::uint8_t>, std::uint8_t)>& cb){
    std::size_t current_pos = 0;
    const uint8_t* data_ptr = buf.data();
    bool is_first_msg = true;

    uint64_t previous_tag = UINT64_MAX;
    while (current_pos < buf.size()) {

        uint32_t msg_header;
        if(current_pos + sizeof(msg_header) > buf.size()) break;
        memcpy(&msg_header, data_ptr + current_pos, sizeof(msg_header));
        current_pos += sizeof(msg_header);

        uint8_t status = 0;
        if (msg_header & (1U << HDR_STATUS_FLAG_POS)) { // Check if there is a status byte
            if(current_pos + sizeof(status) > buf.size()) break;
            msg_header ^= (1U << HDR_STATUS_FLAG_POS); // Clear the most significant bit
            memcpy(&status, data_ptr + current_pos, sizeof(status));
            current_pos += sizeof(status);
            msg_header -= sizeof(status);
        }

        uint64_t tag = previous_tag;
        if (msg_header & (1U << HDR_TAG_FLAG_POS)) { // Check if there is a tag
            if(current_pos + sizeof(tag) > buf.size()) break;
            msg_header ^= (1U << HDR_TAG_FLAG_POS); // Clear the second most significant bit
            memcpy(&tag, data_ptr + current_pos, sizeof(tag));
            current_pos += sizeof(tag);
            msg_header -= sizeof(tag);
            previous_tag = tag;
        }else if(is_first_msg){
            throw std::runtime_error("First message must have a tag. Something bad happened.");
        }

        if(current_pos + msg_header > buf.size()) break;
        std::span<const uint8_t> payload = buf.subspan(current_pos, msg_header);
        current_pos += msg_header;
        std::invoke(cb, tag, payload, status);

        is_first_msg = false;
    }
}

void BufferFormatter::reset_buffer(NetworkBuffer* buf){
    buf->reset();
    m_buffer_empty = true;
}