Program Listing for File tohost_buffer_file.cpp

Return to documentation for file (tohost_buffer_file.cpp)

#include <fcntl.h>
#include <unistd.h>

#include <cerrno>
#include <chrono>
#include <cstring>
#include <stdexcept>
#include <string>

#include "tohost_buffer.hpp"
#include "log.hpp"

// Opens `filename` read-only and non-blocking, wraps the descriptor in a
// stdio stream (m_fp) and initialises the block size and wrap flags.
//
// dmaid, d     forwarded to the ToHostBuffer base class
// filename     path of the input file or FIFO
// block_rate   stored in m_block_rate (0 disables rate limiting, see
//              limit_block_rate())
// repeat       stored in m_repeat (rewind at end of file, see reset_file())
//
// Throws std::runtime_error if the file cannot be opened or wrapped.
FileToHostBuffer::FileToHostBuffer(int dmaid, std::shared_ptr<Device> d,
    std::string& filename, unsigned int block_rate, bool repeat)
    : ToHostBuffer(dmaid, d), m_block_rate(block_rate), m_repeat(repeat)
{
    // use open rather than fopen for fifos
    const int fd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
    if (fd < 0) {
        throw std::runtime_error("Cannot open file " + filename + ": " + std::strerror(errno));
    }

    m_fp = fdopen(fd, "r");
    if (m_fp == nullptr) {
        // fdopen() did not take ownership: release the descriptor ourselves.
        // errno is saved first because close() may overwrite it.
        const int saved_errno = errno;
        close(fd);
        throw std::runtime_error("Cannot open file " + filename + ": " + std::strerror(saved_errno));
    }

    LOG_INFO("Input from file %s in %s mode", filename.c_str(),  m_repeat ? "repeat" : "non-repeat");
    m_block_size = m_device->get_block_size();
    m_wr_odd = false;
    m_rd_odd = false;
}


// Requests the writer thread to stop, joins it if it was ever started and
// closes the input stream.
FileToHostBuffer::~FileToHostBuffer()
{
    m_stop_flag = true;

    // Wake any thread parked in dma_wait_for_data_irq() so it can observe
    // the stop flag; the writer thread may already have exited and would
    // then never notify again.
    m_irq_cond.notify_all();

    // The writer thread only exists after dma_start_continuous(); joining a
    // non-joinable std::thread throws, which would terminate the program
    // from inside a destructor.
    if (m_writer_thread.joinable()) {
        m_writer_thread.join();
        LOG_INFO("Writer thread joined");
    }
    fclose(m_fp);
}


// Creates the buffer object backing the emulated DMA.
//
// size                buffer size, also stored in m_size
// name                passed to CmemBuffer (unused for VmemBuffer)
// vmem                true selects VmemBuffer, false selects CmemBuffer
// free_previous_cmem  passed to CmemBuffer
//
// The block size is re-read from the device afterwards.
void FileToHostBuffer::allocate_buffer(size_t size,
                            const std::string& name,
                            bool vmem, bool free_previous_cmem)
{
    m_size = size;

    if (!vmem) {
        m_buffer = std::make_unique<CmemBuffer>(size, name, free_previous_cmem);
    } else {
        m_buffer = std::make_unique<VmemBuffer>(size);
    }

    m_block_size = m_device->get_block_size();
}


// Resets the emulated DMA pointers to the start of the buffer, logs the
// buffer layout and spawns the thread running write_in_dma_buffer().
void FileToHostBuffer::dma_start_continuous()
{
    m_buffer->pend = m_buffer->paddr + m_buffer->size;
    m_buffer->pc_ptr = m_buffer->paddr;
    m_buffer->emu_fw_ptr = m_buffer->paddr;

    // Addresses are printed with %lx, like every other log statement in this
    // file that prints these fields; %x would mismatch a 64-bit argument.
    LOG_INFO("  cmem buffer [0x%lx,0x%lx] %lu Blocks", m_buffer->paddr, m_buffer->pend, m_buffer->size/1024);
    LOG_INFO("  cmem virtual address 0x%lx", m_buffer->vaddr);
    LOG_INFO("  fw_ptr 0x%lx", m_buffer->emu_fw_ptr);
    LOG_INFO("  pc_ptr 0x%lx", m_buffer->pc_ptr);
    LOG_INFO("Spawning file-to-buffer writer thread...");
    m_writer_thread = std::thread(&FileToHostBuffer::write_in_dma_buffer, this);
}


