Skip to content

Commit

Permalink
gRPC spec update: use RecordingMetadata instead of RecordingInfo and …
Browse files Browse the repository at this point in the history
…rename url to storage_url (#8026)
  • Loading branch information
zehiko authored Nov 7, 2024
1 parent 897bf24 commit 0bc05c3
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 52 deletions.
19 changes: 5 additions & 14 deletions crates/store/re_protos/proto/rerun/v0/remote_store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ service StorageNode {
message RegisterRecordingRequest {
// human readable description of the recording
string description = 1;
// information about recording's backing storage
// TODO(zehiko) add separate info about the "source" recording
string url = 2;
// recording storage url (e.g. s3://bucket/file or file:///path/to/file)
string storage_url = 2;
// type of recording
RecordingType typ = 3;
// (optional) any additional metadata that should be associated with the recording
Expand All @@ -46,8 +45,8 @@ message RegisterRecordingResponse {
message RegistrationError {
// error code
ErrorCode code = 1;
// url of the recording that failed to register
string url = 2;
// storage url of the recording that failed to register
string storage_url = 2;
// human readable details about the error
string message = 3;
}
Expand Down Expand Up @@ -97,15 +96,7 @@ enum EncoderVersion {
message ListRecordingsRequest {}

message ListRecordingsResponse {
repeated RecordingInfo recordings = 1;
}

message RecordingInfo {
RecordingId id = 1;
string description = 2;
string storage_url = 3;
uint64 size_bytes = 4;
RecordingType typ = 5;
repeated RecordingMetadata recordings = 1;
}

enum RecordingType {
Expand Down
60 changes: 50 additions & 10 deletions crates/store/re_protos/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,36 @@ impl RecordingMetadata {
}
}
}

/// Returns unique id of the recording
pub fn id(&self) -> Result<re_log_types::StoreId, CodecError> {
let metadata = self.data()?;
let id_pos = metadata
.schema
.fields
.iter()
// TODO(zehiko) we need to figure out where mandatory fields live
.position(|field| field.name == "id")
.ok_or_else(|| CodecError::InvalidArgument("missing id field in schema".to_owned()))?;

use arrow2::array::Utf8Array as ArrowUtf8Array;

let id = metadata.data.columns()[id_pos]
.as_any()
.downcast_ref::<ArrowUtf8Array<i32>>()
.ok_or_else(|| {
CodecError::InvalidArgument(format!(
"Unexpected type for id with position {id_pos} in schema: {:?}",
metadata.schema
))
})?
.value(0);

Ok(re_log_types::StoreId::from_string(
re_log_types::StoreKind::Recording,
id.to_owned(),
))
}
}

/// Helper function that serializes given arrow schema and record batch into bytes
Expand Down Expand Up @@ -224,10 +254,15 @@ fn read_arrow_from_bytes<R: std::io::Read>(
#[cfg(test)]
mod tests {

use arrow2::array::Utf8Array as ArrowUtf8Array;
use arrow2::chunk::Chunk as ArrowChunk;
use arrow2::{array::Int32Array, datatypes::Field, datatypes::Schema as ArrowSchema};
use arrow2::{
array::Int32Array as ArrowInt32Array, datatypes::Field as ArrowField,
datatypes::Schema as ArrowSchema,
};
use re_dataframe::external::re_chunk::{Chunk, RowId};
use re_dataframe::TransportChunk;
use re_log_types::StoreId;
use re_log_types::{example_components::MyPoint, Timeline};

use crate::v0::RecordingMetadata;
Expand Down Expand Up @@ -325,19 +360,24 @@ mod tests {

#[test]
fn test_recording_metadata_serialization() {
let expected_schema = ArrowSchema::from(vec![Field::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
let my_ints = Int32Array::from_slice([42]);
let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);
let expected_schema = ArrowSchema::from(vec![
ArrowField::new("id", arrow2::datatypes::DataType::Utf8, false),
ArrowField::new("my_int", arrow2::datatypes::DataType::Int32, false),
]);

let id = ArrowUtf8Array::<i32>::from_slice(["some_id"]);
let my_ints = ArrowInt32Array::from_slice([42]);
let expected_chunk = ArrowChunk::new(vec![Box::new(id) as _, Box::new(my_ints) as _]);
let metadata_tc = TransportChunk {
schema: expected_schema.clone(),
data: expected_chunk.clone(),
};

let metadata = RecordingMetadata::try_from(EncoderVersion::V0, &metadata_tc).unwrap();
assert_eq!(
StoreId::from_string(re_log_types::StoreKind::Recording, "some_id".to_owned()),
metadata.id().unwrap()
);

let tc = metadata.data().unwrap();

Expand All @@ -347,13 +387,13 @@ mod tests {

#[test]
fn test_recording_metadata_fails_with_non_unit_batch() {
let expected_schema = ArrowSchema::from(vec![Field::new(
let expected_schema = ArrowSchema::from(vec![ArrowField::new(
"my_int",
arrow2::datatypes::DataType::Int32,
false,
)]);
// more than 1 row in the batch
let my_ints = Int32Array::from_slice([41, 42]);
let my_ints = ArrowInt32Array::from_slice([41, 42]);

let expected_chunk = ArrowChunk::new(vec![Box::new(my_ints) as _]);
let metadata_tc = TransportChunk {
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ pub mod v0 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"Failed to register recording: {}, error code: {}, error message: {}",
self.url, self.code, self.message
self.storage_url, self.code, self.message
))
}
}
Expand Down
24 changes: 5 additions & 19 deletions crates/store/re_protos/src/v0/rerun.remote_store.v0.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 11 additions & 8 deletions rerun_py/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct PyConnection {
#[pymethods]
impl PyConnection {
/// List all recordings registered with the node.
fn list_recordings(&mut self) -> PyResult<Vec<PyRecordingInfo>> {
fn list_recordings(&mut self) -> PyResult<Vec<PyRecordingMetadata>> {
self.runtime.block_on(async {
let request = ListRecordingsRequest {};

Expand All @@ -66,7 +66,7 @@ impl PyConnection {
.into_inner()
.recordings
.into_iter()
.map(|recording| PyRecordingInfo { info: recording })
.map(|recording| PyRecordingMetadata { info: recording })
.collect())
})
}
Expand Down Expand Up @@ -127,7 +127,7 @@ impl PyConnection {
let request = RegisterRecordingRequest {
// TODO(jleibs): Description should really just be in the metadata
description: Default::default(),
url: storage_url.to_string(),
storage_url: storage_url.to_string(),
metadata,
typ: RecordingType::Rrd.into(),
};
Expand Down Expand Up @@ -187,17 +187,20 @@ impl MetadataLike {
}

/// The info for a recording stored in the archive.
#[pyclass(name = "RecordingInfo")]
pub struct PyRecordingInfo {
info: re_protos::v0::RecordingInfo,
#[pyclass(name = "RecordingMetadata")]
pub struct PyRecordingMetadata {
info: re_protos::v0::RecordingMetadata,
}

#[pymethods]
impl PyRecordingInfo {
impl PyRecordingMetadata {
fn __repr__(&self) -> String {
format!(
"Recording(id={})",
self.info.id.as_ref().map_or("Unknown", |id| id.id.as_str())
self.info
.id()
.map(|id| id.to_string())
.unwrap_or("Unknown".to_owned())
)
}
}

0 comments on commit 0bc05c3

Please sign in to comment.