Skip to content

Commit

Permalink
Datastore revamp 6: sunset LogMsg storage + save store to disk (#1795)
Browse files Browse the repository at this point in the history
  • Loading branch information
teh-cmc authored Apr 12, 2023
1 parent 72de79e commit b7bbe10
Show file tree
Hide file tree
Showing 22 changed files with 123 additions and 444 deletions.
77 changes: 30 additions & 47 deletions crates/re_data_store/src/log_db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeMap;

use nohash_hasher::IntMap;

use re_arrow_store::{DataStoreConfig, TimeInt};
Expand Down Expand Up @@ -159,33 +161,31 @@ impl EntityDb {
/// A in-memory database built from a stream of [`LogMsg`]es.
#[derive(Default)]
pub struct LogDb {
/// Messages in the order they arrived
chronological_row_ids: Vec<RowId>,
log_messages: ahash::HashMap<RowId, LogMsg>,

/// Data that was logged with [`TimePoint::timeless`].
/// We need to re-insert those in any new timelines
/// that are created after they were logged.
timeless_row_ids: Vec<RowId>,
/// All [`EntityPathOpMsg`]s ever received.
entity_op_msgs: BTreeMap<RowId, EntityPathOpMsg>,

/// Set by whomever created this [`LogDb`].
pub data_source: Option<re_smart_channel::Source>,

/// Comes in a special message, [`LogMsg::BeginRecordingMsg`].
recording_info: Option<RecordingInfo>,
recording_msg: Option<BeginRecordingMsg>,

/// Where we store the entities.
pub entity_db: EntityDb,
}

impl LogDb {
pub fn recording_msg(&self) -> Option<&BeginRecordingMsg> {
self.recording_msg.as_ref()
}

pub fn recording_info(&self) -> Option<&RecordingInfo> {
self.recording_info.as_ref()
self.recording_msg().map(|msg| &msg.info)
}

pub fn recording_id(&self) -> RecordingId {
if let Some(info) = &self.recording_info {
info.recording_id
if let Some(msg) = &self.recording_msg {
msg.info.recording_id
} else {
RecordingId::ZERO
}
Expand All @@ -203,11 +203,16 @@ impl LogDb {
self.entity_db.tree.num_timeless_messages()
}

pub fn num_rows(&self) -> usize {
self.entity_db.data_store.total_timeless_rows() as usize
+ self.entity_db.data_store.total_temporal_rows() as usize
}

pub fn is_empty(&self) -> bool {
self.log_messages.is_empty()
self.num_rows() == 0
}

pub fn add(&mut self, msg: LogMsg) -> Result<(), Error> {
pub fn add(&mut self, msg: &LogMsg) -> Result<(), Error> {
crate::profile_function!();

match &msg {
Expand All @@ -218,38 +223,27 @@ impl LogDb {
time_point,
path_op,
} = msg;
self.entity_op_msgs.insert(*row_id, msg.clone());
self.entity_db.add_path_op(*row_id, time_point, path_op);
}
LogMsg::ArrowMsg(_, inner) => self.entity_db.try_add_arrow_msg(inner)?,
LogMsg::Goodbye(_) => {}
}

// TODO(#1619): the following only makes sense because, while we support sending and
// receiving batches, we don't actually do so yet.
// We need to stop storing raw `LogMsg`s before we can benefit from our batching.
self.chronological_row_ids.push(msg.id());
self.log_messages.insert(msg.id(), msg);

Ok(())
}

fn add_begin_recording_msg(&mut self, msg: &BeginRecordingMsg) {
self.recording_info = Some(msg.info.clone());
self.recording_msg = Some(msg.clone());
}

pub fn len(&self) -> usize {
self.log_messages.len()
/// Returns an iterator over all [`EntityPathOpMsg`]s that have been written to this `LogDb`.
pub fn iter_entity_op_msgs(&self) -> impl Iterator<Item = &EntityPathOpMsg> {
self.entity_op_msgs.values()
}

/// In the order they arrived
pub fn chronological_log_messages(&self) -> impl Iterator<Item = &LogMsg> {
self.chronological_row_ids
.iter()
.filter_map(|id| self.get_log_msg(id))
}

pub fn get_log_msg(&self, row_id: &RowId) -> Option<&LogMsg> {
self.log_messages.get(row_id)
pub fn get_entity_op_msg(&self, row_id: &RowId) -> Option<&EntityPathOpMsg> {
self.entity_op_msgs.get(row_id)
}

/// Free up some RAM by forgetting the older parts of all timelines.
Expand All @@ -263,26 +257,15 @@ impl LogDb {
let cutoff_times = self.entity_db.data_store.oldest_time_per_timeline();

let Self {
chronological_row_ids,
log_messages,
timeless_row_ids,
entity_op_msgs,
data_source: _,
recording_info: _,
recording_msg: _,
entity_db,
} = self;

{
crate::profile_scope!("chronological_row_ids");
chronological_row_ids.retain(|row_id| !drop_row_ids.contains(row_id));
}

{
crate::profile_scope!("log_messages");
log_messages.retain(|row_id, _| !drop_row_ids.contains(row_id));
}
{
crate::profile_scope!("timeless_row_ids");
timeless_row_ids.retain(|row_id| !drop_row_ids.contains(row_id));
crate::profile_scope!("entity_op_msgs");
entity_op_msgs.retain(|row_id, _| !drop_row_ids.contains(row_id));
}

entity_db.purge(&cutoff_times, &drop_row_ids);
Expand Down
11 changes: 11 additions & 0 deletions crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,14 @@ pub fn encode<'a>(
}
encoder.finish()
}

pub fn encode_owned(
messages: impl Iterator<Item = LogMsg>,
write: impl std::io::Write,
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
for message in messages {
encoder.append(&message)?;
}
encoder.finish()
}
2 changes: 1 addition & 1 deletion crates/re_log_types/src/data_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ impl DataTable {
/// Internally, time columns are (de)serialized separately from the rest of the control
/// columns for efficiency/QOL concerns: that doesn't change the fact that they are control
/// columns all the same!
/// * Data columns are the one that hold component data.
/// * Data columns are the ones that hold component data.
/// They are optional, potentially sparse, and never deserialized on the server-side (not by
/// the storage systems, at least).
pub fn serialize(&self) -> DataTableResult<(Schema, Chunk<Box<dyn Array>>)> {
Expand Down
113 changes: 51 additions & 62 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,7 @@ impl eframe::App for App {
log_db,
)
.selection_state
.on_frame_start(log_db, blueprint);
.on_frame_start(blueprint);

{
// TODO(andreas): store the re_renderer somewhere else.
Expand Down Expand Up @@ -704,7 +704,7 @@ impl App {
log_db.data_source = Some(self.rx.source().clone());
}

if let Err(err) = log_db.add(msg) {
if let Err(err) = log_db.add(&msg) {
re_log::error!("Failed to add incoming msg: {err}");
};

Expand Down Expand Up @@ -916,8 +916,6 @@ fn preview_files_being_dropped(egui_ctx: &egui::Context) {
enum PanelSelection {
#[default]
Viewport,

EventLog,
}

#[derive(Default, serde::Deserialize, serde::Serialize)]
Expand All @@ -940,8 +938,6 @@ struct AppState {
/// Which view panel is currently being shown
panel_selection: PanelSelection,

event_log_view: crate::event_log_view::EventLogView,

selection_panel: crate::selection_panel::SelectionPanel,
time_panel: crate::time_panel::TimePanel,

Expand Down Expand Up @@ -969,7 +965,6 @@ impl AppState {
selected_rec_id,
recording_configs,
panel_selection,
event_log_view,
blueprints,
selection_panel,
time_panel,
Expand Down Expand Up @@ -1014,7 +1009,6 @@ impl AppState {
.entry(selected_app_id)
.or_insert_with(|| Blueprint::new(ui.ctx()))
.blueprint_panel_and_viewport(&mut ctx, ui),
PanelSelection::EventLog => event_log_view.ui(&mut ctx, ui),
});

// move time last, so we get to see the first data first!
Expand Down Expand Up @@ -1523,7 +1517,13 @@ fn save(app: &mut App, loop_selection: Option<(re_data_store::Timeline, TimeRang
.set_title(title)
.save_file()
{
let f = save_database_to_file(app.log_db(), path, loop_selection);
let f = match save_database_to_file(app.log_db(), path, loop_selection) {
Ok(f) => f,
Err(err) => {
re_log::error!("File saving failed: {err}");
return;
}
};
if let Err(err) = app.spawn_threaded_promise(FILE_SAVER_PROMISE, f) {
// NOTE: Shouldn't even be possible as the "Save" button is already
// grayed out at this point... better safe than sorry though.
Expand All @@ -1546,16 +1546,6 @@ fn main_view_selector_ui(ui: &mut egui::Ui, app: &mut App) {
{
ui.close_menu();
}
if ui
.selectable_value(
&mut app.state.panel_selection,
PanelSelection::EventLog,
"Event Log",
)
.clicked()
{
ui.close_menu();
}
});
}
}
Expand Down Expand Up @@ -1760,57 +1750,56 @@ fn save_database_to_file(
log_db: &LogDb,
path: std::path::PathBuf,
time_selection: Option<(re_data_store::Timeline, TimeRangeF)>,
) -> impl FnOnce() -> anyhow::Result<std::path::PathBuf> {
use re_log_types::{EntityPathOpMsg, TimeInt};

let msgs = match time_selection {
// Fast path: no query, just dump everything.
None => log_db
.chronological_log_messages()
.cloned()
.collect::<Vec<_>>(),

// Query path: time to filter!
Some((timeline, range)) => {
use std::ops::RangeInclusive;
let range: RangeInclusive<TimeInt> = range.min.floor()..=range.max.ceil();
log_db
.chronological_log_messages()
.filter(|msg| {
match msg {
LogMsg::BeginRecordingMsg(_) | LogMsg::Goodbye(_) => {
true // timeless
}
LogMsg::EntityPathOpMsg(_, EntityPathOpMsg { time_point, .. }) => {
time_point.is_timeless() || {
let is_within_range = time_point
.get(&timeline)
.map_or(false, |t| range.contains(t));
is_within_range
}
}
LogMsg::ArrowMsg(_, _) => {
// TODO(john)
false
}
}
})
.cloned()
.collect::<Vec<_>>()
}
};
) -> anyhow::Result<impl FnOnce() -> anyhow::Result<std::path::PathBuf>> {
use re_arrow_store::TimeRange;

crate::profile_scope!("dump_messages");

move || {
let begin_rec_msg = log_db
.recording_msg()
.map(|msg| LogMsg::BeginRecordingMsg(msg.clone()));

let ent_op_msgs = log_db
.iter_entity_op_msgs()
.map(|msg| LogMsg::EntityPathOpMsg(log_db.recording_id(), msg.clone()))
.collect_vec();

let time_filter = time_selection.map(|(timeline, range)| {
(
timeline,
TimeRange::new(range.min.floor(), range.max.ceil()),
)
});
let data_msgs: Result<Vec<_>, _> = log_db
.entity_db
.data_store
.to_data_tables(time_filter)
.map(|table| {
table
.to_arrow_msg()
.map(|msg| LogMsg::ArrowMsg(log_db.recording_id(), msg))
})
.collect();

use anyhow::Context as _;
let data_msgs = data_msgs.with_context(|| "Failed to export to data tables")?;

let msgs = std::iter::once(begin_rec_msg)
.flatten() // option
.chain(ent_op_msgs)
.chain(data_msgs);

Ok(move || {
crate::profile_scope!("save_to_file");

use anyhow::Context as _;
let file = std::fs::File::create(path.as_path())
.with_context(|| format!("Failed to create file at {path:?}"))?;

re_log_encoding::encoder::encode(msgs.iter(), file)
re_log_encoding::encoder::encode_owned(msgs, file)
.map(|_| path)
.context("Message encode")
}
})
}

#[allow(unused_mut)]
Expand All @@ -1821,7 +1810,7 @@ fn load_rrd_to_log_db(mut read: impl std::io::Read) -> anyhow::Result<LogDb> {

let mut log_db = LogDb::default();
for msg in decoder {
log_db.add(msg?)?;
log_db.add(&msg?)?;
}
Ok(log_db)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ mod viewer_analytics;

pub(crate) use misc::{mesh_loader, Item, TimeControl, TimeView, ViewerContext};
use re_log_types::PythonVersion;
pub(crate) use ui::{event_log_view, memory_panel, selection_panel, time_panel, UiVerbosity};
pub(crate) use ui::{memory_panel, selection_panel, time_panel, UiVerbosity};

pub use app::{App, StartupOptions};
pub use remote_viewer_app::RemoteViewerApp;
Expand Down
Loading

0 comments on commit b7bbe10

Please sign in to comment.