Skip to content

Commit

Permalink
Enrich ReDap Catalog with RecordingUri on the fly (#8418)
Browse files Browse the repository at this point in the history
As of right now, only the viewer is fully aware of the Data Platform's
endpoint, so we leverage that to enrich the Catalog view with a
RecordingUri for each recording. We also add the chunk id to make the
received record batch convertible to a Chunk.
  • Loading branch information
zehiko authored Dec 12, 2024
1 parent 32181b4 commit 7b76958
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 8 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5931,6 +5931,7 @@ dependencies = [
"re_log_types",
"re_protos",
"re_smart_channel",
"re_types",
"thiserror",
"tokio",
"tokio-stream",
Expand Down
1 change: 1 addition & 0 deletions crates/store/re_grpc_client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ re_log_encoding.workspace = true
re_log_types.workspace = true
re_protos.workspace = true
re_smart_channel.workspace = true
re_types.workspace = true

thiserror.workspace = true
tokio-stream.workspace = true
Expand Down
78 changes: 70 additions & 8 deletions crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,19 @@
mod address;

pub use address::{InvalidRedapAddress, RedapAddress};
use re_chunk::external::arrow2;
use re_log_types::external::re_types_core::ComponentDescriptor;
use re_types::components::RecordingUri;
use re_types::Component;
use url::Url;

// ----------------------------------------------------------------------------

use std::error::Error;

use re_chunk::Chunk;
use arrow2::array::Utf8Array as Arrow2Utf8Array;
use arrow2::datatypes::Field as Arrow2Field;
use re_chunk::{Arrow2Array, Chunk, ChunkId, TransportChunk};
use re_log_encoding::codec::{wire::decode, CodecError};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
Expand Down Expand Up @@ -64,6 +70,9 @@ enum StreamError {

#[error(transparent)]
ChunkError(#[from] re_chunk::ChunkError),

#[error("Invalid URI: {0}")]
InvalidUri(String),
}

// ----------------------------------------------------------------------------
Expand Down Expand Up @@ -179,7 +188,7 @@ async fn stream_recording_async(
let store_id = StoreId::from_string(StoreKind::Recording, recording_id.clone());

let store_info = StoreInfo {
application_id: ApplicationId::from("rerun_data_platform"),
application_id: ApplicationId::from("redap_recording"),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
Expand Down Expand Up @@ -221,9 +230,6 @@ async fn stream_recording_async(
Ok(())
}

/// TODO(zehiko) - this is a copy of `stream_recording_async` with a different gRPC call,
/// this will go away as we tackle unification of data and metadata streams REDAP #74, hence
/// avoiding refactoring right now
async fn stream_catalog_async(
tx: re_smart_channel::Sender<LogMsg>,
redap_endpoint: Url,
Expand Down Expand Up @@ -273,7 +279,7 @@ async fn stream_catalog_async(
let store_id = StoreId::from_string(StoreKind::Recording, "catalog".to_owned());

let store_info = StoreInfo {
application_id: ApplicationId::from("rerun_data_platform"),
application_id: ApplicationId::from("redap_catalog"),
store_id: store_id.clone(),
cloned_from: None,
is_official_example: false,
Expand All @@ -295,8 +301,64 @@ async fn stream_catalog_async(

re_log::info!("Starting to read...");
while let Some(result) = resp.next().await {
let tc = result.map_err(TonicStatusError)?;
let chunk = Chunk::from_transport(&tc)?;
let mut tc = result.map_err(TonicStatusError)?;
// received TransportChunk doesn't have ChunkId, hence we need to add it before converting
// to Chunk
tc.schema.metadata.insert(
TransportChunk::CHUNK_METADATA_KEY_ID.to_owned(),
ChunkId::new().to_string(),
);

let mut chunk = Chunk::from_transport(&tc)?;

// enrich catalog data with RecordingUri that's based on the ReDap endpoint (that we know)
// and the recording id (that we have in the catalog data)
let host = redap_endpoint
.host()
.ok_or(StreamError::InvalidUri(format!(
"couldn't get host from {redap_endpoint}"
)))?;
let port = redap_endpoint
.port()
.ok_or(StreamError::InvalidUri(format!(
"couldn't get port from {redap_endpoint}"
)))?;

let recording_uri_arrays: Vec<Box<dyn Arrow2Array>> = chunk
.iter_component_arrays(&"id".into())
.map(|id| {
let rec_id = id
.as_any()
.downcast_ref::<Arrow2Utf8Array<i32>>()
.ok_or(StreamError::ChunkError(re_chunk::ChunkError::Malformed {
reason: format!("id must be a utf8 array: {:?}", tc.schema),
}))?
.value(0); // each component batch is of length 1 i.e. single 'id' value

let recording_uri = format!("rerun://{host}:{port}/recording/{rec_id}");

let recording_uri_data = Arrow2Utf8Array::<i32>::from([Some(recording_uri)]);

Ok::<Box<_>, StreamError>(
Box::new(recording_uri_data) as Box<dyn arrow2::array::Array>
)
})
.collect::<Result<Vec<_>, _>>()?;

let recording_id_arrays = recording_uri_arrays
.iter()
.map(|e| Some(e.as_ref()))
.collect::<Vec<_>>();

let rec_id_field = Arrow2Field::new("item", arrow2::datatypes::DataType::Utf8, true);
#[allow(clippy::unwrap_used)] // we know we've given the right field type
let uris = re_chunk::util::arrays_to_list_array(
rec_id_field.data_type().clone(),
&recording_id_arrays,
)
.unwrap();

chunk.add_component(ComponentDescriptor::new(RecordingUri::name()), uris)?;

if tx
.send(LogMsg::ArrowMsg(store_id.clone(), chunk.to_arrow_msg()?))
Expand Down

0 comments on commit 7b76958

Please sign in to comment.