datastore: stabilize dataframe sorts #1549

Merged · 4 commits · Mar 15, 2023
crates/re_arrow_store/src/store_polars.rs (77 changes: 59 additions & 18 deletions)
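The gist of the change, as read from the diff below: `to_dataframe` used to treat any column not prefixed with `rerun.` as a timeline, which misclassified extension components (`ext.XXX`) and left the final column and row order unstable. The patch threads the actual timeline names from the store's indices into `sort_df_columns`, splits components into native (`rerun.XXX`) and extension (`ext.XXX`) groups, and sorts rows on every non-list column (timeless flag first, then insert IDs) so the resulting dataframe is deterministic.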
@@ -1,3 +1,5 @@
+use std::collections::BTreeSet;
+
 use arrow2::{
     array::{new_empty_array, Array, BooleanArray, ListArray, UInt64Array, Utf8Array},
     bitmap::Bitmap,
@@ -23,7 +25,7 @@ impl DataStore {
     pub fn to_dataframe(&self) -> DataFrame {
         crate::profile_function!();
 
-        const IS_TIMELESS_COL: &str = "_is_timeless";
+        const TIMELESS_COL: &str = "_is_timeless";
 
         let timeless_dfs = self.timeless_indices.values().map(|index| {
             let ent_path = index.ent_path.clone();
@@ -34,7 +36,7 @@ impl DataStore {
             // Add a column where every row is a boolean true (timeless)
             let timeless = {
                 let timeless = BooleanArray::from(vec![Some(true); num_rows]).boxed();
-                new_infallible_series(IS_TIMELESS_COL, timeless.as_ref(), num_rows)
+                new_infallible_series(TIMELESS_COL, timeless.as_ref(), num_rows)
             };
             let df = df.with_column(timeless).unwrap(); // cannot fail
 
@@ -89,25 +91,56 @@ impl DataStore {
             })
             .collect();
 
+        // Some internal functions of `polars` will panic if everything's empty: early exit.
+        if dfs.iter().all(|df| df.is_empty()) {
+            return DataFrame::empty();
+        }
+
         // Concatenate all indices together.
         //
         // This has to be done diagonally since these indices refer to different entities with
         // potentially wildly different sets of components and lengths.
-        let df = diag_concat_df(dfs.as_slice())
-            // TODO(cmc): is there any way this can fail in this case?
-            .unwrap();
+        //
+        // NOTE: The only way this can fail in this case is if all these frames are empty, because
+        // the store itself is empty, which we check just above.
+        let df = diag_concat_df(dfs.as_slice()).unwrap();
 
-        let df = sort_df_columns(&df, self.config.store_insert_ids);
+        // Arrange the columns in the order that makes the most sense as a user.
+        let timelines: BTreeSet<&str> = self
+            .indices
+            .keys()
+            .map(|(timeline, _)| timeline.name().as_str())
+            .collect();
+        let df = sort_df_columns(&df, self.config.store_insert_ids, &timelines);
 
-        let has_insert_ids = df.column(DataStore::insert_id_key().as_str()).is_ok();
-        let has_timeless = df.column(IS_TIMELESS_COL).is_ok();
+        let has_timeless = df.column(TIMELESS_COL).is_ok();
+        let insert_id_col = DataStore::insert_id_key().as_str();
+
+        const ASCENDING: bool = false;
+        const DESCENDING: bool = true;
 
+        // Now we want to sort based on _the contents_ of the columns, and we need to make sure
+        // we do so in as stable a way as possible given our constraints: we cannot actually sort
+        // the component columns themselves as they are internally lists of their own.
         let (sort_cols, sort_orders): (Vec<_>, Vec<_>) = [
-            has_timeless.then_some((IS_TIMELESS_COL, true)),
-            has_insert_ids.then_some((DataStore::insert_id_key().as_str(), false)),
+            df.column(TIMELESS_COL)
+                .is_ok()
+                .then_some((TIMELESS_COL, DESCENDING)),
+            df.column(insert_id_col)
+                .is_ok()
+                .then_some((insert_id_col, ASCENDING)),
         ]
         .into_iter()
         .flatten()
+        // NOTE: Already properly arranged above, and already contains insert_id if needed.
+        .chain(
+            df.get_column_names()
+                .into_iter()
+                .filter(|col| *col != TIMELESS_COL) // we handle this one separately
+                .filter(|col| *col != insert_id_col) // we handle this one separately
+                .filter(|col| df.column(col).unwrap().list().is_err()) // lists cannot be sorted
+                .map(|col| (col, ASCENDING)),
+        )
         .unzip();
 
         let df = if !sort_cols.is_empty() {
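The `ASCENDING`/`DESCENDING` constants read backwards at first glance; they match polars' convention of expressing sort direction as a per-column `reverse` flag, where `true` means descending. A minimal sketch of that convention, assuming the polars API of the era and hypothetical data (this is an illustration, not the PR's code):

```rust
use polars_core::{df, prelude::*};

fn sort_convention_demo() -> PolarsResult<DataFrame> {
    // Hypothetical frame: the timeless helper flag plus an insert-id column.
    let df = df!(
        "_is_timeless" => &[false, true, false],
        "insert_id"    => &[2i64, 0, 1]
    )?;

    // The second argument is a per-column `reverse` flag: `true` sorts that
    // column descending. Timeless rows (`true`) therefore float to the top,
    // and ties are broken by ascending insert id.
    df.sort(vec!["_is_timeless", "insert_id"], vec![true, false])
}
```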
@@ -117,7 +150,7 @@ impl DataStore {
         };
 
         if has_timeless {
-            df.drop(IS_TIMELESS_COL).unwrap()
+            df.drop(TIMELESS_COL).unwrap()
         } else {
             df
         }
@@ -308,8 +341,13 @@ fn new_infallible_series(name: &str, data: &dyn Array, len: usize) -> Series {
 // - insert ID comes first if it's available,
 // - followed by lexically sorted timelines,
 // - followed by the entity path,
-// - and finally all components in lexical order.
-fn sort_df_columns(df: &DataFrame, store_insert_ids: bool) -> DataFrame {
+// - followed by native components (i.e. "rerun.XXX") in lexical order,
+// - and finally extension components (i.e. "ext.XXX") in lexical order.
+fn sort_df_columns(
+    df: &DataFrame,
+    store_insert_ids: bool,
+    timelines: &BTreeSet<&str>,
+) -> DataFrame {
     crate::profile_function!();
 
     let columns: Vec<_> = {
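Concretely, for a hypothetical store with a single `frame_nr` timeline, insert IDs enabled, and one native plus one extension component, the documented order boils down to a plain column selection (all column names here are invented for illustration):

```rust
use polars_core::prelude::*;

// Toy sketch of the contract, not the crate's implementation.
fn arranged(df: &DataFrame) -> PolarsResult<DataFrame> {
    df.select(vec![
        "insert_id",      // insert ID first, if enabled
        "frame_nr",       // lexically sorted timelines
        "entity",         // the entity path
        "rerun.point2d",  // native components, lexical order
        "ext.confidence", // extension components, lexical order
    ])
}
```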
@@ -325,25 +363,28 @@ fn sort_df_columns(df: &DataFrame, store_insert_ids: bool) -> DataFrame {
             );
         }
 
-        let timelines = all
+        let timelines = timelines.iter().copied().map(Some).collect::<Vec<_>>();
+
+        let native_components = all
             .iter()
             .copied()
-            .filter(|name| !name.starts_with("rerun."))
+            .filter(|name| name.starts_with("rerun."))
             .map(Some)
             .collect::<Vec<_>>();
 
-        let components = all
+        let extension_components = all
             .iter()
             .copied()
-            .filter(|name| name.starts_with("rerun."))
+            .filter(|name| name.starts_with("ext."))
             .map(Some)
             .collect::<Vec<_>>();
 
         [
             vec![store_insert_ids.then(|| DataStore::insert_id_key().as_str())],
             timelines,
             vec![Some("entity")],
-            components,
+            native_components,
+            extension_components,
         ]
         .into_iter()
         .flatten() // flatten vectors
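The crux of the fix is visible in those filters: the old code classified every column that didn't start with `rerun.` as a timeline, so extension components (`ext.XXX`) were silently lumped in with the timelines and the resulting column order was unstable. Timelines now come in explicitly from the store's index keys, and components are partitioned by prefix. A standalone sketch of that partitioning, with invented inputs:

```rust
use std::collections::BTreeSet;

// Returns the final column order: timelines, entity, native, then extension.
fn partition<'a>(all: &[&'a str], timelines: &BTreeSet<&'a str>) -> Vec<&'a str> {
    let native = all.iter().copied().filter(|n| n.starts_with("rerun."));
    let extension = all.iter().copied().filter(|n| n.starts_with("ext."));

    timelines
        .iter()
        .copied()
        .chain(std::iter::once("entity"))
        .chain(native)
        .chain(extension)
        .collect()
}

// partition(&["rerun.point2d", "ext.confidence"], &BTreeSet::from(["frame_nr"]))
// => ["frame_nr", "entity", "rerun.point2d", "ext.confidence"]
```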