Replace Session with RecordingContext #1983

Merged on May 4, 2023 (29 commits)

Commits
68d8b0c
version crossbeam at the workspace level
teh-cmc Apr 17, 2023
cb74038
more DataRow size helpers
teh-cmc Apr 26, 2023
a0d9d39
DataTableBatcher
teh-cmc Apr 26, 2023
f46ac72
lints
teh-cmc Apr 26, 2023
5440f76
lints
teh-cmc Apr 26, 2023
c1088c5
self review
teh-cmc Apr 26, 2023
cbf17be
don't expose shutdown to make errors impossible
teh-cmc Apr 26, 2023
e7b42bf
doc
teh-cmc Apr 26, 2023
573de98
backport
teh-cmc Apr 26, 2023
67dc616
backport
teh-cmc Apr 27, 2023
14130c5
introduce RecordingStream
teh-cmc Apr 27, 2023
71a31bd
clean up old stuff from the before time
teh-cmc Apr 27, 2023
649bbe8
self-review
teh-cmc Apr 28, 2023
2b74c3b
ordered data columns in data tables
teh-cmc Apr 28, 2023
34be0a7
tests
teh-cmc Apr 28, 2023
72685fa
even more tests
teh-cmc Apr 28, 2023
067168f
rogue todo
teh-cmc Apr 28, 2023
b8e0065
batching is now a reality
teh-cmc Apr 28, 2023
0e69707
some extra peace of mind
teh-cmc Apr 28, 2023
a7f84c8
revert
teh-cmc Apr 28, 2023
ead5883
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc Apr 28, 2023
3f0ec73
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc Apr 28, 2023
6e348db
lock shenanigans
teh-cmc Apr 28, 2023
a31285b
Merge branch 'main' into cmc/sdk_revamp/1_batcher
teh-cmc May 3, 2023
ecb7ce5
Merge branch 'cmc/sdk_revamp/1_batcher' into cmc/sdk_revamp/2_rust_re…
teh-cmc May 3, 2023
8580773
Merge remote-tracking branch 'origin/main' into cmc/sdk_revamp/2_rust…
teh-cmc May 3, 2023
4af3342
merge shenanigans
teh-cmc May 3, 2023
d1e5c19
address PR comments
teh-cmc May 3, 2023
c121325
typos
teh-cmc May 4, 2023
2 changes: 2 additions & 0 deletions Cargo.lock


2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/polars_util.rs
@@ -197,7 +197,7 @@ pub fn range_components<'a, const N: usize>(

// --- Joins ---

- // TODO(#1619): none of this mess should be here
+ // TODO(#1759): none of this mess should be here

pub fn dataframe_from_cells<const N: usize>(
cells: &[Option<DataCell>; N],
10 changes: 5 additions & 5 deletions crates/re_arrow_store/src/store_dump.rs
@@ -1,6 +1,6 @@
- use ahash::HashMapExt;
+ use std::collections::BTreeMap;

+ use arrow2::Either;
- use nohash_hasher::IntMap;
use re_log_types::{
DataCellColumn, DataTable, ErasedTimeVec, RowIdVec, TableId, TimeRange, Timeline,
};
@@ -51,7 +51,7 @@ impl DataStore {
.take(table.num_rows() as _)
.collect(),
col_num_instances: col_num_instances.clone(),
- columns: columns.clone(), // shallow
+ columns: columns.clone().into_iter().collect(), // shallow
}
})
}
@@ -92,7 +92,7 @@ impl DataStore {
.take(col_row_id.len())
.collect(),
col_num_instances: col_num_instances.clone(),
- columns: columns.clone(), // shallow
+ columns: columns.clone().into_iter().collect(), // shallow
}
})
})
@@ -163,7 +163,7 @@ impl DataStore {
let col_num_instances =
filter_column(col_time, col_num_instances.iter(), time_filter).collect();

- let mut columns2 = IntMap::with_capacity(columns.len());
+ let mut columns2 = BTreeMap::default();
for (component, column) in columns {
let column = filter_column(col_time, column.iter(), time_filter).collect();
columns2.insert(*component, DataCellColumn(column));
2 changes: 1 addition & 1 deletion crates/re_arrow_store/src/store_polars.rs
@@ -17,7 +17,7 @@ use crate::{
};

// TODO(#1692): all of this stuff should be defined by Data{Cell,Row,Table}, not the store.
- // TODO(#1619): remove this and reimplement it on top of store serialization
+ // TODO(#1759): remove this and reimplement it on top of store serialization

// ---

5 changes: 2 additions & 3 deletions crates/re_arrow_store/src/store_write.rs
@@ -15,9 +15,8 @@ use crate::{
IndexedTable, PersistentIndexedTable,
};

- // TODO(#1619):
- // - The store should insert column-per-column rather than row-per-row (purely a performance
- // matter)
+ // TODO(cmc): the store should insert column-per-column rather than row-per-row (purely a
+ // performance matter).

// --- Data store ---

2 changes: 0 additions & 2 deletions crates/re_arrow_store/tests/data_store.rs
@@ -24,8 +24,6 @@ use re_log_types::{
Timeline,
};

- // TODO(#1619): introduce batching in the testing matrix
-
// --- LatestComponentsAt ---

#[test]
2 changes: 1 addition & 1 deletion crates/re_data_store/src/log_db.rs
@@ -61,7 +61,7 @@ impl EntityDb {
let mut table = DataTable::from_arrow_msg(msg)?;
table.compute_all_size_bytes();

- // TODO(#1619): batch all of this
+ // TODO(cmc): batch all of this

Member commented: Oooh.... even more performance gains to be had?

Member Author replied: Yeah, we only batch during transport at the moment; everything else happens on a row-by-row basis.

Though I'm not sure it's ever going to be worth it in this instance: implementing the write path in terms of individual rows is so flexible, simple, and easy to extend... and things are already so fast even with just the transport-level batching...


for row in table.to_rows() {
self.try_add_data_row(&row)?;
}
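To make the exchange above concrete, here is a minimal sketch of the row-by-row write path under discussion. `DataRow`, `DataTable`, and `EntityDb` are simplified stand-ins for illustration only, not the actual rerun types: many rows travel the wire batched into one table, but ingestion still decomposes the table and inserts row by row.

// Simplified stand-ins; the real types carry timepoints, cells, IDs, etc.
struct DataRow {
    entity_path: String,
}

struct DataTable {
    rows: Vec<DataRow>,
}

impl DataTable {
    // Decompose the transport-level batch back into individual rows.
    fn to_rows(self) -> impl Iterator<Item = DataRow> {
        self.rows.into_iter()
    }
}

struct EntityDb {
    num_rows: usize,
}

impl EntityDb {
    // The write path stays row-per-row: flexible, simple, easy to extend.
    fn try_add_data_row(&mut self, row: &DataRow) -> Result<(), String> {
        let _ = &row.entity_path; // a real store would index the row here
        self.num_rows += 1;
        Ok(())
    }
}

fn main() -> Result<(), String> {
    let table = DataTable {
        rows: vec![
            DataRow { entity_path: "world/points".to_owned() },
            DataRow { entity_path: "world/lines".to_owned() },
        ],
    };

    let mut db = EntityDb { num_rows: 0 };
    for row in table.to_rows() {
        db.try_add_data_row(&row)?;
    }
    assert_eq!(db.num_rows, 2);
    Ok(())
}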
3 changes: 0 additions & 3 deletions crates/re_log_types/src/arrow_msg.rs
@@ -11,9 +11,6 @@ use arrow2::{array::Array, chunk::Chunk, datatypes::Schema};
#[derive(Clone, Debug, PartialEq)]
pub struct ArrowMsg {
/// Unique identifier for the [`crate::DataTable`] in this message.
- ///
- /// NOTE(#1619): While we're in the process of transitioning towards end-to-end batching, the
- /// `table_id` is always the same as the `row_id` as the first and only row.
pub table_id: TableId,

/// The maximum values for all timelines across the entire batch of data.
5 changes: 4 additions & 1 deletion crates/re_log_types/src/data_cell.rs
@@ -134,7 +134,7 @@ pub struct DataCellInner {
}

// TODO(cmc): We should be able to build a cell from non-reference types.
- // TODO(#1619): We shouldn't have to specify the component name separately, this should be
+ // TODO(#1696): We shouldn't have to specify the component name separately, this should be
// part of the metadata by using an extension.
// TODO(#1696): Check that the array is indeed a leaf / component type when building a cell from an
// arrow payload.
@@ -533,6 +533,9 @@ impl DataCell {
inner.compute_size_bytes();
return true;
}

+ re_log::error_once!("cell size could _not_ be computed");
+
false
}
}
13 changes: 2 additions & 11 deletions crates/re_log_types/src/data_row.rs
@@ -127,12 +127,6 @@ impl RowId {
pub fn random() -> Self {
Self(re_tuid::Tuid::random())
}

- /// Temporary utility while we transition to batching. See #1619.
- #[doc(hidden)]
- pub fn into_table_id(self) -> TableId {
- TableId(self.0)
- }
}

impl SizeBytes for RowId {
Expand Down Expand Up @@ -322,13 +316,10 @@ impl DataRow {
Self::try_from_cells(row_id, timepoint, entity_path, num_instances, cells).unwrap()
}

- /// Turns the `DataRow` into a single-row [`DataTable`] that carries the same ID.
- ///
- /// This only makes sense as part of our transition to batching. See #1619.
- #[doc(hidden)]
+ /// Turns the `DataRow` into a single-row [`DataTable`].
#[inline]
pub fn into_table(self) -> DataTable {
- DataTable::from_rows(self.row_id.into_table_id(), [self])
+ DataTable::from_rows(TableId::random(), [self])
}
}

18 changes: 3 additions & 15 deletions crates/re_log_types/src/data_table.rs
@@ -2,7 +2,7 @@ use std::collections::BTreeMap;

use ahash::HashMap;
use itertools::Itertools as _;
- use nohash_hasher::{IntMap, IntSet};
+ use nohash_hasher::IntSet;
use smallvec::SmallVec;

use crate::{
@@ -156,12 +156,6 @@ impl TableId {
pub fn random() -> Self {
Self(re_tuid::Tuid::random())
}

- /// Temporary utility while we transition to batching. See #1619.
- #[doc(hidden)]
- pub fn into_row_id(self) -> RowId {
- RowId(self.0)
- }
}

impl SizeBytes for TableId {
@@ -353,7 +347,7 @@ pub struct DataTable {
///
/// The cells are optional since not all rows will have data for every single component
/// (i.e. the table is sparse).
- pub columns: IntMap<ComponentName, DataCellColumn>,
+ pub columns: BTreeMap<ComponentName, DataCellColumn>,
}

impl DataTable {
@@ -424,7 +418,7 @@ impl DataTable {
}

// Pre-allocate all columns (one per component).
- let mut columns = IntMap::default();
+ let mut columns = BTreeMap::default();
for component in components {
columns.insert(
component,
@@ -441,12 +435,6 @@
}
}

- if col_row_id.len() > 1 {
- re_log::warn_once!(
- "batching features are not ready for use, use single-row data tables instead!"
- );
- }

Self {
table_id,
col_row_id,
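The IntMap-to-BTreeMap switch above (see the "ordered data columns in data tables" commit) makes column iteration order deterministic: a BTreeMap yields its keys in sorted order, so identical tables serialize their columns identically. A minimal sketch of the property being relied on, using illustrative component names:

use std::collections::BTreeMap;

fn main() {
    // Insertion order is scrambled on purpose; iteration order is not.
    let mut columns: BTreeMap<&str, Vec<Option<u32>>> = BTreeMap::new();
    columns.insert("rerun.point2d", vec![Some(1), None]);
    columns.insert("rerun.colorrgba", vec![None, Some(2)]);
    columns.insert("rerun.instance_key", vec![Some(0), Some(1)]);

    // Keys always come back sorted, regardless of how they went in.
    let order: Vec<&str> = columns.keys().copied().collect();
    assert_eq!(
        order,
        ["rerun.colorrgba", "rerun.instance_key", "rerun.point2d"]
    );
}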
2 changes: 1 addition & 1 deletion crates/re_log_types/src/datagen.rs
@@ -1,6 +1,6 @@
//! Generate random data for tests and benchmarks.

- // TODO(#1619): It really is time for whole module to disappear.
+ // TODO(#1810): It really is time for whole module to disappear.

use crate::{
component_types::{self, InstanceKey},
12 changes: 0 additions & 12 deletions crates/re_log_types/src/lib.rs
@@ -195,18 +195,6 @@ pub enum LogMsg {
}

impl LogMsg {
- pub fn id(&self) -> RowId {
- match self {
- Self::BeginRecordingMsg(msg) => msg.row_id,
- Self::EntityPathOpMsg(_, msg) => msg.row_id,
- Self::Goodbye(row_id) => *row_id,
- // TODO(#1619): the following only makes sense because, while we support sending and
- // receiving batches, we don't actually do so yet.
- // We need to stop storing raw `LogMsg`s before we can benefit from our batching.
- Self::ArrowMsg(_, msg) => msg.table_id.into_row_id(),
- }
- }

pub fn recording_id(&self) -> Option<&RecordingId> {
match self {
Self::BeginRecordingMsg(msg) => Some(&msg.info.recording_id),
2 changes: 2 additions & 0 deletions crates/re_sdk/Cargo.toml
@@ -41,6 +41,7 @@ re_log.workspace = true
re_memory.workspace = true
re_sdk_comms = { workspace = true, features = ["client"] }

+ crossbeam.workspace = true
document-features = "0.2"
parking_lot.workspace = true
thiserror.workspace = true
@@ -54,6 +55,7 @@ arrow2_convert.workspace = true
ndarray.workspace = true
ndarray-rand = "0.14"
rand = "0.8"
+ similar-asserts = "1.4.2"


[build-dependencies]
23 changes: 0 additions & 23 deletions crates/re_sdk/src/global.rs

This file was deleted.

19 changes: 7 additions & 12 deletions crates/re_sdk/src/lib.rs
@@ -9,21 +9,15 @@
// ----------------
// Private modules:

- #[cfg(feature = "global_session")]
- mod global;
-
mod log_sink;
mod msg_sender;
- mod session;
+ mod recording_stream;

// -------------
// Public items:

- #[cfg(feature = "global_session")]
- pub use self::global::global_session;
-
pub use self::msg_sender::{MsgSender, MsgSenderError};
- pub use self::session::{Session, SessionBuilder};
+ pub use self::recording_stream::{RecordingStream, RecordingStreamBuilder};

pub use re_sdk_comms::default_server_addr;

@@ -36,6 +30,9 @@ impl crate::sink::LogSink for re_log_encoding::FileSink {
fn send(&self, msg: re_log_types::LogMsg) {
re_log_encoding::FileSink::send(self, msg);
}

+ #[inline]
+ fn flush_blocking(&self) {}
}

// ---------------
Expand All @@ -49,9 +46,7 @@ pub mod demo_util;
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
- pub use crate::log_sink::{
- disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
- };
+ pub use crate::log_sink::{BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink};

#[cfg(not(target_arch = "wasm32"))]
pub use re_log_encoding::{FileSink, FileSinkError};
@@ -153,7 +148,7 @@ pub fn decide_logging_enabled(default_enabled: bool) -> bool {

// ----------------------------------------------------------------------------

- /// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`Session::new`].
+ /// Creates a new [`re_log_types::RecordingInfo`] which can be used with [`RecordingStream::new`].
#[track_caller] // track_caller so that we can see if we are being called from an official example.
pub fn new_recording_info(
application_id: impl Into<re_log_types::ApplicationId>,
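The diff above adds an explicit flush_blocking hook to the LogSink impl for FileSink (a no-op there). A minimal sketch of the general sink shape this implies; LogMsg and the trait below are simplified stand-ins, not the real re_sdk definitions:

// Stand-ins for illustration; the real `LogSink` lives in `re_sdk::sink`
// and operates on `re_log_types::LogMsg`, with a richer surface.
struct LogMsg(String);

trait LogSink {
    /// Hand a message over to the sink.
    fn send(&self, msg: LogMsg);

    /// Block until everything previously sent has been handled.
    /// A sink that never buffers can implement this as a no-op.
    fn flush_blocking(&self);
}

struct StdoutSink;

impl LogSink for StdoutSink {
    fn send(&self, msg: LogMsg) {
        println!("{}", msg.0);
    }

    #[inline]
    fn flush_blocking(&self) {} // nothing is buffered, nothing to flush
}

fn main() {
    let sink = StdoutSink;
    sink.send(LogMsg("hello".to_owned()));
    sink.flush_blocking();
}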