// Parks the calling thread on m_irq_cond, which the writer thread notifies
// after each pass of its loop.
//
// wait() is used without a predicate, so spurious wake-ups are not filtered.
// After every wake-up the stop flag is checked again and, unless it is set,
// the thread goes back to waiting.
// NOTE(review): as written this only returns once m_stop_flag is true, not
// when data arrives - confirm that this is what callers expect.
void FileToHostBuffer::dma_wait_for_data_irq()
{
    for (;;) {
        if (m_stop_flag) {
            break;
        }
        std::unique_lock<std::mutex> guard(m_irq_mutex);
        m_irq_cond.wait(guard);
    }
}


// Returns the result of dma_compute_bytes_to_read() for the current read and
// write pointers, evaluated under a shared lock on m_driver_mutex.
// The third argument is true when the reader and writer wrap flags match.
size_t FileToHostBuffer::dma_bytes_available()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    bool same_parity = (m_rd_odd == m_wr_odd);
    return dma_compute_bytes_to_read(m_buffer->pc_ptr,
                                     m_buffer->emu_fw_ptr,
                                     same_parity);
}


// Reports whether the buffer is full: the write pointer has caught up with
// the read pointer while the two wrap flags differ.
bool FileToHostBuffer::dma_is_full()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    if (m_buffer->emu_fw_ptr != m_buffer->pc_ptr) {
        return false;
    }
    return (m_rd_odd != m_wr_odd);
}


size_t FileToHostBuffer::dma_bytes_available_nowrap()
{
    size_t available = dma_bytes_available();
    return MIN(available, m_buffer->pend - m_buffer->pc_ptr);
}


// Returns the write pointer translated into the virtual address range:
// vaddr plus the write pointer's distance from paddr.
uint64_t FileToHostBuffer::dma_get_write_ptr()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const auto offset = m_buffer->emu_fw_ptr - m_buffer->paddr;
    return m_buffer->vaddr + offset;
}


// Returns the read pointer translated into the virtual address range:
// vaddr plus the read pointer's distance from paddr.
uint64_t FileToHostBuffer::dma_get_read_ptr()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const auto offset = m_buffer->pc_ptr - m_buffer->paddr;
    return m_buffer->vaddr + offset;
}


// Returns the distance of the write pointer from the start of the buffer.
uint64_t FileToHostBuffer::dma_get_write_offset()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const auto offset = m_buffer->emu_fw_ptr - m_buffer->paddr;
    return offset;
}


// Returns the distance of the read pointer from the start of the buffer.
uint64_t FileToHostBuffer::dma_get_read_offset()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const auto offset = m_buffer->pc_ptr - m_buffer->paddr;
    return offset;
}


// Sets the read pointer from an address in the virtual range: the address is
// converted to the matching paddr-based value and handed to
// set_read_ptr_paddr().
void FileToHostBuffer::dma_set_read_ptr_vaddr(uint64_t v_addr)
{
    const auto offset = v_addr - m_buffer->vaddr;
    set_read_ptr_paddr(m_buffer->paddr + offset);
}


void FileToHostBuffer::dma_set_read_ptr_paddr(uint64_t p_addr)
{
    set_read_ptr_paddr(p_addr);
}


// Moves the read pointer to p_addr under an exclusive lock on m_driver_mutex
// and notifies the writer thread.
//
// A new value below the current one flips the reader wrap flag. If the step
// backwards is smaller than 16 MiB an error is also logged (presumably
// because such a small step is not expected from a genuine wrap-around -
// TODO confirm the rationale for the threshold).
void FileToHostBuffer::set_read_ptr_paddr(uint64_t p_addr)
{
    std::scoped_lock<std::shared_mutex> lock(m_driver_mutex);
    if (p_addr == m_buffer->pc_ptr){ return; }

    if (p_addr < m_buffer->pc_ptr) {
        // Keep the difference 64 bits wide: narrowing it to unsigned int
        // makes a multi-GiB step look like a small one.
        const uint64_t diff = m_buffer->pc_ptr - p_addr;
        if (diff < 16*1024*1024){
            LOG_ERR("rd ptr backwards of %lu bytes. Old rd 0x%lx new 0x%lx ", diff, m_buffer->pc_ptr, p_addr);
        }
        m_rd_odd = !m_rd_odd;
    }

    m_buffer->pc_ptr = p_addr;
    m_writer_cond.notify_one();
}


