diff --git a/CHANGELOG.md b/CHANGELOG.md index 76b474c..2a096d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,12 @@ +## v4.0.0 +New features: +- Add RACE processor. +- Add option for custom log filtering. +Changes: +- Add microsecond delay unit. +- Allow larger buffer target levels. +- Change mixer config rules to not allow duplicated channels. + ## v3.0.0 New features: - Optional multithreaded filter processing. diff --git a/README.md b/README.md index 948a599..5282ec8 100644 --- a/README.md +++ b/README.md @@ -495,6 +495,18 @@ Alternatively, the log level can be changed with the verbosity flag. By passing the verbosity flag once, `-v`, `debug` messages are enabled. If it's given twice, `-vv`, it also prints `trace` messages. +The option `custom_log_spec` can be used to define custom filters for the logs. +When provided, this option overrides what is given by `-v` and `--loglevel`. +Using this option, the log level can be set to different values for different modules. +Example, set the base log level to `info`, but increase it to `trace` for the +Wasapi backend (which is the `camillalib::wasapidevice` module): +``` +--custom_log_spec="info, camillalib::wasapidevice=trace +``` +Module names are shown in square brackets in the log messages. +See the [flexi-logger documentation](https://docs.rs/flexi_logger/latest/flexi_logger/struct.LogSpecification.html) +for more info on how to write the logger specification. + The log messages are normally written to the terminal via stderr, but they can instead be written to a file by giving the `--logfile` option. The argument should be the path to the logfile. @@ -1685,8 +1697,8 @@ Allowed ranges: - low_boost: 0 to 20 ### Delay -The delay filter provides a delay in milliseconds, millimetres or samples. -The `unit` can be `ms`, `mm` or `samples`, and if left out it defaults to `ms`. +The delay filter provides a delay in milliseconds, microseconds, millimetres or samples. +The `unit` can be `ms`, `us`, `mm` or `samples`, and if left out it defaults to `ms`. When giving the delay in millimetres, the speed of sound of is assumed to be 343 m/s (dry air at 20 degrees Celsius). If the `subsample` parameter is set to `true`, then it will use use an IIR filter to achieve subsample delay precision. @@ -2250,6 +2262,153 @@ pipeline: * `monitor_channels`: a list of channels used when estimating the loudness. Optional, defaults to all channels. * `process_channels`: a list of channels to be gated. Optional, defaults to all channels. +### RACE +The "RACE" processor implements the recursive part of the +[Recursive Ambiophonic Crosstalk Elimination (RACE)](http://www.filmaker.com/papers/RGRM-RACE_rev.pdf) algorithm. +The RACE processor processes a aingle pair of channels. +Multiple processors can be used to process additional channel pairs if needed. + +Parameters: +* `channels`: number of channels, must match the number of channels of the pipeline where the compressor is inserted. +* `channel_a`: channel number of first channel of the pair. +* `channel_b`: channel number of second channel of the pair. +* `attenuation`: attenuation in dB, must be larger than zero. Typical values are 2 - 3 dB. +* `delay`: delay value, must be larger than zero. Typical values are in the range 0.06 - 0.1 ms +* `delay_unit`: unit for delay, see the `Delay` filter. +* `subsample_delay`: enable subsample delay values, see the `Delay` filter. + +The RACE algorithm is normally used with filters, +to only process a limited range of the audio spectrum. +![RACE algoriths](race.png) + +The RACE processor implements the recursive function block +indicated by the dashed rectangle. +This processor is meant to be combined with normal CamillaDSP mixers +and filters to make up the complete solution. + +Example configuration implementing RACE with filters: +```yml +processors: + race: + type: RACE + parameters: + channels: 6 + channel_a: 2 + channel_b: 3 + attenuation: 3 + delay: 0.09 + +mixers: + 2to6: + channels: + in: 2 + out: 6 + mapping: + - dest: 0 + sources: + - channel: 0 + gain: 0 + inverted: false + - dest: 1 + sources: + - channel: 1 + gain: 0 + inverted: false + - dest: 2 + sources: + - channel: 0 + gain: 0 + inverted: false + - dest: 3 + sources: + - channel: 1 + gain: 0 + inverted: false + - dest: 4 + sources: + - channel: 0 + gain: 0 + inverted: false + - dest: 5 + sources: + - channel: 1 + gain: 0 + inverted: false + 6to2: + channels: + in: 6 + out: 2 + mapping: + - dest: 0 + sources: + - channel: 0 + gain: -3 + inverted: false + - channel: 2 + gain: -3 + inverted: false + - channel: 4 + gain: -3 + inverted: false + - dest: 1 + sources: + - channel: 1 + gain: -3 + inverted: false + - channel: 3 + gain: -3 + inverted: false + - channel: 5 + gain: -3 + inverted: false + +filters: + highpass_lower: + type: BiquadCombo + parameters: + type: LinkwitzRileyHighpass + freq: 250 + order: 4 + lowpass_lower: + type: BiquadCombo + parameters: + type: LinkwitzRileyLowpass + freq: 250 + order: 4 + highpass_upper: + type: BiquadCombo + parameters: + type: LinkwitzRileyHighpass + freq: 5000 + order: 4 + lowpass_upper: + type: BiquadCombo + parameters: + type: LinkwitzRileyLowpass + freq: 5000 + order: 4 + +pipeline: + - type: Mixer + name: 2to6 + - type: Filter + channels: [0, 1] + names: + - lowpass_lower + - type: Filter + channels: [2, 3] + names: + - highpass_lower + - lowpass_upper + - type: Filter + channels: [4, 5] + names: + - highpass_upper + - type: Processor + name: race + - type: Mixer + name: 6to2 +``` ## Pipeline The pipeline section defines the processing steps between input and output. diff --git a/race.graphml b/race.graphml new file mode 100644 index 0000000..410d633 --- /dev/null +++ b/race.graphml @@ -0,0 +1,584 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + L + + + + + + + + + + + + HP +5kHz + + + + + + + + + + + + LP +250Hz + + + + + + + + + + + + BP +250 - 5000 Hz + + + + + + + + + + + + + + + + + + + + + + + + + invert + + + + + + + + + + + + attenuate +2 - 3 dB + + + + + + + + + + + + delay +60 - 100 us + + + + + + + + + + + + + + + + + + + + + + + + + R + + + + + + + + + + + + HP +5kHz + + + + + + + + + + + + LP +250Hz + + + + + + + + + + + + BP +250 - 5000 Hz + + + + + + + + + + + + + + + + + + + + + + + + + invert + + + + + + + + + + + + attenuate +2 - 3 dB + + + + + + + + + + + + delay +60 - 100 us + + + + + + + + + + + + + + + + + + + + + + + + + L + + + + + + + + + + + + R + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/race.png b/race.png new file mode 100644 index 0000000..aee8d1d Binary files /dev/null and b/race.png differ diff --git a/src/alsadevice.rs b/src/alsadevice.rs index 5660fa6..b85784c 100644 --- a/src/alsadevice.rs +++ b/src/alsadevice.rs @@ -10,19 +10,19 @@ use alsa::pcm::{Access, Format, Frames, HwParams}; use alsa::poll::Descriptors; use alsa::{Direction, ValueOr, PCM}; use alsa_sys; +use audio_thread_priority::{ + demote_current_thread_from_real_time, promote_current_thread_to_real_time, +}; +use crossbeam_channel; use nix::errno::Errno; use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use rubato::VecResampler; use std::ffi::CString; use std::fmt::Debug; -use std::sync::{mpsc, Arc, Barrier}; +use std::sync::{Arc, Barrier}; use std::thread; use std::time::Instant; -use audio_thread_priority::{ - demote_current_thread_from_real_time, promote_current_thread_to_real_time, -}; - use crate::alsadevice_buffermanager::{ CaptureBufferManager, DeviceBufferManager, PlaybackBufferManager, }; @@ -73,13 +73,13 @@ pub struct AlsaCaptureDevice { } struct CaptureChannels { - audio: mpsc::SyncSender, + audio: crossbeam_channel::Sender, status: crossbeam_channel::Sender, - command: mpsc::Receiver, + command: crossbeam_channel::Receiver, } struct PlaybackChannels { - audio: mpsc::Receiver, + audio: crossbeam_channel::Receiver, status: crossbeam_channel::Sender, } @@ -520,14 +520,20 @@ fn playback_loop_bytes( match msg { Ok(AudioMessage::Audio(chunk)) => { // measure delay only on running non-stalled device - let delay_at_chunk_recvd = if !device_stalled + let avail_at_chunk_recvd = if !device_stalled && pcmdevice.state_raw() == alsa_sys::SND_PCM_STATE_RUNNING as i32 { - pcmdevice.status().ok().map(|status| status.get_delay()) + pcmdevice.avail().ok() } else { None }; - //trace!("PB: Delay at chunk rcvd: {:?}", delay_at_chunk_recvd); + let waiting_chunks_in_channel = channels.audio.len(); + trace!( + "Avail frames in buffer: {:?}, waiting chunks in channel: {}", + avail_at_chunk_recvd, + waiting_chunks_in_channel + ); + //trace!("PB: Avail at chunk rcvd: {:?}", avail_at_chunk_recvd); conversion_result = chunk_to_buffer_rawbytes(&chunk, &mut buffer, ¶ms.sample_format); @@ -602,10 +608,11 @@ fn playback_loop_bytes( } else { xtrace!("playback status blocked, skip update"); } - if let Some(delay) = delay_at_chunk_recvd { - if delay != 0 { - buffer_avg.add_value(delay as f64); - } + if let Some(avail) = avail_at_chunk_recvd { + let delay = buf_manager.current_delay(avail); + buffer_avg.add_value( + delay as f64 + (params.chunksize * waiting_chunks_in_channel) as f64, + ); } if timer.larger_than_millis((1000.0 * params.adjust_period) as u64) { if let Some(avg_delay) = buffer_avg.average() { @@ -694,7 +701,7 @@ fn playback_loop_bytes( } } -fn drain_check_eos(audio: &mpsc::Receiver) -> Option { +fn drain_check_eos(audio: &crossbeam_channel::Receiver) -> Option { let mut eos: Option = None; while let Some(msg) = audio.try_iter().next() { if let AudioMessage::EndOfStream = msg { @@ -874,8 +881,8 @@ fn capture_loop_bytes( } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } @@ -1102,7 +1109,7 @@ fn nbr_capture_bytes_and_frames( impl PlaybackDevice for AlsaPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -1176,10 +1183,10 @@ impl PlaybackDevice for AlsaPlaybackDevice { impl CaptureDevice for AlsaCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, processing_params: Arc, ) -> Res>> { diff --git a/src/alsadevice_buffermanager.rs b/src/alsadevice_buffermanager.rs index 4534b18..fd61aef 100644 --- a/src/alsadevice_buffermanager.rs +++ b/src/alsadevice_buffermanager.rs @@ -125,6 +125,8 @@ pub trait DeviceBufferManager { // +1 to make sure the device really stalls data.bufsize - data.avail_min + 1 } + + fn current_delay(&self, avail: Frames) -> Frames; } #[derive(Debug)] @@ -182,6 +184,10 @@ impl DeviceBufferManager for CaptureBufferManager { self.data.threshold = threshold; Ok(()) } + + fn current_delay(&self, avail: Frames) -> Frames { + avail + } } #[derive(Debug)] @@ -233,4 +239,8 @@ impl DeviceBufferManager for PlaybackBufferManager { self.data.threshold = threshold; Ok(()) } + + fn current_delay(&self, avail: Frames) -> Frames { + self.data.bufsize - avail + } } diff --git a/src/alsadevice_utils.rs b/src/alsadevice_utils.rs index 4f8fa39..0178f12 100644 --- a/src/alsadevice_utils.rs +++ b/src/alsadevice_utils.rs @@ -123,14 +123,15 @@ pub fn list_pcm_devices(input: bool) -> Vec<(String, String)> { }; for hint in hints { if hint.name.is_some() - && hint.desc.is_some() && (hint.direction.is_none() || hint .direction .map(|dir| dir == direction) .unwrap_or_default()) { - names.push((hint.name.unwrap(), hint.desc.unwrap())) + let name = hint.name.unwrap(); + let description = hint.desc.unwrap_or(name.clone()); + names.push((name, description)) } } names @@ -292,7 +293,7 @@ pub struct ElemData<'a> { numid: u32, } -impl<'a> ElemData<'a> { +impl ElemData<'_> { pub fn read_as_int(&self) -> Option { self.element .read() diff --git a/src/audiodevice.rs b/src/audiodevice.rs index da104ca..0e4e075 100644 --- a/src/audiodevice.rs +++ b/src/audiodevice.rs @@ -28,7 +28,6 @@ use rubato::{ }; use std::error; use std::fmt; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Instant; @@ -222,7 +221,7 @@ pub fn rms_and_peak(data: &[PrcFmt]) -> (PrcFmt, PrcFmt) { pub trait PlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -233,10 +232,10 @@ pub trait PlaybackDevice { pub trait CaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, processing_params: Arc, ) -> Res>>; diff --git a/src/basicfilters.rs b/src/basicfilters.rs index eb8d318..47d0cc9 100644 --- a/src/basicfilters.rs +++ b/src/basicfilters.rs @@ -291,6 +291,10 @@ impl Gain { let linear = conf.scale() == config::GainScale::Linear; Gain::new(name, gain, inverted, mute, linear) } + + pub fn process_single(&self, value: PrcFmt) -> PrcFmt { + value * self.gain + } } impl Filter for Gain { @@ -364,6 +368,7 @@ impl Delay { pub fn from_config(name: &str, samplerate: usize, conf: config::DelayParameters) -> Self { let delay_samples = match conf.unit() { + config::TimeUnit::Microseconds => conf.delay / 1000000.0 * (samplerate as PrcFmt), config::TimeUnit::Milliseconds => conf.delay / 1000.0 * (samplerate as PrcFmt), config::TimeUnit::Millimetres => conf.delay / 1000.0 * (samplerate as PrcFmt) / 343.0, config::TimeUnit::Samples => conf.delay, @@ -371,6 +376,14 @@ impl Delay { Self::new(name, samplerate, delay_samples, conf.subsample()) } + + pub fn process_single(&mut self, input: PrcFmt) -> PrcFmt { + let mut value = self.queue.push_overwrite(input).unwrap(); + if let Some(bq) = &mut self.biquad { + value = bq.process_single(value); + } + value + } } impl Filter for Delay { diff --git a/src/bin.rs b/src/bin.rs index 976d9dc..5a248d8 100644 --- a/src/bin.rs +++ b/src/bin.rs @@ -29,7 +29,6 @@ use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard}; use std::env; use std::path::PathBuf; use std::sync::atomic::AtomicBool; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Duration; @@ -76,10 +75,11 @@ fn custom_colored_logger_format( let level = record.level(); write!( w, - "{} {:<5} [{}:{}] {}", + "{} {:<5} [{}] <{}:{}> {}", now.now().format("%Y-%m-%d %H:%M:%S%.6f"), flexi_logger::style(level).paint(level.to_string()), - record.file().unwrap_or(""), + record.module_path().unwrap_or("*unknown module*"), + record.file().unwrap_or("*unknown file*"), record.line().unwrap_or(0), &record.args() ) @@ -93,10 +93,11 @@ pub fn custom_logger_format( ) -> Result<(), std::io::Error> { write!( w, - "{} {:<5} [{}:{}] {}", + "{} {:<5} [{}] <{}:{}> {}", now.now().format("%Y-%m-%d %H:%M:%S%.6f"), record.level(), - record.file().unwrap_or(""), + record.module_path().unwrap_or("*unknown module*"), + record.file().unwrap_or("*unknown file*"), record.line().unwrap_or(0), &record.args() ) @@ -124,15 +125,15 @@ fn run( return Ok(ExitState::Exit); } }; - let (tx_pb, rx_pb) = mpsc::sync_channel(active_config.devices.queuelimit()); - let (tx_cap, rx_cap) = mpsc::sync_channel(active_config.devices.queuelimit()); + let (tx_pb, rx_pb) = crossbeam_channel::bounded(active_config.devices.queuelimit()); + let (tx_cap, rx_cap) = crossbeam_channel::bounded(active_config.devices.queuelimit()); let (tx_status, rx_status) = crossbeam_channel::unbounded(); let tx_status_pb = tx_status.clone(); let tx_status_cap = tx_status; - let (tx_command_cap, rx_command_cap) = mpsc::channel(); - let (tx_pipeconf, rx_pipeconf) = mpsc::channel(); + let (tx_command_cap, rx_command_cap) = crossbeam_channel::unbounded(); + let (tx_pipeconf, rx_pipeconf) = crossbeam_channel::unbounded(); let barrier = Arc::new(Barrier::new(4)); let barrier_pb = barrier.clone(); @@ -541,6 +542,15 @@ fn main_process() -> i32 { .value_parser(clap::value_parser!(u32)) .action(ArgAction::Set), ) + .arg( + Arg::new("custom_log_spec") + .help("Custom logger specification") + .long("custom_log_spec") + .value_name("LOG_SPEC") + .display_order(104) + .value_parser(clap::value_parser!(String)) + .action(ArgAction::Set), + ) .arg( Arg::new("gain") .help("Initial gain in dB for main volume control") @@ -739,6 +749,9 @@ fn main_process() -> i32 { if let Some(level) = matches.get_one::("loglevel") { loglevel = level; } + if let Some(spec) = matches.get_one::("custom_log_spec") { + loglevel = spec; + } let logger = if let Some(logfile) = matches.get_one::("logfile") { let mut path = PathBuf::from(logfile); @@ -748,7 +761,7 @@ fn main_process() -> i32 { path = fullpath; } let mut logger = flexi_logger::Logger::try_with_str(loglevel) - .unwrap() + .expect("The provided logger specification is invalid") .format(custom_logger_format) .log_to_file(flexi_logger::FileSpec::try_from(path).unwrap()) .write_mode(flexi_logger::WriteMode::Async); @@ -770,7 +783,7 @@ fn main_process() -> i32 { logger.start().unwrap() } else { flexi_logger::Logger::try_with_str(loglevel) - .unwrap() + .expect("The provided logger specification is invalid") .format(custom_colored_logger_format) .set_palette("196;208;-;27;8".to_string()) .log_to_stderr() @@ -1060,7 +1073,7 @@ fn main_process() -> i32 { #[cfg(feature = "websocket")] { - let (tx_state, rx_state) = mpsc::sync_channel(1); + let (tx_state, rx_state) = crossbeam_channel::bounded(1); let processing_params_clone = processing_params.clone(); let active_config_path_clone = active_config_path.clone(); diff --git a/src/biquad.rs b/src/biquad.rs index 4a0aff2..186c50a 100644 --- a/src/biquad.rs +++ b/src/biquad.rs @@ -423,7 +423,7 @@ impl Biquad { /// Process a single sample, SSE2 version #[cfg(all(target_arch = "x86_64", not(feature = "32bit")))] - fn process_single(&mut self, input: PrcFmt) -> PrcFmt { + pub fn process_single(&mut self, input: PrcFmt) -> PrcFmt { unsafe { // load input let input_input = _mm_load1_pd(&input); @@ -461,7 +461,7 @@ impl Biquad { /// Process a single sample, generic version #[cfg(not(all(target_arch = "x86_64", not(feature = "32bit"))))] - fn process_single(&mut self, input: PrcFmt) -> PrcFmt { + pub fn process_single(&mut self, input: PrcFmt) -> PrcFmt { let out = self.s1 + self.coeffs.b0 * input; self.s1 = self.s2 + self.coeffs.b1 * input - self.coeffs.a1 * out; self.s2 = self.coeffs.b2 * input - self.coeffs.a2 * out; diff --git a/src/config.rs b/src/config.rs index ad33403..46c97b5 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,6 +2,7 @@ use crate::compressor; use crate::filters; use crate::mixer; use crate::noisegate; +use crate::race; use crate::wavtools::{find_data_in_wav_stream, WavParams}; use parking_lot::RwLock; use serde::{de, Deserialize, Serialize}; @@ -1123,6 +1124,8 @@ impl DelayParameters { #[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)] #[serde(deny_unknown_fields)] pub enum TimeUnit { + #[serde(rename = "us")] + Microseconds, #[serde(rename = "ms")] Milliseconds, #[serde(rename = "mm")] @@ -1259,6 +1262,11 @@ pub enum Processor { description: Option, parameters: NoiseGateParameters, }, + RACE { + #[serde(default)] + description: Option, + parameters: RACEParameters, + }, } #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -1323,6 +1331,30 @@ impl NoiseGateParameters { } } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(deny_unknown_fields)] +pub struct RACEParameters { + pub channels: usize, + pub channel_a: usize, + pub channel_b: usize, + pub delay: PrcFmt, + #[serde(default)] + pub subsample_delay: Option, + #[serde(default)] + pub delay_unit: Option, + pub attenuation: PrcFmt, +} + +impl RACEParameters { + pub fn subsample_delay(&self) -> bool { + self.subsample_delay.unwrap_or_default() + } + + pub fn delay_unit(&self) -> TimeUnit { + self.delay_unit.unwrap_or(TimeUnit::Milliseconds) + } +} + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] pub struct LimiterParameters { @@ -1825,12 +1857,12 @@ pub fn validate_config(conf: &mut Configuration, filename: Option<&str>) -> Res< } #[cfg(target_os = "linux")] let target_level_limit = if matches!(conf.devices.playback, PlaybackDevice::Alsa { .. }) { - 4 * conf.devices.chunksize + (4 + conf.devices.queuelimit()) * conf.devices.chunksize } else { - 2 * conf.devices.chunksize + (2 + conf.devices.queuelimit()) * conf.devices.chunksize }; #[cfg(not(target_os = "linux"))] - let target_level_limit = 2 * conf.devices.chunksize; + let target_level_limit = (2 + conf.devices.queuelimit()) * conf.devices.chunksize; if conf.devices.target_level() > target_level_limit { let msg = format!("target_level cannot be larger than {}", target_level_limit); @@ -2092,6 +2124,26 @@ pub fn validate_config(conf: &mut Configuration, filename: Option<&str>) -> Res< } } } + Processor::RACE { parameters, .. } => { + let channels = parameters.channels; + if channels != num_channels { + let msg = format!( + "RACE processor '{}' has wrong number of channels. Expected {}, found {}.", + step.name, num_channels, channels + ); + return Err(ConfigError::new(&msg).into()); + } + match race::validate_race(parameters) { + Ok(_) => {} + Err(err) => { + let msg = format!( + "Invalid RACE processor '{}'. Reason: {}", + step.name, err + ); + return Err(ConfigError::new(&msg).into()); + } + } + } } } } else { diff --git a/src/coreaudiodevice.rs b/src/coreaudiodevice.rs index 52eb28c..38da659 100644 --- a/src/coreaudiodevice.rs +++ b/src/coreaudiodevice.rs @@ -60,7 +60,7 @@ fn take_ownership(device_id: AudioDeviceID) -> Res { debug!("We have exclusive access."); } else { warn!( - "Could not get exclusive access. CamillaDSP pid: {camilla_pid}, device owner pid: {device_pid}" + "Could not get exclusive access. CamillaDSP pid: {camilla_pid}, device owner pid: {device_pid}." ); } } @@ -68,7 +68,10 @@ fn take_ownership(device_id: AudioDeviceID) -> Res { } fn release_ownership(device_id: AudioDeviceID) -> Res<()> { - trace!("Releasing any device ownership for device id {}", device_id); + trace!( + "Releasing any device ownership for device id {}.", + device_id + ); let device_owner_pid = match get_hogging_pid(device_id) { Ok(pid) => pid, Err(CoreAudioError::AudioCodec(AudioCodecError::UnknownProperty)) => return Ok(()), @@ -84,7 +87,7 @@ fn release_ownership(device_id: AudioDeviceID) -> Res<()> { debug!("Exclusive access released."); } else { warn!( - "Could not release exclusive access. CamillaDSP pid: {camilla_pid}, device owner pid: {new_device_pid}" + "Could not release exclusive access. CamillaDSP pid: {camilla_pid}, device owner pid: {new_device_pid}." ); } } @@ -197,7 +200,10 @@ fn open_coreaudio_playback( exclusive: bool, ) -> Res<(AudioUnit, AudioDeviceID)> { let device_id = if let Some(name) = devname { - trace!("Available playback devices: {:?}", list_device_names(false)); + trace!( + "Available playback devices: {:?}.", + list_device_names(false) + ); match get_device_id_from_name_and_scope(name, false) { Some(dev) => dev, None => { @@ -214,12 +220,12 @@ fn open_coreaudio_playback( } } }; - trace!("Playback device id: {}", device_id); + trace!("Playback device id: {}.", device_id); let mut audio_unit = audio_unit_from_device_id(device_id, false) .map_err(|e| ConfigError::new(&format!("{e}")))?; - trace!("Created playback audio unit"); + trace!("Created playback audio unit."); if exclusive { take_ownership(device_id)?; } else { @@ -246,20 +252,20 @@ fn open_coreaudio_playback( }; trace!( - "Available formats: {:?}", + "Available formats: {:?}.", get_supported_physical_stream_formats(device_id) ); if let Some(phys_asbd) = find_matching_physical_format(device_id, physical_stream_format) { - debug!("Set phys playback stream format"); + debug!("Set phys playback stream format."); set_device_physical_stream_format(device_id, phys_asbd).map_err(|_| { - ConfigError::new("Failed to find matching physical playback format") + ConfigError::new("Failed to find matching physical playback format.") })?; } else { - let msg = "Failed to find matching physical playback format"; + let msg = "Failed to find matching physical playback format."; return Err(ConfigError::new(msg).into()); } } else { - trace!("Set playback device sample rate"); + trace!("Set playback device sample rate."); set_device_sample_rate(device_id, samplerate as f64) .map_err(|e| ConfigError::new(&format!("{e}")))?; } @@ -277,7 +283,7 @@ fn open_coreaudio_playback( .set_property(id, Scope::Input, Element::Output, Some(&asbd)) .map_err(|e| ConfigError::new(&format!("{e}")))?; - debug!("Opened CoreAudio playback device {devname:?}"); + debug!("Opened CoreAudio playback device {devname:?}."); Ok((audio_unit, device_id)) } @@ -288,11 +294,11 @@ fn open_coreaudio_capture( sample_format: &Option, ) -> Res<(AudioUnit, AudioDeviceID)> { let device_id = if let Some(name) = devname { - debug!("Available capture devices: {:?}", list_device_names(true)); + debug!("Available capture devices: {:?}.", list_device_names(true)); match get_device_id_from_name_and_scope(name, true) { Some(dev) => dev, None => { - let msg = format!("Could not find capture device '{name}'"); + let msg = format!("Could not find capture device '{name}'."); return Err(ConfigError::new(&msg).into()); } } @@ -300,7 +306,7 @@ fn open_coreaudio_capture( match get_default_device_id(true) { Some(dev) => dev, None => { - let msg = "Could not get default capture device".to_string(); + let msg = "Could not get default capture device.".to_string(); return Err(ConfigError::new(&msg).into()); } } @@ -329,15 +335,16 @@ fn open_coreaudio_capture( }; trace!( - "Available formats: {:?}", + "Available formats: {:?}.", get_supported_physical_stream_formats(device_id) ); if let Some(phys_asbd) = find_matching_physical_format(device_id, physical_stream_format) { - debug!("Set phys capture stream format"); - set_device_physical_stream_format(device_id, phys_asbd) - .map_err(|_| ConfigError::new("Failed to find matching physical capture format"))?; + debug!("Set phys capture stream format."); + set_device_physical_stream_format(device_id, phys_asbd).map_err(|_| { + ConfigError::new("Failed to find matching physical capture format.") + })?; } else { - let msg = "Failed to find matching physical capture format"; + let msg = "Failed to find matching physical capture format."; return Err(ConfigError::new(msg).into()); } } else { @@ -345,7 +352,7 @@ fn open_coreaudio_capture( .map_err(|e| ConfigError::new(&format!("{e}")))?; } - debug!("Set capture stream format"); + debug!("Set capture stream format."); let stream_format = StreamFormat { sample_rate: samplerate as f64, sample_format: coreaudio::audio_unit::SampleFormat::F32, @@ -359,7 +366,7 @@ fn open_coreaudio_capture( .set_property(id, Scope::Output, Element::Input, Some(&asbd)) .map_err(|e| ConfigError::new(&format!("{e}")))?; - debug!("Opened CoreAudio capture device {devname:?}"); + debug!("Opened CoreAudio capture device {devname:?}."); Ok((audio_unit, device_id)) } @@ -371,7 +378,7 @@ enum PlaybackDeviceMessage { impl PlaybackDevice for CoreaudioPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -408,10 +415,10 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { let mut rate_controller = PIRateController::new_with_default_gains(samplerate, adjust_period as f64, target_level); let mut rate_adjust_value = 1.0; - trace!("Build output stream"); + trace!("Build output stream."); let mut conversion_result; let mut sample_queue: VecDeque = - VecDeque::with_capacity(16 * chunksize * blockalign); + VecDeque::with_capacity((16 * chunksize + target_level) * blockalign); let ringbuffer = HeapRb::::new(blockalign * ( 2 * chunksize + 2048 )); let (mut device_producer, mut device_consumer) = ringbuffer.split(); @@ -435,24 +442,37 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { type Args = render_callback::Args>; - let mut running = true; + let mut running = false; + let mut starting = true; let callback_res = audio_unit.set_render_callback(move |args: Args| { let Args { num_frames, data, .. } = args; - trace!("playback cb called with {num_frames} frames"); + trace!("Playback cb called with {num_frames} frames."); while sample_queue.len() < (blockalign * num_frames) { trace!("playback loop needs more samples, reading from channel"); match rx_dev.try_recv() { Ok(PlaybackDeviceMessage::Data(bytes)) => { - trace!("got chunk"); + trace!("Received a chunk."); for element in device_consumer.pop_iter().take(bytes) { sample_queue.push_back(element); } if !running { running = true; - info!("Restarting playback after buffer underrun"); + if starting { + starting = false; + } + else { + warn!("Restarting playback after buffer underrun."); + } + debug!("Inserting {target_level} silent frames to reach target delay."); + for _ in 0..(blockalign * target_level) { + sample_queue.push_back(0); + } + } + for element in chunk.iter() { + sample_queue.push_back(*element); } } Err(_) => { @@ -461,7 +481,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { } if running { running = false; - warn!("Playback interrupted, no data available"); + warn!("Playback interrupted, no data available."); } } } @@ -491,7 +511,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { let mut alive_listener = AliveListener::new(device_id); if let Err(err) = alive_listener.register() { - warn!("Unable to register playback device alive listener, error: {err}"); + warn!("Unable to register playback device alive listener, error: {err}."); } match status_channel.send(StatusMessage::PlaybackReady) { @@ -506,7 +526,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { * SampleFormat::FLOAT32LE.bytes_per_sample() ]; - debug!("Playback device ready and waiting"); + debug!("Playback device ready and waiting."); barrier.wait(); debug!("Playback device starts now!"); match audio_unit.start() { @@ -527,7 +547,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { } Err(err) => { warn!( - "Playback thread could not get real time priority, error: {}", + "Playback thread could not get real time priority, error: {}.", err ); None @@ -545,7 +565,8 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { } match channel.recv() { Ok(AudioMessage::Audio(chunk)) => { - buffer_avg.add_value(buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default()); + let estimated_buffer_fill = buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default(); + buffer_avg.add_value(estimated_buffer_fill + (channel.len() * chunksize) as f64); if adjust && timer.larger_than_millis((1000.0 * adjust_period) as u64) { if let Some(av_delay) = buffer_avg.average() { let speed = rate_controller.next(av_delay); @@ -555,7 +576,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { buffer_avg.restart(); if changed { debug!( - "Current buffer level {:.1}, set capture rate to {:.4}%", + "Current buffer level {:.1}, set capture rate to {:.4}%.", av_delay, 100.0 * speed ); @@ -566,7 +587,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { } else { debug!( - "Current buffer level {:.1}, leaving capture rate at {:.4}%", + "Current buffer level {:.1}, leaving capture rate at {:.4}%.", av_delay, 100.0 * rate_adjust_value ); @@ -593,13 +614,13 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { .add_record(chunk_stats.peak_linear()); } else { - xtrace!("playback status blocket, skip rms update"); + xtrace!("Playback status blocked, skipping rms update."); } let bytes = device_producer.push_slice(&buf[0..conversion_result.0]); match tx_dev.send(PlaybackDeviceMessage::Data(bytes)) { Ok(_) => {} Err(err) => { - error!("Playback device channel error: {err}"); + error!("Playback device channel error: {err}."); status_channel .send(StatusMessage::PlaybackError(err.to_string())) .unwrap_or(()); @@ -608,7 +629,7 @@ impl PlaybackDevice for CoreaudioPlaybackDevice { } } Ok(AudioMessage::Pause) => { - trace!("Pause message received"); + trace!("Pause message received."); } Ok(AudioMessage::EndOfStream) => { status_channel @@ -647,7 +668,7 @@ fn nbr_capture_frames( ) -> usize { if let Some(resampl) = &resampler { #[cfg(feature = "debug")] - trace!("Resampler needs {} frames", resampl.input_frames_next()); + trace!("Resampler needs {} frames.", resampl.input_frames_next()); resampl.input_frames_next() } else { capture_frames @@ -658,10 +679,10 @@ fn nbr_capture_frames( impl CaptureDevice for CoreaudioCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -706,7 +727,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { let ringbuffer = HeapRb::::new(blockalign * ( 2 * chunksize + 2 * callback_frames )); let (mut device_producer, mut device_consumer) = ringbuffer.split(); - trace!("Build input stream"); + trace!("Build input stream."); let (mut audio_unit, device_id) = match open_coreaudio_capture(&devname, capture_samplerate, channels, &sample_format) { Ok(audio_unit) => audio_unit, Err(err) => { @@ -726,12 +747,12 @@ impl CaptureDevice for CoreaudioCaptureDevice { let Args { num_frames, data, .. } = args; - trace!("capture call, read {num_frames} frames"); + trace!("Capture call, read {num_frames} frames."); let pushed_bytes = device_producer.push_slice(data.buffer); if pushed_bytes < data.buffer.len() { debug!( - "Capture ring buffer is full, dropped {} out of {} bytes", + "Capture ring buffer is full, dropped {} out of {} bytes.", data.buffer.len() - pushed_bytes, data.buffer.len() ); @@ -741,7 +762,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { device_sph.signal(); }, Err(TrySendError::Full((nbr, length_bytes))) => { - debug!("Dropping captured chunk {nbr} with len {length_bytes}"); + debug!("Dropping captured chunk {nbr} with len {length_bytes}."); } Err(_) => { error!("Error sending, channel disconnected"); @@ -763,11 +784,11 @@ impl CaptureDevice for CoreaudioCaptureDevice { let (rate_tx, rate_rx) = mpsc::channel(); let mut rate_listener = RateListener::new(device_id, Some(rate_tx)); if let Err(err) = rate_listener.register() { - warn!("Unable to register capture rate listener, error: {err}"); + warn!("Unable to register capture rate listener, error: {err}."); } let mut alive_listener = AliveListener::new(device_id); if let Err(err) = alive_listener.register() { - warn!("Unable to register capture device alive listener, error: {err}"); + warn!("Unable to register capture device alive listener, error: {err}."); } let mut capture_frames = chunksize; @@ -779,7 +800,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { let pitch_supported = configure_pitch_control(device_id); if pitch_supported { if samplerate == capture_samplerate && resampler.is_some() { - warn!("Needless 1:1 sample rate conversion active. Not needed since capture device supports rate adjust"); + warn!("Needless 1:1 sample rate conversion active. Not needed since capture device supports rate adjust."); } else if async_src && resampler.is_some() { warn!("Async resampler not needed since capture device supports rate adjust. Consider switching to Sync type to save CPU time."); } @@ -799,7 +820,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { let mut expected_chunk_nbr = 0; let mut prev_len = 0; let mut channel_mask = vec![true; channels]; - debug!("Capture device ready and waiting"); + debug!("Capture device ready and waiting."); match status_channel.send(StatusMessage::CaptureReady) { Ok(()) => {} Err(_err) => {} @@ -821,7 +842,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { } Err(err) => { warn!( - "Capture thread could not get real time priority, error: {}", + "Capture thread could not get real time priority, error: {}.", err ); None @@ -830,7 +851,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { 'deviceloop: loop { match command_channel.try_recv() { Ok(CommandMessage::Exit) => { - debug!("Exit message received, sending EndOfStream"); + debug!("Exit message received, sending EndOfStream."); let msg = AudioMessage::EndOfStream; channel.send(msg).unwrap_or(()); status_channel.send(StatusMessage::CaptureDone).unwrap_or(()); @@ -838,15 +859,15 @@ impl CaptureDevice for CoreaudioCaptureDevice { } Ok(CommandMessage::SetSpeed { speed }) => { rate_adjust = speed; - debug!("Requested to adjust capture speed to {speed}"); + debug!("Requested to adjust capture speed to {speed}."); if pitch_supported { set_pitch(device_id, speed as f32); } else if let Some(resampl) = &mut resampler { - debug!("Adjusting resampler rate to {speed}"); + debug!("Adjusting resampler rate to {speed}."); if async_src { if resampl.set_resample_ratio_relative(speed, true).is_err() { - debug!("Failed to set resampling speed to {speed}"); + debug!("Failed to set resampling speed to {speed}."); } } else { @@ -854,15 +875,15 @@ impl CaptureDevice for CoreaudioCaptureDevice { } } }, - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } } match rate_rx.try_recv() { Ok(rate) => { - debug!("Capture rate change event, new rate: {rate}"); + debug!("Capture rate change event, new rate: {rate}."); if rate as usize != capture_samplerate { channel.send(AudioMessage::EndOfStream).unwrap_or(()); status_channel.send(StatusMessage::CaptureFormatChange(rate as usize)).unwrap_or(()); @@ -889,24 +910,24 @@ impl CaptureDevice for CoreaudioCaptureDevice { let capture_bytes = blockalign * capture_frames; let mut tries = 0; while device_consumer.occupied_len() < (blockalign * capture_frames) && tries < 50 { - trace!("capture device needs more samples to make chunk, reading from channel"); + trace!("Capture device needs more samples to make chunk, reading from channel."); let _ = semaphore.wait_timeout(Duration::from_millis(20)); match rx_dev.try_recv() { Ok((chunk_nbr, length_bytes)) => { - trace!("got chunk, length {length_bytes} bytes"); + trace!("Received chunk, length {length_bytes} bytes."); expected_chunk_nbr += 1; if chunk_nbr > expected_chunk_nbr { - warn!("Samples were dropped, missing {} buffers", chunk_nbr-expected_chunk_nbr); + warn!("Samples were dropped, missing {} buffers.", chunk_nbr-expected_chunk_nbr); expected_chunk_nbr = chunk_nbr; } } Err(TryRecvError::Empty) => { - trace!("No new data from inner capture thread, try {tries} of 50"); + trace!("No new data from inner capture thread, try {tries} of 50."); } Err(TryRecvError::Disconnected) => { - error!("Channel is closed"); + error!("Channel is closed."); channel.send(AudioMessage::EndOfStream).unwrap_or(()); - status_channel.send(StatusMessage::CaptureError("Inner capture thread has exited".to_string())).unwrap_or(()); + status_channel.send(StatusMessage::CaptureError("Inner capture thread has exited.".to_string())).unwrap_or(()); return; } } @@ -920,7 +941,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { capture_status.state = ProcessingState::Stalled; } else { - xtrace!("capture status blocked, skip update"); + xtrace!("Capture status blocked, skip update."); } let msg = AudioMessage::Pause; if channel.send(msg).is_err() { @@ -945,7 +966,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { averager.restart(); let measured_rate_f = samples_per_sec; debug!( - "Measured sample rate is {:.1} Hz", + "Measured sample rate is {:.1} Hz.", measured_rate_f ); if let Ok(mut capture_status) = RwLockUpgradableReadGuard::try_upgrade(capture_status) { @@ -955,12 +976,12 @@ impl CaptureDevice for CoreaudioCaptureDevice { capture_status.state = state; } else { - xtrace!("capture status upgrade blocked, skip update"); + xtrace!("Capture status upgrade blocked, skip update."); } } } else { - xtrace!("capture status blocked, skip update"); + xtrace!("Capture status blocked, skip update."); } watcher_averager.add_value(capture_frames + device_consumer.occupied_len()/blockalign - prev_len/blockalign); if watcher_averager.larger_than_millis(rate_measure_interval) @@ -969,12 +990,12 @@ impl CaptureDevice for CoreaudioCaptureDevice { watcher_averager.restart(); let measured_rate_f = samples_per_sec; debug!( - "Rate watcher, measured sample rate is {:.1} Hz", + "Rate watcher, measured sample rate is {:.1} Hz.", measured_rate_f ); let changed = valuewatcher.check_value(measured_rate_f as f32); if changed { - warn!("sample rate change detected, last rate was {measured_rate_f} Hz"); + warn!("sample rate change detected, last rate was {measured_rate_f} Hz."); if stop_on_rate_change { let msg = AudioMessage::EndOfStream; channel.send(msg).unwrap_or(()); @@ -990,7 +1011,7 @@ impl CaptureDevice for CoreaudioCaptureDevice { capture_status.signal_peak.add_record(chunk_stats.peak_linear()); } else { - xtrace!("capture status blocked, skip rms update"); + xtrace!("Capture status blocked, skip rms update."); } value_range = chunk.maxval - chunk.minval; state = silence_counter.update(value_range); @@ -1074,7 +1095,7 @@ fn set_pitch(device_id: AudioDeviceID, pitch: f32) { }; let mut pan: f32 = (pitch - 1.0) * 50.0 + 0.5; pan = pan.clamp(0.0, 1.0); - debug!("Setting capture pitch to: {pitch}, corresponding pan value: {pan}"); + debug!("Setting capture pitch to: {pitch}, corresponding pan value: {pan}."); let data_size = mem::size_of::() as u32; let status = unsafe { AudioObjectSetPropertyData( @@ -1087,12 +1108,12 @@ fn set_pitch(device_id: AudioDeviceID, pitch: f32) { ) }; if status != 0 { - warn!("Unable to set pitch, error code: {status}",); + warn!("Unable to set pitch, error code: {status}.",); } } fn set_clock_source_index(device_id: AudioDeviceID, index: u32) -> bool { - debug!("Changing capture device clock source to item with index {index}"); + debug!("Changing capture device clock source to item with index {index}."); let property_address = AudioObjectPropertyAddress { mSelector: kAudioDevicePropertyClockSource, mScope: kAudioObjectPropertyScopeGlobal, @@ -1110,7 +1131,7 @@ fn set_clock_source_index(device_id: AudioDeviceID, index: u32) -> bool { ) }; if status != 0 { - warn!("Unable to set clock source, error code: {status}"); + warn!("Unable to set clock source, error code: {status}."); return false; } true @@ -1164,15 +1185,15 @@ fn get_clock_source_names_and_ids(device_id: AudioDeviceID) -> (Vec, Vec ) }; if status as u32 == kAudioCodecUnknownPropertyError { - info!("The capture device has no clock source control"); + info!("The capture device has no clock source control."); return (names, ids); } if status != 0 { - warn!("Unable to read number of clock sources, error code: {status}"); + warn!("Unable to read number of clock sources, error code: {status}."); return (names, ids); } let nbr_items = data_size / mem::size_of::() as u32; - debug!("Capture device has {nbr_items} clock sources"); + debug!("Capture device has {nbr_items} clock sources."); if nbr_items > 0 { let mut sources = vec![0u32; nbr_items as usize]; let status = unsafe { @@ -1186,7 +1207,7 @@ fn get_clock_source_names_and_ids(device_id: AudioDeviceID) -> (Vec, Vec ) }; if status != 0 { - warn!("Unable to list clock sources, error code: {status}"); + warn!("Unable to list clock sources, error code: {status}."); return (names, ids); } @@ -1197,7 +1218,7 @@ fn get_clock_source_names_and_ids(device_id: AudioDeviceID) -> (Vec, Vec } } debug!( - "Available capture device clock source ids: {:?}, names: {:?}", + "Available capture device clock source ids: {:?}, names: {:?}.", ids, names ); (names, ids) @@ -1248,11 +1269,11 @@ fn configure_pitch_control(device_id: AudioDeviceID) -> bool { } match names.iter().position(|n| n == "Internal Adjustable") { Some(idx) => { - info!("The capture device supports pitch control"); + info!("The capture device supports pitch control."); set_clock_source_index(device_id, ids[idx]) } None => { - info!("The capture device does not support pitch control"); + info!("The capture device does not support pitch control."); false } } diff --git a/src/countertimer.rs b/src/countertimer.rs index 8cb4c2a..07661cc 100644 --- a/src/countertimer.rs +++ b/src/countertimer.rs @@ -4,9 +4,6 @@ use crate::ProcessingState; use std::collections::VecDeque; use std::time::{Duration, Instant}; -/// A counter for watching if the signal has been silent -/// for longer than a given limit. - pub struct DeviceBufferEstimator { update_time: Instant, frames: usize, @@ -38,6 +35,8 @@ impl DeviceBufferEstimator { } } +/// A counter for watching if the signal has been silent +/// for longer than a given limit. pub struct SilenceCounter { silence_threshold: PrcFmt, silence_limit_nbr: usize, diff --git a/src/cpaldevice.rs b/src/cpaldevice.rs index b86e933..bbb06a3 100644 --- a/src/cpaldevice.rs +++ b/src/cpaldevice.rs @@ -14,7 +14,6 @@ use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use rubato::VecResampler; use std::collections::VecDeque; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time; @@ -201,7 +200,7 @@ where impl PlaybackDevice for CpalPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -235,7 +234,7 @@ impl PlaybackDevice for CpalPlaybackDevice { } let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1); - let (tx_dev, rx_dev) = mpsc::sync_channel(1); + let (tx_dev, rx_dev) = crossbeam_channel::bounded(1); let buffer_fill = Arc::new(AtomicUsize::new(0)); let buffer_fill_clone = buffer_fill.clone(); let mut buffer_avg = countertimer::Averager::new(); @@ -382,7 +381,7 @@ impl PlaybackDevice for CpalPlaybackDevice { playback_status.signal_peak.add_record(chunk_stats.peak_linear()); } buffer_avg.add_value( - (buffer_fill.load(Ordering::Relaxed) / channels_clone) + (buffer_fill.load(Ordering::Relaxed) / channels_clone + channel.len() * chunksize_clone) as f64, ); if adjust @@ -468,10 +467,10 @@ where impl CaptureDevice for CpalCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -506,8 +505,8 @@ impl CaptureDevice for CpalCaptureDevice { Err(_err) => {} } let scalefactor = PrcFmt::coerce(2.0).powi(bits_per_sample - 1); - let (tx_dev_i, rx_dev_i) = mpsc::sync_channel(1); - let (tx_dev_f, rx_dev_f) = mpsc::sync_channel(1); + let (tx_dev_i, rx_dev_i) = crossbeam_channel::bounded(1); + let (tx_dev_f, rx_dev_f) = crossbeam_channel::bounded(1); let stream = match sample_format { SampleFormat::S16LE => { trace!("Build i16 input stream"); @@ -593,8 +592,8 @@ impl CaptureDevice for CpalCaptureDevice { } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } diff --git a/src/dither.rs b/src/dither.rs index 34c7966..1ecb2af 100644 --- a/src/dither.rs +++ b/src/dither.rs @@ -552,7 +552,7 @@ impl<'a> Dither<'a> { } } -impl<'a> Filter for Dither<'a> { +impl Filter for Dither<'_> { fn name(&self) -> &str { &self.name } diff --git a/src/filedevice.rs b/src/filedevice.rs index fffa0eb..b58bd7b 100644 --- a/src/filedevice.rs +++ b/src/filedevice.rs @@ -11,7 +11,6 @@ use std::fs::OpenOptions; use std::io::{stdin, stdout, Write}; #[cfg(target_os = "linux")] use std::os::unix::fs::OpenOptionsExt; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::Duration; @@ -74,9 +73,9 @@ pub struct FileCaptureDevice { } struct CaptureChannels { - audio: mpsc::SyncSender, + audio: crossbeam_channel::Sender, status: crossbeam_channel::Sender, - command: mpsc::Receiver, + command: crossbeam_channel::Receiver, } struct CaptureParams { @@ -111,7 +110,7 @@ pub trait Reader { impl PlaybackDevice for FilePlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -321,8 +320,8 @@ fn capture_loop( } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } @@ -541,10 +540,10 @@ fn capture_loop( impl CaptureDevice for FileCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -685,7 +684,7 @@ fn send_silence( samples: usize, channels: usize, chunksize: usize, - audio_channel: &mpsc::SyncSender, + audio_channel: &crossbeam_channel::Sender, resampler: &mut Option>>, ) { let mut samples_left = samples; diff --git a/src/filters.rs b/src/filters.rs index 598fa48..37782d8 100644 --- a/src/filters.rs +++ b/src/filters.rs @@ -12,6 +12,7 @@ use crate::limiter; use crate::loudness; use crate::mixer; use crate::noisegate; +use crate::race; use rawsample::SampleReader; use std::collections::HashMap; use std::fs::File; @@ -415,6 +416,14 @@ impl Pipeline { ); Box::new(gate) as Box } + config::Processor::RACE { parameters, .. } => { + let race = race::RACE::from_config( + &step.name, + parameters, + conf.devices.samplerate, + ); + Box::new(race) as Box + } }; steps.push(PipelineStep::ProcessorStep(proc)); } diff --git a/src/generatordevice.rs b/src/generatordevice.rs index a2b3cd9..542a2b4 100644 --- a/src/generatordevice.rs +++ b/src/generatordevice.rs @@ -2,7 +2,6 @@ use crate::audiodevice::*; use crate::config; use std::f64::consts::PI; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; @@ -101,9 +100,9 @@ pub struct GeneratorDevice { } struct CaptureChannels { - audio: mpsc::SyncSender, + audio: crossbeam_channel::Sender, status: crossbeam_channel::Sender, - command: mpsc::Receiver, + command: crossbeam_channel::Receiver, } struct GeneratorParams { @@ -158,8 +157,8 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) { Ok(CommandMessage::SetSpeed { .. }) => { warn!("Signal generator does not support rate adjust. Ignoring request."); } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } @@ -199,10 +198,10 @@ fn capture_loop(params: GeneratorParams, msg_channels: CaptureChannels) { impl CaptureDevice for GeneratorDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { diff --git a/src/lib.rs b/src/lib.rs index 5ec12c8..7569de1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -138,6 +138,7 @@ pub mod noisegate; pub mod processing; #[cfg(feature = "pulse-backend")] pub mod pulsedevice; +pub mod race; #[cfg(feature = "websocket")] pub mod socketserver; pub mod statefile; diff --git a/src/mixer.rs b/src/mixer.rs index d7525ce..3eb3f44 100644 --- a/src/mixer.rs +++ b/src/mixer.rs @@ -108,6 +108,8 @@ impl Mixer { pub fn validate_mixer(mixer_config: &config::Mixer) -> Res<()> { let chan_in = mixer_config.channels.r#in; let chan_out = mixer_config.channels.out; + let mut output_channels: Vec = Vec::with_capacity(chan_out); + let mut input_channels: Vec = Vec::with_capacity(chan_in); for mapping in mixer_config.mapping.iter() { if mapping.dest >= chan_out { let msg = format!( @@ -117,6 +119,15 @@ pub fn validate_mixer(mixer_config: &config::Mixer) -> Res<()> { ); return Err(config::ConfigError::new(&msg).into()); } + if output_channels.contains(&mapping.dest) { + let msg = format!( + "There is more than one mapping for destination channel {}", + mapping.dest, + ); + return Err(config::ConfigError::new(&msg).into()); + } + output_channels.push(mapping.dest); + input_channels.clear(); for source in mapping.sources.iter() { if source.channel >= chan_in { let msg = format!( @@ -126,6 +137,13 @@ pub fn validate_mixer(mixer_config: &config::Mixer) -> Res<()> { ); return Err(config::ConfigError::new(&msg).into()); } + if input_channels.contains(&source.channel) { + let msg = format!( + "Input channel {} is listed mote than once for destination channel {}", + source.channel, mapping.dest, + ); + return Err(config::ConfigError::new(&msg).into()); + } } } Ok(()) diff --git a/src/processing.rs b/src/processing.rs index 9d5a1e9..99e56fb 100644 --- a/src/processing.rs +++ b/src/processing.rs @@ -5,16 +5,15 @@ use crate::ProcessingParameters; use audio_thread_priority::{ demote_current_thread_from_real_time, promote_current_thread_to_real_time, }; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; pub fn run_processing( conf_proc: config::Configuration, barrier_proc: Arc, - tx_pb: mpsc::SyncSender, - rx_cap: mpsc::Receiver, - rx_pipeconf: mpsc::Receiver<(config::ConfigChange, config::Configuration)>, + tx_pb: crossbeam_channel::Sender, + rx_cap: crossbeam_channel::Receiver, + rx_pipeconf: crossbeam_channel::Receiver<(config::ConfigChange, config::Configuration)>, processing_params: Arc, ) -> thread::JoinHandle<()> { thread::spawn(move || { diff --git a/src/pulsedevice.rs b/src/pulsedevice.rs index ab06f80..6f0dc5c 100644 --- a/src/pulsedevice.rs +++ b/src/pulsedevice.rs @@ -10,7 +10,6 @@ use crate::conversions::{buffer_to_chunk_rawbytes, chunk_to_buffer_rawbytes}; use crate::countertimer; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use rubato::VecResampler; -use std::sync::mpsc; use std::sync::{Arc, Barrier}; use std::thread; use std::time::{Duration, Instant}; @@ -132,7 +131,7 @@ fn open_pulse( impl PlaybackDevice for PulsePlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -257,10 +256,10 @@ fn nbr_capture_bytes( impl CaptureDevice for PulseCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -341,8 +340,8 @@ impl CaptureDevice for PulseCaptureDevice { } } } - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { error!("Command channel was closed"); break; } diff --git a/src/race.rs b/src/race.rs new file mode 100644 index 0000000..c30e5ed --- /dev/null +++ b/src/race.rs @@ -0,0 +1,182 @@ +// RACE, recursive ambiophonic crosstalk eliminator + +use crate::audiodevice::AudioChunk; +use crate::basicfilters::Delay; +use crate::basicfilters::Gain; +use crate::config; +use crate::config::DelayParameters; +use crate::config::GainParameters; +use crate::filters::Filter; +use crate::filters::Processor; +use crate::PrcFmt; +use crate::Res; + +//#[derive(Debug)] +pub struct RACE { + pub name: String, + pub channels: usize, + pub channel_a: usize, + pub channel_b: usize, + pub feedback_a: PrcFmt, + pub feedback_b: PrcFmt, + pub delay_a: Delay, + pub delay_b: Delay, + pub gain: Gain, + pub samplerate: usize, +} + +fn delay_config(config: &config::RACEParameters, samplerate: usize) -> DelayParameters { + // compensate the delay by subtracting one sample period from the delay, clamp at zero + let sample_period_in_delay_unit = match config.delay_unit() { + config::TimeUnit::Microseconds => 1000000.0 / samplerate as PrcFmt, + config::TimeUnit::Milliseconds => 1000.0 / samplerate as PrcFmt, + config::TimeUnit::Millimetres => 343.0 * 1000.0 / samplerate as PrcFmt, + config::TimeUnit::Samples => 1.0, + }; + let compensated_delay = (config.delay - sample_period_in_delay_unit).max(0.0); + + config::DelayParameters { + delay: compensated_delay, + unit: config.delay_unit, + subsample: config.subsample_delay, + } +} + +fn gain_config(config: &config::RACEParameters) -> GainParameters { + config::GainParameters { + gain: -config.attenuation, + scale: Some(config::GainScale::Decibel), + inverted: Some(true), + mute: Some(false), + } +} + +impl RACE { + /// Creates a RACE processor from a config struct + pub fn from_config(name: &str, config: config::RACEParameters, samplerate: usize) -> Self { + let name = name.to_string(); + let channels = config.channels; + + debug!("Creating RACE '{}', channels: {}, channel_a: {}, channel_b: {}, delay: {} {:?}, subsample: {}, attenuation: {}", + name, channels, config.channel_a, config.channel_b, config.delay, config.delay_unit(), config.subsample_delay(), config.attenuation); + let delayconf = delay_config(&config, samplerate); + let delay_a = Delay::from_config("Delay A", samplerate, delayconf.clone()); + let delay_b = Delay::from_config("Delay B", samplerate, delayconf); + + let gainconfig = gain_config(&config); + let gain = Gain::from_config("Gain", gainconfig); + + // sort channel numbers + let channel_a = config.channel_a.min(config.channel_b); + let channel_b = config.channel_a.max(config.channel_b); + + RACE { + name, + channels, + samplerate, + channel_a, + channel_b, + delay_a, + delay_b, + gain, + feedback_a: 0.0, + feedback_b: 0.0, + } + } +} + +impl Processor for RACE { + fn name(&self) -> &str { + &self.name + } + + /// Apply a RACE processor to an AudioChunk, modifying it in-place. + fn process_chunk(&mut self, input: &mut AudioChunk) -> Res<()> { + let (first, second) = input.waveforms.split_at_mut(self.channel_b); + let channel_a = &mut first[self.channel_a]; + let channel_b = &mut second[0]; + if channel_a.is_empty() || channel_b.is_empty() { + return Ok(()); + } + for (value_a, value_b) in channel_a.iter_mut().zip(channel_b.iter_mut()) { + // todo math + let added_a = *value_a + self.feedback_b; + let added_b = *value_b + self.feedback_a; + self.feedback_a = self.delay_a.process_single(added_a); + self.feedback_b = self.delay_b.process_single(added_b); + self.feedback_a = self.gain.process_single(self.feedback_a); + self.feedback_b = self.gain.process_single(self.feedback_b); + *value_a = added_a; + *value_b = added_b; + } + Ok(()) + } + + fn update_parameters(&mut self, config: config::Processor) { + if let config::Processor::RACE { + parameters: config, .. + } = config + { + self.channels = config.channels; + + let delayparams = delay_config(&config, self.samplerate); + let delayconf = config::Filter::Delay { + parameters: delayparams, + description: None, + }; + self.delay_a.update_parameters(delayconf.clone()); + self.delay_b.update_parameters(delayconf); + + let gainparams = gain_config(&config); + let gainconfig = config::Filter::Gain { + description: None, + parameters: gainparams, + }; + self.gain.update_parameters(gainconfig); + + // sort channel numbers + self.channel_a = config.channel_a.min(config.channel_b); + self.channel_b = config.channel_a.max(config.channel_b); + + debug!("Updating RACE '{}', channels: {}, channel_a: {}, channel_b: {}, delay: {} {:?}, subsample: {}, attenuation: {}", + self.name, self.channels, config.channel_a, config.channel_b, config.delay, config.delay_unit(), config.subsample_delay(), config.attenuation); + } else { + // This should never happen unless there is a bug somewhere else + panic!("Invalid config change!"); + } + } +} + +/// Validate the RACE processor config, to give a helpful message intead of a panic. +pub fn validate_race(config: &config::RACEParameters) -> Res<()> { + let channels = config.channels; + if config.attenuation <= 0.0 { + let msg = "Attenuation value must be larger than zero."; + return Err(config::ConfigError::new(msg).into()); + } + if config.delay <= 0.0 { + let msg = "Delay value must be larger than zero."; + return Err(config::ConfigError::new(msg).into()); + } + if config.channel_a == config.channel_b { + let msg = "Channels a and b must be different"; + return Err(config::ConfigError::new(msg).into()); + } + if config.channel_a >= channels { + let msg = format!( + "Invalid channel a to process: {}, max is: {}.", + config.channel_a, + channels - 1 + ); + return Err(config::ConfigError::new(&msg).into()); + } + if config.channel_b >= channels { + let msg = format!( + "Invalid channel b to process: {}, max is: {}.", + config.channel_b, + channels - 1 + ); + return Err(config::ConfigError::new(&msg).into()); + } + Ok(()) +} diff --git a/src/socketserver.rs b/src/socketserver.rs index 0db3419..2c47cb5 100644 --- a/src/socketserver.rs +++ b/src/socketserver.rs @@ -10,7 +10,6 @@ use std::fs::File; use std::io::Read; use std::net::{TcpListener, TcpStream}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; use std::sync::Arc; use std::thread; use std::time::{Duration, Instant}; @@ -37,7 +36,7 @@ pub struct SharedData { pub playback_status: Arc>, pub processing_params: Arc, pub processing_status: Arc>, - pub state_change_notify: mpsc::SyncSender<()>, + pub state_change_notify: crossbeam_channel::Sender<()>, pub state_file_path: Option, pub unsaved_state_change: Arc, } diff --git a/src/wasapidevice.rs b/src/wasapidevice.rs index e3a2cd0..26a0282 100644 --- a/src/wasapidevice.rs +++ b/src/wasapidevice.rs @@ -12,7 +12,6 @@ use rubato::VecResampler; use std::collections::VecDeque; use std::rc::Rc; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; use std::sync::{Arc, Barrier, Mutex}; use std::thread; use std::time::Duration; @@ -140,7 +139,7 @@ fn get_supported_wave_format( } wasapi::ShareMode::Shared => match audio_client.is_supported(&wave_format, sharemode) { Ok(None) => { - debug!("Device supports format {:?}", wave_format); + debug!("Device supports format {:?}.", wave_format); Ok(wave_format) } Ok(Some(modified)) => { @@ -181,16 +180,16 @@ fn open_playback( let device = if let Some(name) = devname { let collection = wasapi::DeviceCollection::new(&wasapi::Direction::Render)?; debug!( - "Available playback devices: {:?}", + "Available playback devices: {:?}.", list_device_names_in_collection(&collection) ); collection.get_device_with_name(name)? } else { wasapi::get_default_device(&wasapi::Direction::Render)? }; - trace!("Found playback device {:?}", devname); + trace!("Found playback device {:?}.", devname); let mut audio_client = device.get_iaudioclient()?; - trace!("Got playback iaudioclient"); + trace!("Got playback iaudioclient."); let wave_format = get_supported_wave_format( &audio_client, sample_format, @@ -209,13 +208,13 @@ fn open_playback( false, )?; debug!( - "playback default period {}, min period {}, aligned period {}", + "Playback default period {}, min period {}, aligned period {}.", def_time, min_time, aligned_time ); - debug!("initialized playback audio client"); + debug!("Initialized playback audio client."); let handle = audio_client.set_get_eventhandle()?; let render_client = audio_client.get_audiorenderclient()?; - debug!("Opened Wasapi playback device {:?}", devname); + debug!("Opened Wasapi playback device {:?}.", devname); Ok((device, audio_client, render_client, handle, wave_format)) } @@ -241,14 +240,14 @@ fn open_capture( if !loopback { let collection = wasapi::DeviceCollection::new(&wasapi::Direction::Capture)?; debug!( - "Available capture devices: {:?}", + "Available capture devices: {:?}.", list_device_names_in_collection(&collection) ); collection.get_device_with_name(name)? } else { let collection = wasapi::DeviceCollection::new(&wasapi::Direction::Render)?; debug!( - "Available loopback capture (i.e. playback) devices: {:?}", + "Available loopback capture (i.e. playback) devices: {:?}.", list_device_names_in_collection(&collection) ); collection.get_device_with_name(name)? @@ -259,9 +258,9 @@ fn open_capture( wasapi::get_default_device(&wasapi::Direction::Render)? }; - trace!("Found capture device {:?}", devname); + trace!("Found capture device {:?}.", devname); let mut audio_client = device.get_iaudioclient()?; - trace!("Got capture iaudioclient"); + trace!("Got capture iaudioclient."); let wave_format = get_supported_wave_format( &audio_client, sample_format, @@ -271,7 +270,7 @@ fn open_capture( )?; let (def_time, min_time) = audio_client.get_periods()?; debug!( - "capture default period {}, min period {}", + "Capture default period {}, min period {}.", def_time, min_time ); audio_client.initialize_client( @@ -281,11 +280,11 @@ fn open_capture( &sharemode, loopback, )?; - debug!("initialized capture audio client"); + debug!("Initialized capture audio client."); let handle = audio_client.set_get_eventhandle()?; - trace!("capture got event handle"); + trace!("Capture got event handle."); let capture_client = audio_client.get_audiocaptureclient()?; - debug!("Opened Wasapi capture device {:?}", devname); + debug!("Opened Wasapi capture device {:?}.", devname); Ok((device, audio_client, capture_client, handle, wave_format)) } @@ -311,16 +310,18 @@ fn playback_loop( samplerate: f64, sync: PlaybackSync, mut ringbuffer: Caching>, false, true>, + target_level: usize, ) -> Res<()> { let mut buffer_free_frame_count = audio_client.get_bufferframecount()?; let mut sample_queue: VecDeque = VecDeque::with_capacity( - 4 * blockalign * (chunksize + 2 * buffer_free_frame_count as usize), + 4 * blockalign * (chunksize + 2 * buffer_free_frame_count as usize) + + target_level * blockalign, ); let tx_cb = sync.tx_cb; let mut callbacks = wasapi::EventCallbacks::new(); callbacks.set_disconnected_callback(move |reason| { - debug!("Disconnected, reason: {:?}", reason); + debug!("Disconnected, reason: {:?}.", reason); let simplereason = match reason { wasapi::DisconnectReason::FormatChanged => DisconnectReason::FormatChange, _ => DisconnectReason::Error, @@ -334,12 +335,12 @@ fn playback_loop( sessioncontrol.register_session_notification(callbacks_weak)?; let mut waited_millis = 0; - trace!("Waiting for data to start playback, will time out after one second"); + trace!("Waiting for data to start playback, will time out after one second."); while sync.rx_play.len() < 2 && waited_millis < 1000 { thread::sleep(Duration::from_millis(10)); waited_millis += 10; } - debug!("Waited for data for {} ms", waited_millis); + debug!("Waited for data for {} ms.", waited_millis); // Raise priority let _thread_handle = match promote_current_thread_to_real_time(0, 1) { @@ -349,7 +350,7 @@ fn playback_loop( } Err(err) => { warn!( - "Playback inner thread could not get real time priority, error: {}", + "Playback inner thread could not get real time priority, error: {}.", err ); None @@ -357,16 +358,17 @@ fn playback_loop( }; audio_client.start_stream()?; - let mut running = true; + let mut running = false; + let mut starting = true; let mut pos = 0; let mut device_prevtime = 0.0; let device_freq = clock.get_frequency()? as f64; loop { buffer_free_frame_count = audio_client.get_available_space_in_frames()?; - trace!("New buffer frame count {}", buffer_free_frame_count); + trace!("New buffer frame count {}.", buffer_free_frame_count); let mut device_time = pos as f64 / device_freq; if device_time == 0.0 && device_prevtime > 0.0 { - debug!("Failed to get accurate device time, skipping check for missing events"); + debug!("Failed to get accurate device time, skipping check for missing events."); // A zero value means that the device position read was delayed due to some // other high priority event, and an accurate reading could not be taken. // To avoid needless resets of the stream, set the position to the expected value, @@ -374,7 +376,7 @@ fn playback_loop( device_time = device_prevtime + buffer_free_frame_count as f64 / samplerate; } trace!( - "Device time counted up by {:.4} s", + "Device time counted up by {:.4} s.", device_time - device_prevtime ); if buffer_free_frame_count > 0 @@ -382,7 +384,7 @@ fn playback_loop( > 1.75 * (buffer_free_frame_count as f64 / samplerate) { warn!( - "Missing event! Resetting stream. Interval {:.4} s, expected {:.4} s", + "Missing event! Resetting stream. Interval {:.4} s, expected {:.4} s.", device_time - device_prevtime, buffer_free_frame_count as f64 / samplerate ); @@ -393,20 +395,28 @@ fn playback_loop( device_prevtime = device_time; while sample_queue.len() < (blockalign * buffer_free_frame_count as usize) { - trace!("playback loop needs more samples, reading from channel"); + trace!("Playback loop needs more samples, reading from channel."); match sync.rx_play.try_recv() { Ok(PlaybackDeviceMessage::Data(bytes)) => { - trace!("got chunk"); - for element in ringbuffer.pop_iter().take(bytes) { - sample_queue.push_back(element); - } + trace!("Received chunk."); if !running { running = true; - info!("Restarting playback after buffer underrun"); + if starting { + starting = false; + } else { + warn!("Restarting playback after buffer underrun."); + } + debug!("Inserting {target_level} silent frames to reach target delay."); + for _ in 0..(blockalign * target_level) { + sample_queue.push_back(0); + } + } + for element in ringbuffer.pop_iter().take(bytes) { + sample_queue.push_back(element); } } Ok(PlaybackDeviceMessage::Stop) => { - debug!("Stopping inner playback loop"); + debug!("Stopping inner playback loop."); audio_client.stop_stream()?; return Ok(()); } @@ -418,12 +428,12 @@ fn playback_loop( } if running { running = false; - warn!("Playback interrupted, no data available"); + warn!("Playback interrupted, no data available."); } } Err(TryRecvError::Disconnected) => { - error!("Channel is closed"); - return Err(DeviceError::new("Data channel closed").into()); + error!("Channel is closed."); + return Err(DeviceError::new("Data channel closed.").into()); } } } @@ -436,7 +446,7 @@ fn playback_loop( if let Ok(mut estimator) = sync.bufferfill.try_lock() { estimator.add(curr_buffer_fill) } - trace!("write ok"); + trace!("Write ok."); //println!("{} bef",prev_inst.elapsed().as_micros()); if handle.wait_for_event(1000).is_err() { error!("Error on playback, stopping stream"); @@ -467,7 +477,7 @@ fn capture_loop( let mut callbacks = wasapi::EventCallbacks::new(); callbacks.set_disconnected_callback(move |reason| { - debug!("Capture disconnected, reason: {:?}", reason); + debug!("Capture disconnected, reason: {:?}.", reason); let simplereason = match reason { wasapi::DisconnectReason::FormatChanged => DisconnectReason::FormatChange, _ => DisconnectReason::Error, @@ -493,34 +503,34 @@ fn capture_loop( } Err(err) => { warn!( - "Capture inner thread could not get real time priority, error: {}", + "Capture inner thread could not get real time priority, error: {}.", err ); None } }; - trace!("Starting capture stream"); + trace!("Starting capture stream."); audio_client.start_stream()?; - trace!("Started capture stream"); + trace!("Started capture stream."); loop { - trace!("capturing"); + trace!("Capturing."); if stop_signal.load(Ordering::Relaxed) { - debug!("Stopping inner capture loop on request"); + debug!("Stopping inner capture loop on request."); audio_client.stop_stream()?; return Ok(()); } if handle.wait_for_event(250).is_err() { - debug!("Timeout on capture event"); + debug!("Timeout on capture event."); if !inactive { - warn!("No data received, pausing stream"); + warn!("No data received, pausing stream."); inactive = true; } match channels.tx_filled.try_send((chunk_nbr, 0)) { Ok(()) | Err(TrySendError::Full(_)) => {} Err(TrySendError::Disconnected(_)) => { - error!("Error sending, channel disconnected"); + error!("Error sending, channel disconnected."); audio_client.stop_stream()?; - return Err(DeviceError::new("Channel disconnected").into()); + return Err(DeviceError::new("Channel disconnected.").into()); } } chunk_nbr += 1; @@ -528,7 +538,7 @@ fn capture_loop( } if inactive { - info!("Data received, resuming stream"); + info!("Data received, resuming stream."); inactive = false; } let available_frames = match capture_client.get_next_nbr_frames()? { @@ -536,7 +546,7 @@ fn capture_loop( None => audio_client.get_bufferframecount()?, }; - trace!("Available frames from capture dev: {}", available_frames); + trace!("Available frames from capture dev: {}.", available_frames); // If no available frames, just skip the rest of this loop iteration if available_frames > 0 { @@ -546,12 +556,12 @@ fn capture_loop( capture_client.read_from_device(&mut data[0..nbr_bytes])?; if nbr_frames_read != available_frames { warn!( - "Expected {} frames, got {}", + "Expected {} frames, got {}.", available_frames, nbr_frames_read ); } if flags.silent { - debug!("Captured a buffer marked as silent"); + debug!("Captured a buffer marked as silent."); data.iter_mut().take(nbr_bytes).for_each(|val| *val = 0); } // Disabled since VB-Audio Cable gives this all the time @@ -567,22 +577,22 @@ fn capture_loop( /* if let Some(extra_frames) = capture_client.get_next_nbr_frames()? { if extra_frames > 0 { - debug!("Workaround, reading {} frames more", extra_frames); + trace!("Workaround, reading {} frames more.", extra_frames); let nbr_bytes_extra = extra_frames as usize * blockalign; let (nbr_frames_read, flags) = capture_client .read_from_device(&mut data[nbr_bytes..(nbr_bytes + nbr_bytes_extra)])?; if nbr_frames_read != extra_frames { - warn!("Expected {} frames, got {}", extra_frames, nbr_frames_read); + warn!("Expected {} frames, got {}.", extra_frames, nbr_frames_read); } if flags.silent { - debug!("Captured a buffer marked as silent"); + debug!("Captured a buffer marked as silent."); data.iter_mut() .skip(nbr_bytes) .take(nbr_bytes_extra) .for_each(|val| *val = 0); } if flags.data_discontinuity { - warn!("Capture device reported a buffer overrun"); + warn!("Capture device reported a buffer overrun."); } nbr_bytes += nbr_bytes_extra; } @@ -599,14 +609,14 @@ fn capture_loop( Ok(()) => {} Err(TrySendError::Full((nbr, length))) => { warn!( - "Notification channel full, dropping chunk nbr {} with len {}", + "Notification channel full, dropping chunk nbr {} with len {}.", nbr, length ); } Err(TrySendError::Disconnected(_)) => { - error!("Error sending, channel disconnected"); + error!("Error sending, channel disconnected."); audio_client.stop_stream()?; - return Err(DeviceError::new("Channel disconnected").into()); + return Err(DeviceError::new("Channel disconnected.").into()); } } chunk_nbr += 1; @@ -618,7 +628,7 @@ fn capture_loop( impl PlaybackDevice for WasapiPlaybackDevice { fn start( &mut self, - channel: mpsc::Receiver, + channel: crossbeam_channel::Receiver, barrier: Arc, status_channel: crossbeam_channel::Sender, playback_status: Arc>, @@ -660,7 +670,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { let mut rate_controller = PIRateController::new_with_default_gains(samplerate, adjust_period as f64, target_level); - trace!("Build output stream"); + trace!("Build output stream."); let mut conversion_result; let ringbuffer = HeapRb::::new(channels * sample_format.bytes_per_sample() * ( 2 * chunksize + 2048 )); @@ -703,6 +713,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { samplerate as f64, sync, device_consumer, + target_level, ); if let Err(err) = result { let msg = format!("Playback failed with error: {}", err); @@ -732,7 +743,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { Ok(()) => {} Err(_err) => {} } - debug!("Playback device ready and waiting"); + debug!("Playback device ready and waiting."); barrier.wait(); let thread_handle = match promote_current_thread_to_real_time(0, 1) { Ok(h) => { @@ -741,7 +752,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { } Err(err) => { warn!( - "Playback outer thread could not get real time priority, error: {}", + "Playback outer thread could not get real time priority, error: {}.", err ); None @@ -778,7 +789,8 @@ impl PlaybackDevice for WasapiPlaybackDevice { } match channel.recv() { Ok(AudioMessage::Audio(chunk)) => { - buffer_avg.add_value(buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default()); + let estimated_buffer_fill = buffer_fill.try_lock().map(|b| b.estimate() as f64).unwrap_or_default(); + buffer_avg.add_value(estimated_buffer_fill + (channel.len() * chunksize) as f64); if adjust && timer.larger_than_millis((1000.0 * adjust_period) as u64) { if let Some(av_delay) = buffer_avg.average() { @@ -786,7 +798,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { timer.restart(); buffer_avg.restart(); debug!( - "Current buffer level {:.1}, set capture rate to {:.4}%", + "Current buffer level {:.1}, set capture rate to {:.4}%.", av_delay, 100.0 * speed ); @@ -813,7 +825,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { .add_record(chunk_stats.peak_linear()); } else { - xtrace!("playback status blocked, skip rms update"); + xtrace!("Playback status blocked, skip rms update."); } let pushed_bytes = device_producer.push_slice(&buf[0..conversion_result.0]); if pushed_bytes < conversion_result.0 { @@ -826,7 +838,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { match tx_dev.send(PlaybackDeviceMessage::Data(pushed_bytes)) { Ok(_) => {} Err(err) => { - error!("Playback device channel error: {}", err); + error!("Playback device channel error: {}.", err); send_error_or_playbackformatchange( &status_channel, &rx_disconnectreason, @@ -837,7 +849,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { } } Ok(AudioMessage::Pause) => { - trace!("Pause message received"); + trace!("Pause message received."); } Ok(AudioMessage::EndOfStream) => { status_channel @@ -846,7 +858,7 @@ impl PlaybackDevice for WasapiPlaybackDevice { break; } Err(err) => { - error!("Message channel error: {}", err); + error!("Message channel error: {}.", err); send_error_or_playbackformatchange( &status_channel, &rx_disconnectreason, @@ -868,11 +880,11 @@ impl PlaybackDevice for WasapiPlaybackDevice { } match tx_dev.send(PlaybackDeviceMessage::Stop) { Ok(_) => { - debug!("Wait for inner playback thread to exit"); + debug!("Wait for inner playback thread to exit."); innerhandle.join().unwrap_or(()); } Err(_) => { - warn!("Inner playback thread already stopped") + warn!("Inner playback thread already stopped.") } } })?; @@ -903,11 +915,11 @@ fn send_error_or_playbackformatchange( err: String, ) { if check_for_format_change(rx) { - debug!("Send PlaybackFormatChange"); + debug!("Send PlaybackFormatChange."); tx.send(StatusMessage::PlaybackFormatChange(0)) .unwrap_or(()); } else { - debug!("Send PlaybackError"); + debug!("Send PlaybackError."); tx.send(StatusMessage::PlaybackError(err)).unwrap_or(()); } } @@ -918,10 +930,10 @@ fn send_error_or_captureformatchange( err: String, ) { if check_for_format_change(rx) { - debug!("Send CaptureFormatChange"); + debug!("Send CaptureFormatChange."); tx.send(StatusMessage::CaptureFormatChange(0)).unwrap_or(()); } else { - debug!("Send CaptureError"); + debug!("Send CaptureError."); tx.send(StatusMessage::CaptureError(err)).unwrap_or(()); } } @@ -932,7 +944,7 @@ fn nbr_capture_frames( ) -> usize { if let Some(resampl) = &resampler { #[cfg(feature = "debug")] - trace!("Resampler needs {} frames", resampl.input_frames_next()); + trace!("Resampler needs {} frames.", resampl.input_frames_next()); resampl.input_frames_next() } else { capture_frames @@ -943,10 +955,10 @@ fn nbr_capture_frames( impl CaptureDevice for WasapiCaptureDevice { fn start( &mut self, - channel: mpsc::SyncSender, + channel: crossbeam_channel::Sender, barrier: Arc, status_channel: crossbeam_channel::Sender, - command_channel: mpsc::Receiver, + command_channel: crossbeam_channel::Receiver, capture_status: Arc>, _processing_params: Arc, ) -> Res>> { @@ -982,12 +994,13 @@ impl CaptureDevice for WasapiCaptureDevice { let (tx_dev, rx_dev) = bounded(channel_capacity); let (tx_state_dev, rx_state_dev) = bounded(0); + let (tx_start_inner, rx_start_inner) = bounded(0); let (tx_disconnectreason, rx_disconnectreason) = unbounded(); let ringbuffer = HeapRb::::new(channels * bytes_per_sample * ( 2 * chunksize + 2048 )); let (device_producer, mut device_consumer) = ringbuffer.split(); - trace!("Build input stream"); + trace!("Build input stream."); // wasapi device loop let stop_signal = Arc::new(AtomicBool::new(false)); let stop_signal_inner = stop_signal.clone(); @@ -1012,6 +1025,7 @@ impl CaptureDevice for WasapiCaptureDevice { tx_filled: tx_dev, ringbuf: device_producer, }; + let _rx_res = rx_start_inner.recv(); let result = capture_loop(audio_client, capture_client, handle, channels, tx_disconnectreason, blockalign as usize, stop_signal_inner); if let Err(err) = result { let msg = format!("Capture failed with error: {}", err); @@ -1049,7 +1063,7 @@ impl CaptureDevice for WasapiCaptureDevice { let mut data_buffer = vec![0u8; 4 * blockalign * capture_frames]; let mut expected_chunk_nbr = 0; let mut channel_mask = vec![true; channels]; - debug!("Capture device ready and waiting"); + debug!("Capture device ready and waiting."); match status_channel.send(StatusMessage::CaptureReady) { Ok(()) => {} Err(_err) => {} @@ -1062,17 +1076,18 @@ impl CaptureDevice for WasapiCaptureDevice { } Err(err) => { warn!( - "Capture outer thread could not get real time priority, error: {}", + "Capture outer thread could not get real time priority, error: {}.", err ); None } }; + let _send_res = tx_start_inner.send(()); debug!("Capture device starts now!"); loop { match command_channel.try_recv() { Ok(CommandMessage::Exit) => { - debug!("Exit message received, sending EndOfStream"); + debug!("Exit message received, sending EndOfStream."); let msg = AudioMessage::EndOfStream; channel.send(msg).unwrap_or(()); status_channel.send(StatusMessage::CaptureDone).unwrap_or(()); @@ -1080,12 +1095,12 @@ impl CaptureDevice for WasapiCaptureDevice { } Ok(CommandMessage::SetSpeed { speed }) => { rate_adjust = speed; - debug!("Requested to adjust capture speed to {}", speed); + debug!("Requested to adjust capture speed to {}.", speed); if let Some(resampl) = &mut resampler { - debug!("Adjusting resampler rate to {}", speed); + debug!("Adjusting resampler rate to {}.", speed); if async_src { if resampl.set_resample_ratio_relative(speed, true).is_err() { - debug!("Failed to set resampling speed to {}", speed); + debug!("Failed to set resampling speed to {}.", speed); } } else { @@ -1093,9 +1108,9 @@ impl CaptureDevice for WasapiCaptureDevice { } } }, - Err(mpsc::TryRecvError::Empty) => {} - Err(mpsc::TryRecvError::Disconnected) => { - error!("Command channel was closed"); + Err(crossbeam_channel::TryRecvError::Empty) => {} + Err(crossbeam_channel::TryRecvError::Disconnected) => { + error!("Command channel was closed."); break; } }; @@ -1109,7 +1124,7 @@ impl CaptureDevice for WasapiCaptureDevice { Err(TryRecvError::Empty) => {} Err(TryRecvError::Disconnected) => { channel.send(AudioMessage::EndOfStream).unwrap_or(()); - send_error_or_captureformatchange(&status_channel, &rx_disconnectreason, "Inner capture thread exited".to_string()); + send_error_or_captureformatchange(&status_channel, &rx_disconnectreason, "Inner capture thread exited.".to_string()); break; } } @@ -1119,30 +1134,30 @@ impl CaptureDevice for WasapiCaptureDevice { ); let capture_bytes = blockalign * capture_frames; while device_consumer.occupied_len() < (blockalign * capture_frames) { - trace!("capture device needs more samples to make chunk, reading from channel"); + trace!("Capture device needs more samples to make chunk, reading from channel."); match rx_dev.recv() { Ok((chunk_nbr, data_bytes)) => { - trace!("got chunk, length {} bytes", data_bytes); + trace!("Received chunk, length {} bytes.", data_bytes); expected_chunk_nbr += 1; if data_bytes == 0 { if state != ProcessingState::Stalled { - trace!("capture device became inactive"); + trace!("Capture device became inactive."); saved_state = state; state = ProcessingState::Stalled; } break; } else if state == ProcessingState::Stalled { - trace!("capture device became active"); + trace!("Capture device became active."); state = saved_state; } if chunk_nbr > expected_chunk_nbr { - warn!("Samples were dropped, missing {} buffers", chunk_nbr - expected_chunk_nbr); + warn!("Samples were dropped, missing {} buffers.", chunk_nbr - expected_chunk_nbr); expected_chunk_nbr = chunk_nbr; } } Err(err) => { - error!("Channel is closed"); + error!("Channel is closed."); channel.send(AudioMessage::EndOfStream).unwrap_or(()); send_error_or_captureformatchange(&status_channel, &rx_disconnectreason, err.to_string()); return; @@ -1165,12 +1180,12 @@ impl CaptureDevice for WasapiCaptureDevice { capture_status.state = state; } else { - xtrace!("capture status upgrade blocked, skip update"); + xtrace!("Capture status upgrade blocked, skip update."); } } } else { - xtrace!("capture status blocked, skip update"); + xtrace!("Capture status blocked, skip update."); } watcher_averager.add_value(capture_frames); if watcher_averager.larger_than_millis(rate_measure_interval) @@ -1179,12 +1194,12 @@ impl CaptureDevice for WasapiCaptureDevice { watcher_averager.restart(); let measured_rate_f = samples_per_sec; debug!( - "Measured sample rate is {:.1} Hz", + "Measured sample rate is {:.1} Hz.", measured_rate_f ); let changed = valuewatcher.check_value(measured_rate_f as f32); if changed { - warn!("sample rate change detected, last rate was {} Hz", measured_rate_f); + warn!("Sample rate change detected, last rate was {} Hz.", measured_rate_f); if stop_on_rate_change { let msg = AudioMessage::EndOfStream; channel.send(msg).unwrap_or(()); @@ -1206,7 +1221,7 @@ impl CaptureDevice for WasapiCaptureDevice { capture_status.signal_peak.add_record(chunk_stats.peak_linear()); } else { - xtrace!("capture status blocked, skip rms update"); + xtrace!("Capture status blocked, skip rms update."); } value_range = chunk.maxval - chunk.minval; state = silence_counter.update(value_range); @@ -1248,7 +1263,7 @@ impl CaptureDevice for WasapiCaptureDevice { }; } stop_signal.store(true, Ordering::Relaxed); - debug!("Wait for inner capture thread to exit"); + debug!("Wait for inner capture thread to exit."); innerhandle.join().unwrap_or(()); capture_status.write().state = ProcessingState::Inactive; })?;