New ChunkStore APIs to facilitate data access #7284

Closed
jleibs opened this issue Aug 27, 2024 · 1 comment · Fixed by #7341 or #7345

jleibs commented Aug 27, 2024

This proposal covers 3 newish things.

  • A `ComponentColumnDescriptor` that is roughly inspired by our future vision of tagged components.
  • It introduces `TimeColumnDescriptor` as a way of talking about how temporal data will be materialized in the record batches. Because the user has the ability to pass in the list of columns they are interested in, they can choose whether or not the query results include a temporal column. This supports use-cases like including the `log_time` column when executing a range query that uses `frame` as the timeline.
  • It suggests introducing a `RowJoinPolicy` on the range query that determines the joining behavior. This policy seems fairly unambiguous, and the existence of the different options and the ability to switch between them has the potential to clarify for the user what's happening, rather than making it always magic.

Proposed interfaces:

```rust
pub struct TimeColumnDescriptor {
    /// The timeline this column is associated with.
    timeline: Timeline,

    /// The data type of the column.
    data_type: ArrowDataType,
}

pub struct ComponentColumnDescriptor {
    /// The path of the entity
    entity_path: EntityPath,

    /// Optional name of the `Archetype` associated with this data.
    ///
    /// `None` if the data wasn't logged through an archetype.
    ///
    /// Example: `rerun.archetypes.Points3D`.
    archetype_name: Option<String>,

    /// Optional name of the field within `Archetype` associated with this data.
    ///
    /// `None` if the data wasn't logged through an archetype.
    ///
    /// Example: `positions`.
    archetype_field: Option<String>,

    /// Semantic name associated with this data.
    ///
    /// This is fully implied by `archetype_name` and `archetype_field`, but
    /// included for semantic convenience.
    ///
    /// Example: `rerun.components.Position3D`.
    component_name: String,

    /// The data type of the column.
    data_type: ArrowDataType,

    /// Whether this column represents static data.
    is_static: bool,
}

enum ColumnDescriptor {
    Time(TimeColumnDescriptor),
    Component(ComponentColumnDescriptor),
}

pub struct LatestAtQueryExpression {
    /// The entity path expression to query.
    ///
    /// Example: `world/camera/**`
    entity_path_expr: String,

    /// The timeline to query.
    ///
    /// Example: `frame`
    timeline: Timeline,

    /// The time value to query at.
    ///
    /// Example: `18`
    time_value: TimeInt,
}

/// The `RowJoinPolicy` determines the joining behavior of a `RangeQueryExpression`.
///
/// This policy has the potential to significantly affect the density of the resulting data frame
/// and how much data is included in the result.
///
/// TODO: In the future we could let the user provide a RowJoinPolicy per column, but for now we
/// only allow a single policy for the entire query.
enum RowJoinPolicy {
    /// In `ExactRow` the data result will only contain data that was logged as part of the same
    /// row as the `point_of_view` column, within a single log call. This is the most sparse data
    /// frame.
    ///
    /// WARNING: Even if data is logged at the same time point, if it was logged in a separate log
    /// call, it will not be included in the result unless it is also explicitly included in one of
    /// the `point_of_view` columns.
    ExactRow,

    /// In `MatchOnTime` the data result will contain all data that was logged at the same time as
    /// the `point_of_view` regardless of whether it was logged in the same log call or not. This
    /// still has the potential to be a very sparse data frame if data was logged asynchronously at
    /// different points in time.
    ///
    /// If a point-of-view column is logged multiple times at the same time point, all of the rows
    /// will be included in the result. If an exact-row match is possible on other columns, that will
    /// take precedence; otherwise, the last logged value with a matching time point will be
    /// used.
    MatchOnTime,

    /// In `LatestAt`, each row will compute a `latest_at` result for each column, using the time
    /// value of the row. Because this does not require exact matching time, this style of query has
    /// the potential to return a very large number of rows with very densely populated data.
    ///
    /// Given the number of columns this style of query is likely to produce, you are advised to
    /// use `schema_for_query`, and then provide just the columns you are interested in to the
    /// `range_query` method.
    LatestAt,
}

pub struct RangeQueryExpression {
    /// The entity path expression to query.
    ///
    /// Example: `world/camera/**`
    entity_path_expr: String,

    /// The timeline to query.
    ///
    /// Example: `frame`
    timeline: Timeline,

    /// The time range to query.
    time_range: TimeRange,

    /// A point of view is specified by a component column descriptor.
    ///
    /// In a range query's results, each non-null value of a point-of-view
    /// column will generate a row in the result.
    ///
    /// Note that a component can still be logged multiple times at the same
    /// time point, in which case each of those rows is included in the result.
    point_of_view: Vec<ComponentColumnDescriptor>,

    /// Which joining policy to use. See `RowJoinPolicy`.
    row_join_policy: RowJoinPolicy,
}

enum QueryExpression {
    LatestAt(LatestAtQueryExpression),
    Range(RangeQueryExpression),
}

struct LatestAtResult {
    /// The original query expression used to generate this result.
    query: LatestAtQueryExpression,

    /// The columns in the result, in the same order as the data.
    columns: Vec<ColumnDescriptor>,

    /// The actual arrow RecordBatch containing the data.
    ///
    /// Note that this RecordBatch contains a redundant raw arrow schema representation of
    /// the columns in the `columns` field. The two must match for this to be a valid result.
    data: RecordBatch,
}

struct RangeQueryResult {
    /// The original query expression used to generate this result.
    query: RangeQueryExpression,

    /// The columns in the result, in the same order as the data.
    columns: Vec<ColumnDescriptor>,

    /// The actual arrow RecordBatches containing the data.
    ///
    /// Note that each RecordBatch contains a redundant raw arrow schema representation of
    /// the columns in the `columns` field. The two must match for this to be a valid result.
    data: Box<dyn Iterator<Item = RecordBatch>>,
}
```
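
To make the join policies concrete, here is a hypothetical log sequence (an illustration, not part of the proposal), with `positions` as the point-of-view column:

```rust
// Hypothetical log sequence on the `frame` timeline; `positions` is the
// point-of-view column.
//
//   call 1, frame=1: log positions + colors   (one log call)
//   call 2, frame=1: log radii                (separate call, same time point)
//   call 3, frame=2: log positions
//
// ExactRow:    the frame=1 row contains positions and colors; radii is null
//              because it was logged in a separate call. The frame=2 row
//              contains positions only.
// MatchOnTime: the frame=1 row additionally picks up radii, since it shares
//              the time point. The frame=2 row still has null colors/radii.
// LatestAt:    the frame=2 row back-fills colors and radii with their
//              latest-at values as of frame=2, i.e. those logged at frame=1.
```

The proposed `ChunkStore` methods: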

```rust
impl ChunkStore {
    /// Returns the full schema of the table.
    ///
    /// This will include a column descriptor for every timeline and every component on every entity that has been
    /// written to the store so far.
    ///
    /// The columns are not guaranteed to be returned in any particular or deterministic order.
    pub fn table_schema(&self) -> Vec<ColumnDescriptor> {
        todo!()
    }

    /// Returns all the timelines present in the store along with the maximum time range covered by
    /// each of them.
    pub fn timeline_info(&self) -> BTreeMap<TimeColumnDescriptor, TimeRange> {
        todo!()
    }

    /// Returns the filtered schema for the given query expression.
    ///
    /// This should only include columns that will contain non-empty values from the perspective of
    /// the query semantics.
    ///
    /// Note that this result may include multiple `TimeColumnDescriptor`s as well as multiple
    /// `ComponentColumnDescriptor`s. Any timeline that is referenced in the same row as the point-of-view
    /// column will be included in the result.
    pub fn schema_for_query(&self, query: QueryExpression) -> Vec<ColumnDescriptor> {
        todo!()
    }

    /// Executes a `LatestAt` query.
    ///
    /// Returns a single RecordBatch containing a single row.
    ///
    /// If `columns` is provided, the `LatestAtResult` will only include those columns.
    ///
    /// Because data is often logged concurrently across multiple timelines, the non-primary timelines
    /// are still valid data-columns to include in the result. So a user could, for example, query
    /// at a point in time on the `frame` timeline, but still include the `log_time` column in
    /// the result.
    ///
    /// If `columns` is not provided, it will be determined by the result of `schema_for_query`.
    ///
    /// Any provided `ColumnDescriptors` that don't match a column in the result will still be included, but the
    /// data will be null for the entire column.
    pub fn latest_at_query(
        &self,
        query: LatestAtQueryExpression,
        columns: Option<Vec<ColumnDescriptor>>,
    ) -> LatestAtResult {
        todo!()
    }

    /// Executes a `Range` query.
    ///
    /// Returns an iterator of `RecordBatch`es covering the rows in the queried range.
    ///
    /// If `columns` is provided, the `RangeQueryResult` will only include those columns.
    ///
    /// Because data is often logged concurrently across multiple timelines, the non-primary timelines
    /// are still valid data-columns to include in the result. So a user could, for example, query
    /// for a range of data on the `frame` timeline, but still include the `log_time` timeline in
    /// the result.
    ///
    /// If `columns` is not provided, it will be determined by the result of `schema_for_query`.
    ///
    /// Any provided `ColumnDescriptors` that don't match a column in the result will still be included, but the
    /// data will be null for the entire column.
    pub fn range_query(
        &self,
        query: RangeQueryExpression,
        columns: Option<Vec<ColumnDescriptor>>,
    ) -> RangeQueryResult {
        todo!()
    }
}
```
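
To make the intended call pattern concrete, here is a hypothetical usage sketch against the proposed interface. The constructors (`Timeline::new_sequence`, `TimeRange::EVERYTHING`), the `Clone` on the query, and the concrete point-of-view descriptor are illustrative assumptions, not part of the proposal:

```rust
// Hypothetical usage sketch of the proposed API; constructors, derives, and
// helper values below are assumptions for illustration only.
fn print_camera_rows(store: &ChunkStore) {
    // Range-query everything under `world/camera` on the `frame` timeline.
    let query = RangeQueryExpression {
        entity_path_expr: "world/camera/**".to_owned(),
        timeline: Timeline::new_sequence("frame"), // assumed constructor
        time_range: TimeRange::EVERYTHING,         // assumed constant
        // One result row per non-null value of the point-of-view column(s).
        point_of_view: vec![/* a ComponentColumnDescriptor for rerun.components.Position3D */],
        row_join_policy: RowJoinPolicy::LatestAt,
    };

    // First resolve the columns this query can produce; passing `None` to
    // `range_query` would select all of them implicitly. Narrowing this list
    // is how a caller opts into e.g. the `log_time` column while querying on
    // `frame`.
    let columns = store.schema_for_query(QueryExpression::Range(query.clone()));

    let result = store.range_query(query, Some(columns));
    for batch in result.data {
        // Each batch's arrow schema matches `result.columns`.
        println!("{batch:?}");
    }
}
```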
@jleibs jleibs added the ⛃ re_datastore affects the datastore itself label Aug 27, 2024
@jleibs jleibs added this to the 0.19 milestone Aug 27, 2024

teh-cmc commented Aug 29, 2024

```
PoV -> ListOfIndices   (option to decimate -- remove the RowId part of the index)

range_query({Color, Radius}) -> QueryHandle
QueryHandle.gimme_dem_indices() -> ListOfIndices
QueryHandle.get(0, 1000) -> iter RecordBatch
async QueryHandle.get(1001, 2000) -> iter RecordBatch


range_query() -> QueryHandle
QueryHandle.get(0, 1000, {Color, Radius}) -> RecordBatch


Secondaries: dataframes to only go from sparse to dense 
PoVs: always dense (from sparse to dense if multiple PoVs dont agree)
pub fn range_query(
    &self,
    query: RangeQueryExpression,
    columns: Option<Vec<ColumnDescriptor>>,
) -> QueryHandle {
    todo!()
}

impl QueryHandle {
    pub fn get(row_range: RangeInclusive) -> RecordBatch; // <<< semantically

    // All returned RecordBatches have the same schema, which might lead to empty columns.
    pub fn get(row_range: RangeInclusive) -> impl Iterator<Item = RecordBatch>;

    let recbatch = get().collect();
}
```

No MVCC (yet :wink_wink:)
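
A minimal compilable sketch of the handle pattern in the notes above — the names, the concrete range type, and the `Vec<RecordBatch>` page type are assumptions, not a settled design:

```rust
// Sketch only: a paginated query handle in the spirit of the notes above.
// `RecordBatch` is assumed to be in scope; all names/signatures are guesses.
use std::ops::RangeInclusive;

pub struct QueryHandle {
    // Resolved query state (schema, chunk references, ...) would live here.
}

impl QueryHandle {
    /// Fetches one page of rows. All returned RecordBatches share the same
    /// schema, which may leave some columns entirely empty for a given page.
    pub fn get(&self, row_range: RangeInclusive<u64>) -> Vec<RecordBatch> {
        todo!()
    }
}

// Paginated access: fetch rows 0..=999 first, then 1000..=1999, and so on:
// let first_page = handle.get(0..=999);
// let next_page  = handle.get(1000..=1999);
```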

teh-cmc added a commit that referenced this issue Sep 4, 2024
All the boilerplate for the new `re_dataframe`.

Also introduces all the new types:
* `QueryExpression`, `LatestAtQueryExpression`, `RangeQueryExpression`
* `QueryHandle`, `LatestAtQueryHandle` (unimplemented),
`RangeQueryHandle` (unimplemented)
* `ColumnDescriptor`, `ControlColumnDescriptor`, `TimeColumnDescriptor`,
`ComponentColumnDescriptor`

No actual code logic, just definitions.

* Part of #7284 

---

Dataframe APIs PR series:
- #7338
- #7339
- #7340
- #7341
- #7345
teh-cmc added a commit that referenced this issue Sep 4, 2024
The schema resolution logic.

* Part of #7284 

---

Dataframe APIs PR series:
- #7338
- #7339
- #7340
- #7341
- #7345
teh-cmc added a commit that referenced this issue Sep 4, 2024
Implements the latest-at dataframe API.

Examples:
```
cargo r --all-features -p re_dataframe --example latest_at -- /tmp/helix.rrd
cargo r --all-features -p re_dataframe --example latest_at -- /tmp/helix.rrd /helix/structure/scaffolding/**
```

```rust
use itertools::Itertools as _;

use re_chunk::{TimeInt, Timeline};
use re_chunk_store::{ChunkStore, ChunkStoreConfig, LatestAtQueryExpression, VersionPolicy};
use re_dataframe::QueryEngine;
use re_log_types::StoreKind;

fn main() -> anyhow::Result<()> {
    let args = std::env::args().collect_vec();

    let get_arg = |i| {
        let Some(value) = args.get(i) else {
            eprintln!(
                "Usage: {} <path_to_rrd> <entity_path_expr>",
                args.first().map_or("$BIN", |s| s.as_str())
            );
            std::process::exit(1);
        };
        value
    };

    let path_to_rrd = get_arg(1);
    let entity_path_expr = args.get(2).map_or("/**", |s| s.as_str());

    let stores = ChunkStore::from_rrd_filepath(
        &ChunkStoreConfig::DEFAULT,
        path_to_rrd,
        VersionPolicy::Warn,
    )?;

    for (store_id, store) in &stores {
        if store_id.kind != StoreKind::Recording {
            continue;
        }

        let cache = re_dataframe::external::re_query::Caches::new(store);
        let engine = QueryEngine {
            store,
            cache: &cache,
        };

        let query = LatestAtQueryExpression {
            entity_path_expr: entity_path_expr.into(),
            timeline: Timeline::log_time(),
            at: TimeInt::MAX,
        };

        let query_handle = engine.latest_at(&query, None /* columns */);
        let batch = query_handle.get();

        eprintln!("{query}:\n{batch}");
    }

    Ok(())
}
```

* Part of #7284 

---

Dataframe APIs PR series:
- #7338
- #7339
- #7340
- #7341
- #7345
@teh-cmc teh-cmc closed this as completed in 9a994a0 Sep 4, 2024
teh-cmc added a commit that referenced this issue Sep 4, 2024
Implements the paginated dense range dataframe APIs.

If there's no off-by-one anywhere in there, I will eat my hat.
Getting this in the hands of people is the highest prio though, I'll add
tests later.


![image](https://github.com/user-attachments/assets/e865ba62-21db-41c1-9899-35a0e7aea134)

![image](https://github.com/user-attachments/assets/32934ba8-2673-401a-aafc-409dfbe9b2c5)


* Fixes #7284 

---

Dataframe APIs PR series:
- #7338
- #7339
- #7340
- #7341
- #7345
abey79 added a commit that referenced this issue Sep 4, 2024

### What

- Part of: #7279

This PR updates the dataframe query override UI as per design in #7279,
in particular adding PoV entity and component.
- updated UI layout
- time boundaries default to a `+∞`/`–∞` button which, when clicked, turns
into an editable time drag value
- reset buttons to go back to the `∞` state
- auto-selection of PoV component based on PoV entity (picks a required
component for one of the entity archetypes)

**Note**:
- This is a pure UI PR. The PoV entity/component are not yet used at all
for the dataframe's content (that will be addressed in a follow-up PR
currently blocked on #7284).
- Ignore the ugly "Time range table order" part, this will be cleaned up
later (#7070)


https://github.com/user-attachments/assets/32151a1f-b0ca-4e99-99df-ea730451d4dc
