Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SDK DataLoaders 5: customizable (external) loaders for Rust #5351

Merged
merged 14 commits into from
Feb 29, 2024
6 changes: 3 additions & 3 deletions crates/re_data_source/src/data_loader/loader_archetype.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl DataLoader for ArchetypeLoader {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand All @@ -35,12 +35,12 @@ impl DataLoader for ArchetypeLoader {
.with_context(|| format!("Failed to read file {filepath:?}"))?;
let contents = std::borrow::Cow::Owned(contents);

self.load_from_file_contents(store_id, filepath, contents, tx)
self.load_from_file_contents(settings, filepath, contents, tx)
}

fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
Expand Down
8 changes: 4 additions & 4 deletions crates/re_data_source/src/data_loader/loader_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ impl crate::DataLoader for DirectoryLoader {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &crate::DataLoaderSettings,
dirpath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand All @@ -39,7 +39,7 @@ impl crate::DataLoader for DirectoryLoader {

let filepath = entry.path();
if filepath.is_file() {
let store_id = store_id.clone();
let settings = settings.clone();
let filepath = filepath.to_owned();
let tx = tx.clone();

Expand All @@ -51,7 +51,7 @@ impl crate::DataLoader for DirectoryLoader {
_ = std::thread::Builder::new()
.name(format!("load_dir_entry({filepath:?})"))
.spawn(move || {
let data = match crate::load_file::load(&store_id, &filepath, None) {
let data = match crate::load_file::load(&settings, &filepath, None) {
Ok(data) => data,
Err(err) => {
re_log::error!(?filepath, %err, "Failed to load directory entry");
Expand All @@ -74,7 +74,7 @@ impl crate::DataLoader for DirectoryLoader {
#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
Expand Down
11 changes: 6 additions & 5 deletions crates/re_data_source/src/data_loader/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl crate::DataLoader for ExternalLoader {

fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand All @@ -126,8 +126,9 @@ impl crate::DataLoader for ExternalLoader {
struct CompatibleLoaderFound;
let (tx_feedback, rx_feedback) = std::sync::mpsc::channel::<CompatibleLoaderFound>();

let args = settings.to_cli_args();
for exe in EXTERNAL_LOADER_PATHS.iter() {
let store_id = store_id.clone();
let args = args.clone();
let filepath = filepath.clone();
let tx = tx.clone();
let tx_feedback = tx_feedback.clone();
Expand All @@ -139,7 +140,7 @@ impl crate::DataLoader for ExternalLoader {

let child = Command::new(exe)
.arg(filepath.clone())
.args(["--recording-id".to_owned(), store_id.to_string()])
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn();
Expand Down Expand Up @@ -277,12 +278,12 @@ impl crate::DataLoader for ExternalLoader {
#[inline]
fn load_from_file_contents(
&self,
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
path: std::path::PathBuf,
_contents: std::borrow::Cow<'_, [u8]>,
_tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
// TODO(cmc): You could imagine a world where plugins can be streamed rrd data via their
// TODO(#5324): You could imagine a world where plugins can be streamed rrd data via their
// standard input… but today is not that world.
Err(crate::DataLoaderError::Incompatible(path))
}
Expand Down
4 changes: 2 additions & 2 deletions crates/re_data_source/src/data_loader/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ impl crate::DataLoader for RrdLoader {
fn load_from_path(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
) -> Result<(), crate::DataLoaderError> {
Expand Down Expand Up @@ -58,7 +58,7 @@ impl crate::DataLoader for RrdLoader {
fn load_from_file_contents(
&self,
// NOTE: The Store ID comes from the rrd file itself.
_store_id: re_log_types::StoreId,
_settings: &crate::DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<crate::LoadedData>,
Expand Down
102 changes: 99 additions & 3 deletions crates/re_data_source/src/data_loader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,106 @@ use std::sync::Arc;

use once_cell::sync::Lazy;

use re_log_types::{ArrowMsg, DataRow, LogMsg};
use re_log_types::{ArrowMsg, DataRow, EntityPath, LogMsg, TimePoint};

// ---

/// Recommended settings for the [`DataLoader`].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I get the recommended method on this, but I'm confused/not convinced that this thing as a whole is "recommended settings". Why not just DataLoaderSettings and then each field elaborates what effect it expects (already the case)? I get that we're defensive here and want to make clear that a data loader that ignores the settings is still a valid data loader and these are all "would you kindly please"-arguments, but I find baking that into the name goes a bit too far and is more confusing than helping.

... anyways, not a hill I'm gonna die on, so if you prefer this as is, keep it :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if there's something more we could write about the (lack of) consequences if a data loader ignores the args

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure we can rename to DataLoaderSettings.

I wonder if there's something more we could write about the (lack of) consequences if a data loader ignores the args

We've already done so in a bunch of places, including the guide.

///
/// The loader is free to ignore some or all of these.
///
/// External [`DataLoader`]s will be passed the following CLI parameters:
/// * `--recording-id <store_id>`
/// * `--opened-recording-id <opened_store_id>` (if set)
/// * `--entity-path-prefix <entity_path_prefix>` (if set)
/// * `--timeless` (if `timepoint` is set to the timeless timepoint)
/// * `--time <timeline1>=<time1> --time <timeline2>=<time2> ...` (one flag per temporal timeline in `timepoint`)
/// * `--sequence <timeline1>=<seq1> --sequence <timeline2>=<seq2> ...` (one flag per sequence timeline in `timepoint`)
#[derive(Debug, Clone)]
pub struct DataLoaderSettings {
/// The recommended [`re_log_types::StoreId`] to log the data to, based on the surrounding context.
///
/// [`Self::to_cli_args`] always forwards this as `--recording-id <store_id>`.
pub store_id: re_log_types::StoreId,

/// The [`re_log_types::StoreId`] that is currently opened in the viewer, if any.
///
/// Log data to this recording if you want it to appear in a new recording shared by all
/// data-loaders for the current loading session.
///
/// When set, [`Self::to_cli_args`] forwards this as `--opened-recording-id <opened_store_id>`.
//
// NOTE(review): the "new recording shared by all data-loaders" sentence above reads like a
// description of `store_id` rather than of the currently opened recording — confirm wording.
//
// TODO(#5350): actually support this
pub opened_store_id: Option<re_log_types::StoreId>,

/// What should the entity paths be prefixed with?
///
/// When set, [`Self::to_cli_args`] forwards this as `--entity-path-prefix <entity_path_prefix>`.
pub entity_path_prefix: Option<EntityPath>,

/// At what time(s) should the data be logged to?
///
/// When set, [`Self::to_cli_args`] emits `--timeless` if the timepoint reports itself as
/// timeless, plus one `--time <timeline>=<value>` or `--sequence <timeline>=<value>` flag per
/// timeline it contains, depending on the timeline's type.
pub timepoint: Option<TimePoint>,
}

impl DataLoaderSettings {
    /// Builds settings that only carry the recommended `store_id`.
    ///
    /// Every optional setting (`opened_store_id`, `entity_path_prefix`, `timepoint`) is left
    /// unset.
    #[inline]
    pub fn recommended(store_id: impl Into<re_log_types::StoreId>) -> Self {
        let store_id = store_id.into();
        Self {
            store_id,
            opened_store_id: None,
            entity_path_prefix: None,
            timepoint: None,
        }
    }

    /// Generates CLI flags from these settings, for external data loaders.
    ///
    /// The returned vector always starts with `--recording-id <store_id>`. Flags for the
    /// optional settings follow, in field order, only for the settings that are set.
    pub fn to_cli_args(&self) -> Vec<String> {
        // Destructure exhaustively so that adding a field to the struct forces this
        // function to be revisited.
        let Self {
            store_id: recording,
            opened_store_id: opened,
            entity_path_prefix: prefix,
            timepoint: when,
        } = self;

        let mut cli = vec!["--recording-id".to_owned(), recording.to_string()];

        if let Some(opened) = opened {
            cli.push("--opened-recording-id".to_owned());
            cli.push(opened.to_string());
        }

        if let Some(prefix) = prefix {
            cli.push("--entity-path-prefix".to_owned());
            cli.push(prefix.to_string());
        }

        if let Some(when) = when {
            if when.is_timeless() {
                cli.push("--timeless".to_owned());
            }

            // One `<flag> <timeline>=<value>` pair per timeline; the flag depends on
            // whether the timeline is temporal or sequence-based.
            for (timeline, time) in when.iter() {
                let flag = match timeline.typ() {
                    re_log_types::TimeType::Time => "--time",
                    re_log_types::TimeType::Sequence => "--sequence",
                };
                cli.push(flag.to_owned());
                cli.push(format!("{}={}", timeline.name(), time.as_i64()));
            }
        }

        cli
    }
}

/// A [`DataLoader`] loads data from a file path and/or a file's contents.
///
/// Files can be loaded in 3 different ways:
Expand Down Expand Up @@ -90,7 +186,7 @@ pub trait DataLoader: Send + Sync {
#[cfg(not(target_arch = "wasm32"))]
fn load_from_path(
&self,
store_id: re_log_types::StoreId,
settings: &DataLoaderSettings,
path: std::path::PathBuf,
tx: std::sync::mpsc::Sender<LoadedData>,
) -> Result<(), DataLoaderError>;
Expand Down Expand Up @@ -122,7 +218,7 @@ pub trait DataLoader: Send + Sync {
/// with a [`DataLoaderError::Incompatible`] error.
fn load_from_file_contents(
&self,
store_id: re_log_types::StoreId,
settings: &DataLoaderSettings,
filepath: std::path::PathBuf,
contents: std::borrow::Cow<'_, [u8]>,
tx: std::sync::mpsc::Sender<LoadedData>,
Expand Down
12 changes: 8 additions & 4 deletions crates/re_data_source/src/data_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,10 @@ impl DataSource {
// This `StoreId` will be communicated to all `DataLoader`s, which may or may not
// decide to use it depending on whether they want to share a common recording
// or not.
let store_id = re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
crate::load_from_path(&store_id, file_source, &path, &tx)
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = crate::DataLoaderSettings::recommended(shared_store_id);
crate::load_from_path(&settings, file_source, &path, &tx)
.with_context(|| format!("{path:?}"))?;

if let Some(on_msg) = on_msg {
Expand All @@ -156,9 +158,11 @@ impl DataSource {
// This `StoreId` will be communicated to all `DataLoader`s, which may or may not
// decide to use it depending on whether they want to share a common recording
// or not.
let store_id = re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let shared_store_id =
re_log_types::StoreId::random(re_log_types::StoreKind::Recording);
let settings = crate::DataLoaderSettings::recommended(shared_store_id);
crate::load_from_file_contents(
&store_id,
&settings,
file_source,
&std::path::PathBuf::from(file_contents.name),
std::borrow::Cow::Borrowed(&file_contents.bytes),
Expand Down
2 changes: 1 addition & 1 deletion crates/re_data_source/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod load_stdin;

pub use self::data_loader::{
iter_loaders, register_custom_data_loader, ArchetypeLoader, DataLoader, DataLoaderError,
DirectoryLoader, LoadedData, RrdLoader,
DataLoaderSettings, DirectoryLoader, LoadedData, RrdLoader,
};
pub use self::data_source::DataSource;
pub use self::load_file::{extension, load_from_file_contents};
Expand Down
Loading
Loading