diff --git a/Cargo.lock b/Cargo.lock index 8f3a411f3fb9..bd21dfd45a73 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5931,6 +5931,7 @@ dependencies = [ "re_log_types", "re_protos", "re_smart_channel", + "re_types", "thiserror", "tokio", "tokio-stream", diff --git a/crates/store/re_grpc_client/Cargo.toml b/crates/store/re_grpc_client/Cargo.toml index 665e7fc6faa9..efb330fc306e 100644 --- a/crates/store/re_grpc_client/Cargo.toml +++ b/crates/store/re_grpc_client/Cargo.toml @@ -27,6 +27,7 @@ re_log_encoding.workspace = true re_log_types.workspace = true re_protos.workspace = true re_smart_channel.workspace = true +re_types.workspace = true thiserror.workspace = true tokio-stream.workspace = true diff --git a/crates/store/re_grpc_client/src/lib.rs b/crates/store/re_grpc_client/src/lib.rs index 478312bb6aaa..d6e7b3605e07 100644 --- a/crates/store/re_grpc_client/src/lib.rs +++ b/crates/store/re_grpc_client/src/lib.rs @@ -3,13 +3,19 @@ mod address; pub use address::{InvalidRedapAddress, RedapAddress}; +use re_chunk::external::arrow2; +use re_log_types::external::re_types_core::ComponentDescriptor; +use re_types::components::RecordingUri; +use re_types::Component; use url::Url; // ---------------------------------------------------------------------------- use std::error::Error; -use re_chunk::Chunk; +use arrow2::array::Utf8Array as Arrow2Utf8Array; +use arrow2::datatypes::Field as Arrow2Field; +use re_chunk::{Arrow2Array, Chunk, ChunkId, TransportChunk}; use re_log_encoding::codec::{wire::decode, CodecError}; use re_log_types::{ ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time, @@ -64,6 +70,9 @@ enum StreamError { #[error(transparent)] ChunkError(#[from] re_chunk::ChunkError), + + #[error("Invalid URI: {0}")] + InvalidUri(String), } // ---------------------------------------------------------------------------- @@ -179,7 +188,7 @@ async fn stream_recording_async( let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone()); let store_info = StoreInfo { - application_id: ApplicationId::from("rerun_data_platform"), + application_id: ApplicationId::from("redap_recording"), store_id: store_id.clone(), cloned_from: None, is_official_example: false, @@ -221,9 +230,6 @@ async fn stream_recording_async( Ok(()) } -/// TODO(zehiko) - this is a copy of `stream_recording_async` with a different gRPC call, -/// this will go away as we tackle unification of data and metadata streams REDAP #74, hence -/// avoiding refactoring right now async fn stream_catalog_async( tx: re_smart_channel::Sender, redap_endpoint: Url, @@ -273,7 +279,7 @@ async fn stream_catalog_async( let store_id = StoreId::from_string(StoreKind::Recording, "catalog".to_owned()); let store_info = StoreInfo { - application_id: ApplicationId::from("rerun_data_platform"), + application_id: ApplicationId::from("redap_catalog"), store_id: store_id.clone(), cloned_from: None, is_official_example: false, @@ -295,8 +301,64 @@ async fn stream_catalog_async( re_log::info!("Starting to read..."); while let Some(result) = resp.next().await { - let tc = result.map_err(TonicStatusError)?; - let chunk = Chunk::from_transport(&tc)?; + let mut tc = result.map_err(TonicStatusError)?; + // received TransportChunk doesn't have ChunkId, hence we need to add it before converting + // to Chunk + tc.schema.metadata.insert( + TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(), + ChunkId::new().to_string(), + ); + + let mut chunk = Chunk::from_transport(&tc)?; + + // enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know) + // and the recording id (that we have in the catalog data) + let host = redap_endpoint + .host() + .ok_or(StreamError::InvalidUri(format!( + "couldn't get host from {redap_endpoint}" + )))?; + let port = redap_endpoint + .port() + .ok_or(StreamError::InvalidUri(format!( + "couldn't get port from {redap_endpoint}" + )))?; + + let recording_uri_arrays: Vec> = chunk + .iter_component_arrays(&"id".into()) + .map(|id| { + let rec_id = id + .as_any() + .downcast_ref::>() + .ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed { + reason: format!("id must be a utf8 array: {:?}", tc.schema), + }))? + .value(0); // each component batch is of length 1 i.e. single 'id' value + + let recording_uri = format!("rerun://{host}:{port}/recording/{rec_id}"); + + let recording_uri_data = Arrow2Utf8Array::::from([Some(recording_uri)]); + + Ok::, StreamError>( + Box::new(recording_uri_data) as Box + ) + }) + .collect::, _>>()?; + + let recording_id_arrays = recording_uri_arrays + .iter() + .map(|e| Some(e.as_ref())) + .collect::>(); + + let rec_id_field = Arrow2Field::new("item", arrow2::datatypes::DataType::Utf8, true); + #[allow(clippy::unwrap_used)] // we know we've given the right field type + let uris = re_chunk::util::arrays_to_list_array( + rec_id_field.data_type().clone(), + &recording_id_arrays, + ) + .unwrap(); + + chunk.add_component(ComponentDescriptor::new(RecordingUri::name()), uris)?; if tx .send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))