// Advances the read pointer by `bytes` under an exclusive lock on
// m_driver_mutex and notifies the writer thread. A zero advance is a no-op.
//
// When the pointer reaches or passes the end of the buffer it wraps to the
// start, keeping any overshoot, and the reader wrap flag is flipped. Only a
// single wrap is handled: `bytes` is expected not to exceed the buffer size.
void FileToHostBuffer::dma_advance_read_ptr(size_t bytes)
{
    if (bytes == 0){return;}
    std::scoped_lock<std::shared_mutex> lock(m_driver_mutex);
    m_buffer->pc_ptr += bytes;
    if (m_buffer->pc_ptr >= m_buffer->pend){
        // An exact hit on pend gives an overshoot of zero, i.e. paddr.
        m_buffer->pc_ptr = m_buffer->paddr + (m_buffer->pc_ptr - m_buffer->pend);
        m_rd_odd = !m_rd_odd;
    }
    m_writer_cond.notify_one();
}


// private
// Body of the writer thread started by dma_start_continuous().
//
// Each pass of the loop:
//   1. waits (up to 100 ms) while the buffer is full,
//   2. takes a snapshot of the read/write pointers,
//   3. reads up to blocks_to_write() blocks from m_fp straight into the
//      buffer at the write pointer,
//   4. advances the write pointer by the number of blocks actually read,
//   5. notifies m_irq_cond and checks for end of file,
//   6. sleeps in throttle_writer() if a block rate is configured.
//
// The loop ends when m_stop_flag is set or reset_file() returns true.
// Throws std::range_error if the computed write would run past the end of
// the buffer.
void FileToHostBuffer::write_in_dma_buffer()
{
    LOG_INFO("write_in_dma_buffer thread started");
    uint64_t rd_ptr{0};
    uint64_t wr_ptr{0};
    bool even{true};

    while (!m_stop_flag){

        //Block on conditional variable if the DMA buffer is full
        //to prevent a continuous lock of m_driver_mutex.
        //This is realistic: an FLX card does not produce data nor interrupts
        //when the DMA is full.
        bool is_full = dma_is_full();
        if (is_full){
            std::unique_lock<std::mutex> lk(m_stop_writer_mutex);
            using namespace std::chrono_literals;
            // The timeout bounds the wait, so the stop flag is re-checked
            // at least every 100 ms even if no notification arrives.
            m_writer_cond.wait_for(lk, 100ms, [&]{return !dma_is_full();});
        }

        {
            // Snapshot the pointers; the lock is released before the
            // (potentially slow) file read.
            std::shared_lock<std::shared_mutex> lock(m_driver_mutex);
            wr_ptr = m_buffer->emu_fw_ptr;
            rd_ptr = m_buffer->pc_ptr;
            even = (m_rd_odd == m_wr_odd);
        }

        size_t max_blocks = blocks_to_write(rd_ptr, wr_ptr, even);
        void *ptr = reinterpret_cast<void*>(m_buffer->vaddr + (wr_ptr - m_buffer->paddr));

        if ((wr_ptr + max_blocks*m_block_size) > m_buffer->pend) {
            throw std::range_error("Writing past end of buffer");
        }

        // steady_clock is monotonic; a wall-clock adjustment during the read
        // must not distort the elapsed time used for throttling.
        const auto start = std::chrono::steady_clock::now();

        size_t count = fread(ptr, m_block_size, max_blocks, m_fp);
        if (count > max_blocks) {
            throw std::range_error("fread wrote more blocks than allowed!");
        }

        const auto end = std::chrono::steady_clock::now();

        if (count > 0)
        {
            std::scoped_lock<std::shared_mutex> lock(m_driver_mutex);
            writer_updates_fw_ptr(count);
            LOG_TRACE("Written %lu blocks at 0x%lx, fw_ptr 0x%lx pc_ptr 0x%lx [0x%lx, 0x%lx]", count, m_buffer->vaddr + (wr_ptr - m_buffer->paddr), m_buffer->emu_fw_ptr, m_buffer->pc_ptr, m_buffer->paddr, m_buffer->pend);
            LOG_TRACE("Block header 0x%x, second word 0x%x", *(uint32_t*)ptr, *((uint32_t*)ptr + 1) );
        }

        // Wake a thread waiting in dma_wait_for_data_irq().
        m_irq_cond.notify_one();
        if (reset_file()) { break; }

        if(m_block_rate > 0 && count > 0){
            auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
            throttle_writer(elapsed, count);
        }
    }
}


