Program Listing for File tohost_buffer_file.cpp
↰ Return to documentation for file (tohost_buffer_file.cpp)
#include <fcntl.h>
#include <unistd.h>

#include <chrono>
#include <stdexcept>

#include "tohost_buffer.hpp"
#include "log.hpp"
/// Construct a file-backed ToHost buffer emulator.
/// @param dmaid       DMA channel id, forwarded to the ToHostBuffer base.
/// @param d           Device used to obtain the block size.
/// @param filename    Input file or FIFO providing the data blocks.
/// @param block_rate  Target blocks/s for the writer thread (0 = unthrottled).
/// @param repeat      Rewind and replay the file on EOF when true.
/// @throws std::runtime_error if the file cannot be opened.
FileToHostBuffer::FileToHostBuffer(int dmaid, std::shared_ptr<Device> d,
    std::string& filename, unsigned int block_rate, bool repeat)
    : ToHostBuffer(dmaid, d), m_block_rate(block_rate), m_repeat(repeat)
{
    // use open rather than fopen for fifos: O_NONBLOCK keeps open() from
    // blocking until a FIFO writer attaches.
    int fd = open(filename.c_str(), O_RDONLY | O_NONBLOCK);
    if (fd < 0) {
        // The old code passed -1 straight to fdopen; check open() itself.
        throw std::runtime_error("Cannot open file");
    }
    m_fp = fdopen(fd, "r");
    if (m_fp == nullptr) {
        close(fd);  // don't leak the descriptor when fdopen fails
        throw std::runtime_error("Cannot open file");
    }
    LOG_INFO("Input from file %s in %s mode", filename.c_str(), m_repeat ? "repeat" : "non-repeat");
    m_block_size = m_device->get_block_size();
    // Parity bits track ring-buffer laps for reader and writer.
    m_wr_odd = false;
    m_rd_odd = false;
}
/// Stop the writer thread (if running) and release the input file.
FileToHostBuffer::~FileToHostBuffer()
{
    // Signal the writer loop to exit, then wake it in case it is parked
    // (up to 100 ms) on the buffer-full condition variable.
    m_stop_flag = true;
    m_writer_cond.notify_all();
    // join() on a non-joinable thread throws std::system_error, which would
    // call std::terminate from a destructor. The thread is only started by
    // dma_start_continuous(), which may never have been called.
    if (m_writer_thread.joinable()) {
        m_writer_thread.join();
        LOG_INFO("Writer thread joined");
    }
    fclose(m_fp);
}
/// Allocate the DMA ring buffer backing store.
/// @param size               Buffer capacity in bytes.
/// @param name               Name used for the CMEM allocation.
/// @param vmem               Use plain virtual memory instead of CMEM.
/// @param free_previous_cmem Release a previously held CMEM segment first.
void FileToHostBuffer::allocate_buffer(size_t size,
                                       const std::string& name,
                                       bool vmem, bool free_previous_cmem)
{
    // Record the requested capacity before constructing the buffer object.
    m_size = size;
    if (!vmem) {
        // CMEM-backed (physically contiguous) buffer.
        m_buffer = std::make_unique<CmemBuffer>(size, name, free_previous_cmem);
    } else {
        // Plain virtual-memory buffer.
        m_buffer = std::make_unique<VmemBuffer>(size);
    }
    // Refresh the cached device block size used by the writer thread.
    m_block_size = m_device->get_block_size();
}
/// Initialise the ring-buffer bookkeeping and spawn the writer thread.
void FileToHostBuffer::dma_start_continuous()
{
    // End pointer, reader (pc) pointer and emulated firmware write pointer
    // all start from the physical base address.
    m_buffer->pend = m_buffer->paddr + m_buffer->size;
    m_buffer->pc_ptr = m_buffer->paddr;
    m_buffer->emu_fw_ptr = m_buffer->paddr;
    // These fields are 64-bit (logged with %lx elsewhere in this file); the
    // previous %x specifiers were undefined behaviour for 64-bit varargs.
    // NOTE(review): the "Blocks" count assumes 1 KiB blocks; should this use
    // m_block_size instead? -- confirm.
    LOG_INFO(" cmem buffer [0x%lx,0x%lx] %lu Blocks", m_buffer->paddr, m_buffer->pend, m_buffer->size/1024);
    LOG_INFO(" cmem virtual address 0x%lx", m_buffer->vaddr);
    LOG_INFO(" fw_ptr 0x%lx", m_buffer->emu_fw_ptr);
    LOG_INFO(" pc_ptr 0x%lx", m_buffer->pc_ptr);
    LOG_INFO("Spawning file-to-buffer writer thread...");
    m_writer_thread = std::thread(&FileToHostBuffer::write_in_dma_buffer, this);
}
// Emulates blocking on a data-available interrupt; the writer thread fires
// m_irq_cond.notify_one() after each batch of blocks it writes.
// NOTE(review): after every notification the loop re-checks m_stop_flag and
// waits again, so this function only returns once m_stop_flag is set AND a
// further notify arrives -- confirm callers expect "wait until stop" rather
// than "wait until data".
// NOTE(review): m_stop_flag is read without holding m_irq_mutex; shutdown
// wake-up relies on a notify arriving after the flag is set.
void FileToHostBuffer::dma_wait_for_data_irq()
{
while(!m_stop_flag){
std::unique_lock<std::mutex> lk(m_irq_mutex);
m_irq_cond.wait(lk); //No condition given: this allows spurious wake-up calls...
}
}
/// Number of bytes currently readable from the DMA ring buffer.
size_t FileToHostBuffer::dma_bytes_available()
{
    // Shared (reader) lock: take one consistent snapshot of the pointers
    // and the lap-parity bits.
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const bool same_lap = (m_rd_odd == m_wr_odd);
    return dma_compute_bytes_to_read(m_buffer->pc_ptr, m_buffer->emu_fw_ptr, same_lap);
}
/// True when the ring buffer is completely full: the write pointer has
/// caught up with the read pointer while being one lap ahead.
bool FileToHostBuffer::dma_is_full()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const bool pointers_meet = (m_buffer->emu_fw_ptr == m_buffer->pc_ptr);
    const bool laps_differ = (m_rd_odd != m_wr_odd);
    return pointers_meet && laps_differ;
}
size_t FileToHostBuffer::dma_bytes_available_nowrap()
{
size_t available = dma_bytes_available();
return MIN(available, m_buffer->pend - m_buffer->pc_ptr);
}
/// Virtual address corresponding to the current firmware write pointer.
uint64_t FileToHostBuffer::dma_get_write_ptr()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const uint64_t offset = m_buffer->emu_fw_ptr - m_buffer->paddr;
    return m_buffer->vaddr + offset;
}
/// Virtual address corresponding to the current read (pc) pointer.
uint64_t FileToHostBuffer::dma_get_read_ptr()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const uint64_t offset = m_buffer->pc_ptr - m_buffer->paddr;
    return m_buffer->vaddr + offset;
}
/// Offset of the firmware write pointer from the start of the buffer.
uint64_t FileToHostBuffer::dma_get_write_offset()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const uint64_t offset = m_buffer->emu_fw_ptr - m_buffer->paddr;
    return offset;
}
/// Offset of the read (pc) pointer from the start of the buffer.
uint64_t FileToHostBuffer::dma_get_read_offset()
{
    std::shared_lock<std::shared_mutex> guard(m_driver_mutex);
    const uint64_t offset = m_buffer->pc_ptr - m_buffer->paddr;
    return offset;
}
/// Set the read pointer from a virtual address by translating it into the
/// buffer's physical address space. Locking happens in set_read_ptr_paddr().
void FileToHostBuffer::dma_set_read_ptr_vaddr(uint64_t v_addr)
{
    const uint64_t offset = v_addr - m_buffer->vaddr;
    set_read_ptr_paddr(m_buffer->paddr + offset);
}
// Public wrapper: set the read pointer directly from a physical address.
// Locking and lap-parity handling live in set_read_ptr_paddr().
void FileToHostBuffer::dma_set_read_ptr_paddr(uint64_t p_addr)
{
set_read_ptr_paddr(p_addr);
}
/// Move the read (pc) pointer to a new physical address, maintaining the
/// read lap-parity bit, and wake the writer thread.
void FileToHostBuffer::set_read_ptr_paddr(uint64_t p_addr)
{
    // Exclusive lock: mutates pc_ptr and m_rd_odd.
    std::scoped_lock<std::shared_mutex> lock(m_driver_mutex);
    if (p_addr == m_buffer->pc_ptr){ return; }
    if ( p_addr < m_buffer->pc_ptr) {
        // A backwards move is either a wrap to the start of the buffer
        // (large jump) or an erroneous small rewind worth reporting.
        // Keep the difference in 64 bits: the previous `unsigned int`
        // truncated wrap-sized jumps and could trigger a spurious error log.
        uint64_t diff = (m_buffer->pc_ptr - p_addr);
        if (diff < 16*1024*1024){
            LOG_ERR("rd ptr backwards of %lu bytes. Old rd 0x%lx new 0x%lx ", diff, m_buffer->pc_ptr, p_addr);
        }
        // Backwards move means the reader wrapped: flip its lap parity.
        m_rd_odd = !m_rd_odd;
    }
    m_buffer->pc_ptr = p_addr;
    // The writer may be parked on a full buffer; consuming data frees space.
    m_writer_cond.notify_one();
}
/// Advance the read (pc) pointer by `bytes`, wrapping exactly at the end of
/// the buffer, and wake the writer thread.
void FileToHostBuffer::dma_advance_read_ptr(size_t bytes)
{
    // Nothing consumed, nothing to do.
    if (bytes == 0){return;}
    std::scoped_lock<std::shared_mutex> guard(m_driver_mutex);
    m_buffer->pc_ptr += bytes;
    // An exact hit on the end pointer means the reader completed a lap:
    // restart at the base address and flip the read parity bit.
    const bool wrapped = (m_buffer->pc_ptr == m_buffer->pend);
    if (wrapped) {
        m_buffer->pc_ptr = m_buffer->paddr;
        m_rd_odd = !m_rd_odd;
    }
    // Consuming data may have freed space for a parked writer.
    m_writer_cond.notify_one();
}
// private
// Writer-thread main loop: stream fixed-size blocks from the input file into
// the emulated DMA ring buffer until m_stop_flag is set or, in non-repeat
// mode, EOF is reached (see reset_file()).
void FileToHostBuffer::write_in_dma_buffer()
{
LOG_INFO("write_in_dma_buffer thread started");
uint64_t rd_ptr, wr_ptr;
bool even;
while (!m_stop_flag){
//Block on conditional variable if the DMA buffer is full
//to prevent a continuous lock of m_driver_mutex.
//This is realistic: an FLX card does not produce data nor interrupts
//when the DMA is full.
bool is_full = dma_is_full();
if (is_full){
std::unique_lock<std::mutex> lk(m_stop_writer_mutex);
using namespace std::chrono_literals;
m_writer_cond.wait_for(lk, 100ms, [&]{return !dma_is_full();});
}
{
// Snapshot pointers and lap parity under the shared lock.
std::shared_lock<std::shared_mutex> lock(m_driver_mutex);
wr_ptr = m_buffer->emu_fw_ptr;
rd_ptr = m_buffer->pc_ptr;
even = (m_rd_odd == m_wr_odd);
}
// Whole blocks we may write in one go (bounded by free space, buffer end
// and the configured rate limit).
size_t max_blocks = blocks_to_write(rd_ptr, wr_ptr, even);
// Translate the physical write pointer into a writable virtual address.
void *ptr = reinterpret_cast<void*>(m_buffer->vaddr + (wr_ptr - m_buffer->paddr));
if ((wr_ptr + max_blocks*m_block_size) > m_buffer->pend) {
throw std::range_error("Writing past end of buffer");
}
auto start = std::chrono::system_clock::now();
// NOTE(review): the file was opened O_NONBLOCK; on a FIFO with no data
// fread may return 0 due to EAGAIN rather than EOF -- confirm the
// feof()-based reset_file() below handles that case as intended.
size_t count = fread(ptr, m_block_size, max_blocks, m_fp);
if (count > max_blocks) {
throw std::range_error("fread wrote more blocks than allowed!");
}
auto end = std::chrono::system_clock::now();
if (count > 0)
{
// Exclusive lock: advancing the firmware pointer mutates shared state.
std::scoped_lock<std::shared_mutex> lock(m_driver_mutex);
writer_updates_fw_ptr(count);
LOG_TRACE("Written %lu blocks at 0x%lx, fw_ptr 0x%lx pc_ptr 0x%lx [0x%lx, 0x%lx]", count, m_buffer->vaddr + (wr_ptr - m_buffer->paddr), m_buffer->emu_fw_ptr, m_buffer->pc_ptr, m_buffer->paddr, m_buffer->pend);
LOG_TRACE("Block header 0x%x, second word 0x%x", *(uint32_t*)ptr, *((uint32_t*)ptr + 1) );
}
// Emulate the data-available interrupt for waiting consumers.
m_irq_cond.notify_one();
// Stop at EOF in non-repeat mode; otherwise rewind and keep streaming.
if (reset_file()) { break; }
// Throttle to approximate the requested block rate.
if(m_block_rate > 0 && count > 0){
auto elapsed = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start);
throttle_writer(elapsed, count);
}
}
}
/// Handle end-of-file on the input stream.
/// @return true when the writer thread should stop (EOF in non-repeat mode);
///         false otherwise (no EOF yet, or the file was rewound).
bool FileToHostBuffer::reset_file()
{
    // Not at EOF: keep streaming.
    if (!feof(m_fp)) {
        return false;
    }
    // EOF in non-repeat mode: tell the caller to stop.
    if (!m_repeat) {
        return true;
    }
    // Repeat mode: rewind to the beginning (fseek also clears the EOF flag).
    fseek(m_fp, 0, SEEK_SET);
    return false;
}
/// Compute how many whole blocks the writer may emit in one fread call.
/// @param rd_ptr  Snapshot of the read (pc) pointer.
/// @param wr_ptr  Snapshot of the firmware write pointer.
/// @param even    True when reader and writer are on the same buffer lap.
/// @return block count, capped by limit_block_rate().
size_t FileToHostBuffer::blocks_to_write(uint64_t rd_ptr, uint64_t wr_ptr, bool even)
{
    size_t writable_blocks = 0;
    if (wr_ptr < rd_ptr) {
        // Writer is behind the reader on the next lap: free space is the gap.
        writable_blocks = (rd_ptr - wr_ptr) / m_block_size;
    } else if (wr_ptr > rd_ptr || even) {
        // Writer ahead of reader, or pointers equal on the same lap (buffer
        // empty): write only up to the physical end of the buffer; the wrap
        // is handled by writer_updates_fw_ptr().
        writable_blocks = (m_size - (wr_ptr - m_buffer->paddr)) / m_block_size;
    }
    // else: pointers equal on different laps -> buffer full, write nothing.
    return limit_block_rate(writable_blocks);
}
/// Cap the per-iteration block count so throttle_writer() can approximate the
/// requested rate: slower rates use smaller batches for finer pacing.
/// @return max_blocks unchanged when unthrottled, otherwise min(cap, max_blocks).
size_t FileToHostBuffer::limit_block_rate(size_t max_blocks)
{
    // Rate 0 means "as fast as possible": no cap.
    if (m_block_rate == 0) {
        return max_blocks;
    }
    size_t cap;
    if (m_block_rate < 6000) {
        cap = 1;
    } else if (m_block_rate < 12000) {
        cap = 2;
    } else if (m_block_rate < 120000) {
        cap = 10;
    } else {
        cap = 32;
    }
    return (max_blocks > cap) ? cap : max_blocks;
}
/// Sleep long enough that the effective block rate approximates m_block_rate.
/// @param elapsed  Time spent in the fread that produced `count` blocks.
/// @param count    Blocks written this iteration; the caller guarantees
///                 count > 0 and m_block_rate > 0.
void FileToHostBuffer::throttle_writer(const std::chrono::nanoseconds & elapsed, size_t count)
{
    // Average time actually spent per block.
    long block_read_time = elapsed.count() / static_cast<long>(count);
    // Target per-block period (ns) for the requested rate.
    long target_time = static_cast<long>(1.0e9 / static_cast<double>(m_block_rate));
    long sleep_time = target_time - block_read_time;
    // Only sleep when we are ahead of schedule; the old code handed a
    // negative duration to sleep_for when the read was slower than the
    // target period.
    if (sleep_time > 0) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(sleep_time));
    }
}
/// Advance the emulated firmware write pointer after a successful write.
/// Caller must hold m_driver_mutex exclusively (see write_in_dma_buffer()).
/// @param written_blocks  Number of blocks just written into the buffer.
/// @throws std::range_error if the pointer would move past the buffer end.
void FileToHostBuffer::writer_updates_fw_ptr(size_t written_blocks)
{
    m_buffer->emu_fw_ptr += (m_block_size*written_blocks);
    if (m_buffer->emu_fw_ptr > m_buffer->pend){
        throw std::range_error("Moving write pointer beyond end of buffer");
    }
    // Exact hit on the end pointer: wrap to the base and flip writer parity.
    if (m_buffer->emu_fw_ptr == m_buffer->pend){
        m_buffer->emu_fw_ptr = m_buffer->paddr;
        m_wr_odd = !m_wr_odd;
    }
    // Trace after updating so the printed value really is the new pointer;
    // the old code logged before advancing and printed the stale value.
    LOG_TRACE("New wr ptr 0x%lx start 0x%lx end 0x%lx", m_buffer->emu_fw_ptr, m_buffer->paddr, m_buffer->pend);
}