Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use shared ringbuffers #378

Closed
Wants to merge 8 commits
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ cpal = { version = "0.13.3", optional = true }
#rawsample = { path = "../../rust/rawsample" }
#rawsample = { git = "https://github.com/HEnquist/rawsample", branch = "main" }
rawsample = "0.2.0"
circular-queue = "0.2.6"
parking_lot = { version = "0.12.1", features = ["hardware-lock-elision"] }
crossbeam-channel = "0.5"
rayon = "1.10.0"
audio_thread_priority = { version = "0.32.0", default-features = false }
ringbuf = "0.4.7"

[build-dependencies]
version_check = "0.9"
Expand Down
12 changes: 7 additions & 5 deletions src/basicfilters.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;

use circular_queue::CircularQueue;
use ringbuf::storage::Heap;
use ringbuf::traits::*;
use ringbuf::LocalRb;

use crate::audiodevice::AudioChunk;
use crate::biquad::{Biquad, BiquadCoefficients};
Expand All @@ -21,7 +23,7 @@ pub struct Gain {
pub struct Delay {
pub name: String,
samplerate: usize,
queue: CircularQueue<PrcFmt>,
queue: LocalRb<Heap<PrcFmt>>,
biquad: Option<Biquad>,
}

Expand Down Expand Up @@ -347,9 +349,9 @@ impl Delay {

// for super-small delays, store at least a single sample
let integerdelay = integerdelay.max(1);
let mut queue = CircularQueue::with_capacity(integerdelay);
let mut queue = LocalRb::new(integerdelay);
for _ in 0..integerdelay {
queue.push(0.0);
let _ = queue.try_push(0.0);
}

Self {
Expand Down Expand Up @@ -379,7 +381,7 @@ impl Filter for Delay {
fn process_waveform(&mut self, waveform: &mut [PrcFmt]) -> Res<()> {
for item in waveform.iter_mut() {
// this returns the item that was popped while pushing
*item = self.queue.push(*item).unwrap();
*item = self.queue.push_overwrite(*item).unwrap();
}
if let Some(bq) = &mut self.biquad {
bq.process_waveform(waveform)?;
Expand Down
91 changes: 41 additions & 50 deletions src/coreaudiodevice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::helpers::PIRateController;
use crossbeam_channel::{bounded, TryRecvError, TrySendError};
use dispatch::Semaphore;
use parking_lot::{RwLock, RwLockUpgradableReadGuard};
use ringbuf::{traits::*, HeapRb};
use rubato::VecResampler;
use std::collections::VecDeque;
use std::ffi::CStr;
Expand Down Expand Up @@ -363,7 +364,7 @@ fn open_coreaudio_capture(
}

enum PlaybackDeviceMessage {
Data(Vec<u8>),
Data(usize),
}

/// Start a playback thread listening for AudioMessages via a channel.
Expand Down Expand Up @@ -412,6 +413,9 @@ impl PlaybackDevice for CoreaudioPlaybackDevice {
let mut sample_queue: VecDeque<u8> =
VecDeque::with_capacity(16 * chunksize * blockalign);

let ringbuffer = HeapRb::<u8>::new(blockalign * ( 2 * chunksize + 2048 ));
let (mut device_producer, mut device_consumer) = ringbuffer.split();

let (mut audio_unit, device_id) = match open_coreaudio_playback(
&devname,
samplerate,
Expand Down Expand Up @@ -441,10 +445,10 @@ impl PlaybackDevice for CoreaudioPlaybackDevice {
while sample_queue.len() < (blockalign * num_frames) {
trace!("playback loop needs more samples, reading from channel");
match rx_dev.try_recv() {
Ok(PlaybackDeviceMessage::Data(chunk)) => {
Ok(PlaybackDeviceMessage::Data(bytes)) => {
trace!("got chunk");
for element in chunk.iter() {
sample_queue.push_back(*element);
for element in device_consumer.pop_iter().take(bytes) {
sample_queue.push_back(element);
}
if !running {
running = true;
Expand Down Expand Up @@ -494,6 +498,14 @@ impl PlaybackDevice for CoreaudioPlaybackDevice {
Ok(()) => {}
Err(_err) => {}
}

let mut buf = vec![
0u8;
channels
* chunksize
* SampleFormat::FLOAT32LE.bytes_per_sample()
];

debug!("Playback device ready and waiting");
barrier.wait();
debug!("Playback device starts now!");
Expand Down Expand Up @@ -563,12 +575,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice {
}
}
chunk.update_stats(&mut chunk_stats);
let mut buf = vec![
0u8;
channels
* chunk.frames
* SampleFormat::FLOAT32LE.bytes_per_sample()
];

conversion_result = chunk_to_buffer_rawbytes(
&chunk,
&mut buf,
Expand All @@ -588,7 +595,8 @@ impl PlaybackDevice for CoreaudioPlaybackDevice {
else {
xtrace!("playback status blocket, skip rms update");
}
match tx_dev.send(PlaybackDeviceMessage::Data(buf)) {
let bytes = device_producer.push_slice(&buf[0..conversion_result.0]);
match tx_dev.send(PlaybackDeviceMessage::Data(bytes)) {
Ok(_) => {}
Err(err) => {
error!("Playback device channel error: {err}");
Expand Down Expand Up @@ -686,19 +694,18 @@ impl CaptureDevice for CoreaudioCaptureDevice {
let callback_frames = 512;
// TODO check if always 512!
//trace!("Estimated playback callback period to {} frames", callback_frames);
let channel_capacity = 8*chunksize/callback_frames + 1;
let channel_capacity = 16*chunksize/callback_frames + 10;

debug!("Using a capture channel capacity of {channel_capacity} buffers.");
let (tx_dev, rx_dev) = bounded(channel_capacity);
let (tx_dev_free, rx_dev_free) = bounded(channel_capacity+2);
for _ in 0..(channel_capacity+2) {
let data = vec![0u8; 4*callback_frames*blockalign];
tx_dev_free.send(data).unwrap();
}

// Semaphore used to wake up the waiting capture thread from the callback.
let semaphore = Semaphore::new(0);
let device_sph = semaphore.clone();

let ringbuffer = HeapRb::<u8>::new(blockalign * ( 2 * chunksize + 2 * callback_frames ));
let (mut device_producer, mut device_consumer) = ringbuffer.split();

trace!("Build input stream");
let (mut audio_unit, device_id) = match open_coreaudio_capture(&devname, capture_samplerate, channels, &sample_format) {
Ok(audio_unit) => audio_unit,
Expand All @@ -715,33 +722,26 @@ impl CaptureDevice for CoreaudioCaptureDevice {

type Args = render_callback::Args<data::InterleavedBytes<f32>>;

// Vec used to store the saved buffer between callback iterations.
let mut saved_buffer: Vec<Vec<u8>> = Vec::new();

let callback_res = audio_unit.set_input_callback(move |args: Args| {
let Args {
num_frames, data, ..
} = args;
trace!("capture call, read {num_frames} frames");
let mut new_data = match saved_buffer.len() {
0 => rx_dev_free.recv().unwrap(),
_ => saved_buffer.pop().unwrap(),
};
let length_bytes = data.buffer.len();
if length_bytes > new_data.len() {
debug!("Buffer is too small, resizing from {} to {}", new_data.len(), length_bytes);
new_data.resize(length_bytes, 0);
}
for (databyte, bufferbyte) in data.buffer.iter().zip(new_data.iter_mut()) {
*bufferbyte = *databyte;

let pushed_bytes = device_producer.push_slice(data.buffer);
if pushed_bytes < data.buffer.len() {
debug!(
"Capture ring buffer is full, dropped {} out of {} bytes",
data.buffer.len() - pushed_bytes,
data.buffer.len()
);
}
match tx_dev.try_send((chunk_counter, length_bytes, new_data)) {
match tx_dev.try_send((chunk_counter, pushed_bytes)) {
Ok(()) => {
device_sph.signal();
},
Err(TrySendError::Full((nbr, length_bytes, buf))) => {
Err(TrySendError::Full((nbr, length_bytes))) => {
debug!("Dropping captured chunk {nbr} with len {length_bytes}");
saved_buffer.push(buf);
}
Err(_) => {
error!("Error sending, channel disconnected");
Expand Down Expand Up @@ -770,7 +770,6 @@ impl CaptureDevice for CoreaudioCaptureDevice {
warn!("Unable to register capture device alive listener, error: {err}");
}

let chunksize_samples = channels * chunksize;
let mut capture_frames = chunksize;
capture_frames = nbr_capture_frames(
&resampler,
Expand All @@ -795,7 +794,6 @@ impl CaptureDevice for CoreaudioCaptureDevice {
let mut silence_counter = countertimer::SilenceCounter::new(silence_threshold, silence_timeout, capture_samplerate, chunksize);
let mut state = ProcessingState::Running;
let blockalign = 4*channels;
let mut data_queue: VecDeque<u8> = VecDeque::with_capacity(4 * blockalign * chunksize_samples );
// TODO check if this ever needs to be resized
let mut data_buffer = vec![0u8; 4 * blockalign * capture_frames];
let mut expected_chunk_nbr = 0;
Expand Down Expand Up @@ -890,22 +888,17 @@ impl CaptureDevice for CoreaudioCaptureDevice {
);
let capture_bytes = blockalign * capture_frames;
let mut tries = 0;
while data_queue.len() < (blockalign * capture_frames) && tries < 50 {
while device_consumer.occupied_len() < (blockalign * capture_frames) && tries < 50 {
trace!("capture device needs more samples to make chunk, reading from channel");
let _ = semaphore.wait_timeout(Duration::from_millis(20));
match rx_dev.try_recv() {
Ok((chunk_nbr, length_bytes, data)) => {
Ok((chunk_nbr, length_bytes)) => {
trace!("got chunk, length {length_bytes} bytes");
expected_chunk_nbr += 1;
if chunk_nbr > expected_chunk_nbr {
warn!("Samples were dropped, missing {} buffers", chunk_nbr-expected_chunk_nbr);
expected_chunk_nbr = chunk_nbr;
}
for element in data.iter().take(length_bytes) {
data_queue.push_back(*element);
}
// Return the buffer to the queue
tx_dev_free.send(data).unwrap();
}
Err(TryRecvError::Empty) => {
trace!("No new data from inner capture thread, try {tries} of 50");
Expand All @@ -919,7 +912,7 @@ impl CaptureDevice for CoreaudioCaptureDevice {
}
tries += 1;
}
if data_queue.len() < (blockalign * capture_frames) {
if device_consumer.occupied_len() < (blockalign * capture_frames) {
if let Some(mut capture_status) = capture_status.try_write() {
capture_status.measured_samplerate = 0;
capture_status.signal_range = 0.0;
Expand All @@ -936,17 +929,15 @@ impl CaptureDevice for CoreaudioCaptureDevice {
}
continue;
}
for element in data_buffer.iter_mut().take(capture_bytes) {
*element = data_queue.pop_front().unwrap();
}
device_consumer.pop_slice(&mut data_buffer[0..capture_bytes]);
let mut chunk = buffer_to_chunk_rawbytes(
&data_buffer[0..capture_bytes],
channels,
&SampleFormat::FLOAT32LE,
capture_bytes,
&capture_status.read().used_channels,
);
averager.add_value(capture_frames + data_queue.len()/blockalign - prev_len/blockalign);
averager.add_value(capture_frames + device_consumer.occupied_len()/blockalign - prev_len/blockalign);
if let Some(capture_status) = capture_status.try_upgradable_read() {
if averager.larger_than_millis(capture_status.update_interval as u64)
{
Expand All @@ -971,7 +962,7 @@ impl CaptureDevice for CoreaudioCaptureDevice {
else {
xtrace!("capture status blocked, skip update");
}
watcher_averager.add_value(capture_frames + data_queue.len()/blockalign - prev_len/blockalign);
watcher_averager.add_value(capture_frames + device_consumer.occupied_len()/blockalign - prev_len/blockalign);
if watcher_averager.larger_than_millis(rate_measure_interval)
{
let samples_per_sec = watcher_averager.average();
Expand All @@ -992,7 +983,7 @@ impl CaptureDevice for CoreaudioCaptureDevice {
}
}
}
prev_len = data_queue.len();
prev_len = device_consumer.occupied_len();
chunk.update_stats(&mut chunk_stats);
if let Some(mut capture_status) = capture_status.try_write() {
capture_status.signal_rms.add_record_squared(chunk_stats.rms_linear());
Expand Down
13 changes: 7 additions & 6 deletions src/dither.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use circular_queue::CircularQueue;
use rand::{rngs::SmallRng, SeedableRng};
use rand_distr::{Distribution, Triangular, Uniform};
use ringbuf::storage::Heap;
use ringbuf::traits::*;
use ringbuf::LocalRb;

use crate::{config, filters::Filter, NewValue, PrcFmt, Res};

Expand All @@ -14,17 +16,16 @@ pub struct Dither<'a> {
shaper: Option<NoiseShaper<'a>>,
}

#[derive(Clone, Debug)]
pub struct NoiseShaper<'a> {
// optimization: lifetime allows taking coefficients
// from an array instead of allocating a `Vec`.
filter: &'a [PrcFmt],
buffer: CircularQueue<PrcFmt>,
buffer: LocalRb<Heap<PrcFmt>>,
}

impl<'a> NoiseShaper<'a> {
pub fn new(filter: &'a [PrcFmt]) -> Self {
let buffer = CircularQueue::with_capacity(filter.len());
let buffer = LocalRb::new(filter.len());
Self { filter, buffer }
}

Expand Down Expand Up @@ -438,15 +439,15 @@ impl<'a> NoiseShaper<'a> {

pub fn process(&mut self, scaled: PrcFmt, dither: PrcFmt) -> PrcFmt {
let mut filt_buf = 0.0;
for (item, coeff) in self.buffer.iter().zip(self.filter) {
for (item, coeff) in self.buffer.iter().zip(self.filter.iter().rev()) {
filt_buf += coeff * item;
}

let scaled_plus_err = scaled + filt_buf;
let result = scaled_plus_err + dither;
let result_r = result.round(); // away from zero

self.buffer.push(scaled_plus_err - result_r);
self.buffer.push_overwrite(scaled_plus_err - result_r);

result_r
}
Expand Down
Loading
Loading