// Handles end of file on the input stream.
//
// Returns true when the writer should stop (end of file reached and repeat
// mode is off), false otherwise. In repeat mode the stream is rewound to its
// beginning.
bool FileToHostBuffer::reset_file()
{
    if (!feof(m_fp)) {
        return false;
    }
    if (!m_repeat) {
        return true;
    }

    // A successful fseek() clears the end-of-file indicator. It fails on
    // streams that cannot be repositioned (e.g. a FIFO) and then leaves the
    // indicator set; clear it explicitly so later reads are still attempted.
    if (fseek(m_fp, 0, SEEK_SET) != 0) {
        clearerr(m_fp);
    }
    return false;
}


// Computes how many whole blocks may be written at wr_ptr.
//
// rd_ptr, wr_ptr  snapshot of the read and write pointers
// even            true when reader and writer wrap flags match
//
// - writer behind the reader: the gap up to the read pointer
// - pointers equal with differing wrap flags (buffer full): nothing
// - otherwise (writer ahead, or buffer empty): up to the end of the buffer,
//   never wrapping around
// The result is then capped by limit_block_rate().
size_t FileToHostBuffer::blocks_to_write(uint64_t rd_ptr, uint64_t wr_ptr, bool even)
{
    size_t writable{0};
    const bool buffer_full = (wr_ptr == rd_ptr) && !even;

    if (wr_ptr < rd_ptr) {
        writable = (rd_ptr - wr_ptr) / m_block_size;
    }
    else if (!buffer_full) {
        writable = (m_size - (wr_ptr-m_buffer->paddr)) / m_block_size;
    }

    return limit_block_rate(writable);
}


// Caps the number of blocks written per pass according to m_block_rate.
//
//   rate == 0        no cap
//   rate <   6000    at most 1 block
//   rate <  12000    at most 2 blocks
//   rate < 120000    at most 10 blocks
//   otherwise        at most 32 blocks
size_t FileToHostBuffer::limit_block_rate(size_t max_blocks)
{
    if (m_block_rate == 0) {
        return max_blocks;
    }

    size_t cap{32};
    if (m_block_rate < 6000) {
        cap = 1;
    } else if (m_block_rate < 12000) {
        cap = 2;
    } else if (m_block_rate < 120000) {
        cap = 10;
    }

    return (max_blocks < cap) ? max_blocks : cap;
}


// Sleeps to slow the writer down towards m_block_rate.
//
// elapsed  time spent reading `count` blocks
// count    number of blocks read
//
// The sleep is the per-block period (1e9 / m_block_rate ns) minus the
// average read time per block. Nothing happens when that is not positive,
// or when count or m_block_rate is zero (either would divide by zero).
void FileToHostBuffer::throttle_writer(const std::chrono::nanoseconds & elapsed, size_t count)
{
    if (count == 0 || m_block_rate == 0) {
        return;
    }

    // Divide as signed values so a negative `elapsed` is not reinterpreted
    // as a huge unsigned number.
    const long block_read_time = static_cast<long>(elapsed.count() / static_cast<long long>(count));
    const long target_time = static_cast<long>(1.0e9*(1.0/static_cast<double>(m_block_rate)));
    const long sleep_time = target_time - block_read_time;

    if (sleep_time > 0) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time));
    }
}


// Advances the write pointer by written_blocks * m_block_size bytes.
//
// When the new value lands exactly on the end of the buffer the pointer
// wraps to the start and the writer wrap flag is flipped.
// Throws std::range_error if the new value would lie beyond the end of the
// buffer; the pointer is left unchanged in that case.
// The function does not lock; write_in_dma_buffer() calls it while holding
// m_driver_mutex.
void FileToHostBuffer::writer_updates_fw_ptr(size_t written_blocks)
{
    // Validate before committing so a failure does not leave the pointer
    // out of range.
    const auto new_fw_ptr = m_buffer->emu_fw_ptr + (m_block_size*written_blocks);
    if (new_fw_ptr > m_buffer->pend){
        throw std::range_error("Moving write pointer beyond end of buffer");
    }

    if (new_fw_ptr == m_buffer->pend){
        m_buffer->emu_fw_ptr = m_buffer->paddr;
        m_wr_odd = !m_wr_odd;
    } else {
        m_buffer->emu_fw_ptr = new_fw_ptr;
    }

    // Logged after the update so the message shows the new pointer.
    LOG_TRACE("New wr ptr 0x%lx start 0x%lx end 0x%lx", m_buffer->emu_fw_ptr, m_buffer->paddr, m_buffer->pend);
}