Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix premature pausing when reaching end of still-streaming stream #2106

Merged
merged 5 commits into from
May 12, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! A channel that keeps track of latency and queue length.

use std::sync::{
atomic::{AtomicU64, Ordering::Relaxed},
atomic::{AtomicBool, AtomicU64, Ordering::Relaxed},
Arc,
};

Expand Down Expand Up @@ -64,7 +64,12 @@ fn smart_channel_with_stats<T: Send>(
tx,
stats: stats.clone(),
};
let receiver = Receiver { rx, stats, source };
let receiver = Receiver {
rx,
stats,
source,
connected: AtomicBool::new(true),
};
(sender, receiver)
}

Expand Down Expand Up @@ -121,25 +126,57 @@ pub struct Receiver<T: Send> {
rx: crossbeam::channel::Receiver<(Instant, T)>,
stats: Arc<SharedStats>,
source: Source,
connected: AtomicBool,
}

impl<T: Send> Receiver<T> {
/// Are we still connected?
///
/// Once false, we will never be connected again: the source has run dry.
///
/// This is only updated once one of the receive methods fails.
pub fn is_connected(&self) -> bool {
self.connected.load(Relaxed)
}

pub fn recv(&self) -> Result<T, RecvError> {
let (sent, msg) = self.rx.recv()?;
let (sent, msg) = match self.rx.recv() {
Ok(x) => x,
Err(RecvError) => {
self.connected.store(false, Relaxed);
return Err(RecvError);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

pub fn try_recv(&self) -> Result<T, TryRecvError> {
let (sent, msg) = self.rx.try_recv()?;
let (sent, msg) = match self.rx.try_recv() {
Ok(x) => x,
Err(err) => {
if err == TryRecvError::Disconnected {
self.connected.store(false, Relaxed);
}
return Err(err);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
}

pub fn recv_timeout(&self, timeout: std::time::Duration) -> Result<T, RecvTimeoutError> {
let (sent, msg) = self.rx.recv_timeout(timeout)?;
let (sent, msg) = match self.rx.recv_timeout(timeout) {
Ok(x) => x,
Err(err) => {
if err == RecvTimeoutError::Disconnected {
self.connected.store(false, Relaxed);
}
return Err(err);
}
};
let latency_ns = sent.elapsed().as_nanos() as u64;
self.stats.latency_ns.store(latency_ns, Relaxed);
Ok(msg)
Expand Down
25 changes: 16 additions & 9 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ impl eframe::App for App {
log_db,
&self.re_ui,
&self.component_ui_registry,
self.rx.source(),
&self.rx,
);
}

Expand Down Expand Up @@ -964,7 +964,7 @@ impl AppState {
log_db: &LogDb,
re_ui: &re_ui::ReUi,
component_ui_registry: &ComponentUiRegistry,
data_source: &re_smart_channel::Source,
rx: &Receiver<LogMsg>,
) {
crate::profile_function!();

Expand All @@ -984,7 +984,7 @@ impl AppState {
let rec_cfg = recording_config_entry(
recording_configs,
selected_rec_id.clone(),
data_source,
rx.source(),
log_db,
);
let selected_app_id = log_db
Expand Down Expand Up @@ -1024,12 +1024,19 @@ impl AppState {
.blueprint_panel_and_viewport(&mut ctx, ui),
});

// move time last, so we get to see the first data first!
ctx.rec_cfg
.time_ctrl
.move_time(log_db.times_per_timeline(), ui.ctx().input(|i| i.stable_dt));
if ctx.rec_cfg.time_ctrl.play_state() == PlayState::Playing {
ui.ctx().request_repaint();
{
// We move the time at the very end of the frame,
// so we have one frame to see the first data before we move the time.
let dt = ui.ctx().input(|i| i.stable_dt);
let more_data_is_coming = rx.is_connected();
let needs_repaint = ctx.rec_cfg.time_ctrl.move_time(
log_db.times_per_timeline(),
dt,
more_data_is_coming,
);
if needs_repaint == re_viewer_context::NeedsRepaint::Yes {
ui.ctx().request_repaint();
}
}

if WATERMARK {
Expand Down
6 changes: 6 additions & 0 deletions crates/re_viewer_context/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ slotmap::new_key_type! {
pub struct DataBlueprintGroupHandle;
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum NeedsRepaint {
Yes,
No,
}

// ---------------------------------------------------------------------------

/// Profiling macro for feature "puffin"
Expand Down
35 changes: 26 additions & 9 deletions crates/re_viewer_context/src/time_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ use std::collections::{BTreeMap, BTreeSet};
use re_data_store::TimesPerTimeline;
use re_log_types::{Duration, TimeInt, TimeRange, TimeRangeF, TimeReal, TimeType, Timeline};

use crate::NeedsRepaint;

/// The time range we are currently zoomed in on.
#[derive(Clone, Copy, Debug, serde::Deserialize, serde::Serialize)]
pub struct TimeView {
Expand Down Expand Up @@ -112,11 +114,17 @@ impl Default for TimeControl {

impl TimeControl {
/// Update the current time
pub fn move_time(&mut self, times_per_timeline: &TimesPerTimeline, stable_dt: f32) {
#[must_use]
pub fn move_time(
emilk marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
times_per_timeline: &TimesPerTimeline,
stable_dt: f32,
more_data_is_coming: bool,
) -> NeedsRepaint {
self.select_a_valid_timeline(times_per_timeline);

let Some(full_range) = self.full_range(times_per_timeline) else {
return;
return NeedsRepaint::No; // we have no data on this timeline yet, so bail
};

match self.play_state() {
Expand All @@ -132,6 +140,7 @@ impl TimeControl {
full_range.min
})
});
NeedsRepaint::No
}
PlayState::Playing => {
let dt = stable_dt.min(0.1) * self.speed;
Expand All @@ -141,11 +150,17 @@ impl TimeControl {
.entry(self.timeline)
.or_insert_with(|| TimeState::new(full_range.min));

if self.looping == Looping::Off && state.time >= full_range.max {
// We've reached the end - stop playing.
if self.looping == Looping::Off && full_range.max <= state.time {
// We've reached the end o the data
emilk marked this conversation as resolved.
Show resolved Hide resolved
emilk marked this conversation as resolved.
Show resolved Hide resolved
state.time = full_range.max.into();
self.pause();
return;

if more_data_is_coming {
// then let's wait for it without pausing!
return NeedsRepaint::No; // ui will wake up where more data arrives
} else {
self.pause();
return NeedsRepaint::No;
}
}

let loop_range = match self.looping {
Expand All @@ -166,10 +181,12 @@ impl TimeControl {
}

if let Some(loop_range) = loop_range {
if state.time > loop_range.max {
state.time = loop_range.min;
if loop_range.max < state.time {
state.time = loop_range.min; // loop!
}
}

NeedsRepaint::Yes
}
PlayState::Following => {
// Set the time to the max:
Expand All @@ -181,7 +198,7 @@ impl TimeControl {
entry.get_mut().time = full_range.max.into();
}
}
// no need for request_repaint - we already repaint when new data arrives
NeedsRepaint::No // no need for request_repaint - we already repaint when new data arrives
}
}
}
Expand Down