Skip to content

Commit

Permalink
Encode LogMsg using protobuf (#8347)
Browse files Browse the repository at this point in the history
  • Loading branch information
jprochazk authored Dec 12, 2024
1 parent 776aa4f commit 0786fda
Show file tree
Hide file tree
Showing 59 changed files with 2,740 additions and 597 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -5816,6 +5816,7 @@ dependencies = [
"re_chunk",
"re_chunk_store",
"re_log",
"re_log_encoding",
"re_log_types",
"re_query",
"re_tracing",
Expand Down Expand Up @@ -6083,6 +6084,7 @@ name = "re_protos_builder"
version = "0.21.0-alpha.1+dev"
dependencies = [
"camino",
"prost-build",
"re_log",
"tonic-build",
]
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ prettyplease = "0.2"
proc-macro2 = { version = "1.0", default-features = false }
profiling = { version = "1.0.12", default-features = false }
prost = "0.13.3"
prost-build = "0.13.3"
puffin = "0.19.1"
puffin_http = "0.16"
pyo3 = "0.22.5"
Expand Down
1 change: 1 addition & 0 deletions crates/build/re_protos_builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ camino.workspace = true
tonic-build = { workspace = true, default-features = false, features = [
"prost",
] }
prost-build = { workspace = true }
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

use camino::Utf8Path;

const PROTOBUF_DEFINITIONS_DIR_PATH: &str = "crates/store/re_protos/proto";
const PROTOBUF_REMOTE_STORE_V0_RELATIVE_PATH: &str = "rerun/v0/remote_store.proto";
const RUST_V0_OUTPUT_DIR_PATH: &str = "crates/store/re_protos/src/v0";
const PROTOS_DIR: &str = "crates/store/re_protos/proto";
const INPUT_V0: &[&str] = &["rerun/v0/remote_store.proto", "rerun/v0/log_msg.proto"];
const OUTPUT_V0_RUST_DIR: &str = "crates/store/re_protos/src/v0";

fn main() {
re_log::setup_logging();
Expand All @@ -26,8 +26,8 @@ fn main() {
"failed to find workspace root"
);

let definitions_dir_path = workspace_dir.join(PROTOBUF_DEFINITIONS_DIR_PATH);
let rust_generated_output_dir_path = workspace_dir.join(RUST_V0_OUTPUT_DIR_PATH);
let definitions_dir_path = workspace_dir.join(PROTOS_DIR);
let rust_generated_output_dir_path = workspace_dir.join(OUTPUT_V0_RUST_DIR);

re_log::info!(
definitions=?definitions_dir_path,
Expand All @@ -37,7 +37,7 @@ fn main() {

re_protos_builder::generate_rust_code(
definitions_dir_path,
&[PROTOBUF_REMOTE_STORE_V0_RELATIVE_PATH],
INPUT_V0,
rust_generated_output_dir_path,
);
}
5 changes: 4 additions & 1 deletion crates/build/re_protos_builder/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ pub fn generate_rust_code(
proto_paths: &[impl AsRef<Path>],
output_dir: impl AsRef<Path>,
) {
let mut prost_config = prost_build::Config::new();
prost_config.enable_type_names(); // tonic doesn't expose this option

if let Err(err) = tonic_build::configure()
.out_dir(output_dir.as_ref())
.build_client(true)
.build_server(true)
.build_transport(false) // Small convenience, but doesn't work on web
.compile_protos(proto_paths, &[definitions_dir])
.compile_protos_with_config(prost_config, proto_paths, &[definitions_dir])
{
match err.kind() {
std::io::ErrorKind::Other => {
Expand Down
4 changes: 1 addition & 3 deletions crates/store/re_chunk_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,14 @@ pub use re_chunk::{
Chunk, ChunkId, ChunkShared, LatestAtQuery, RangeQuery, RangeQueryOptions, RowId,
UnitChunkShared,
};
#[doc(no_inline)]
pub use re_log_encoding::decoder::VersionPolicy;

#[doc(no_inline)]
pub use re_log_types::{ResolvedTimeRange, TimeInt, TimeType, Timeline};

pub mod external {
pub use arrow2;

pub use re_chunk;
pub use re_log_encoding;
}

// ---
Expand Down
34 changes: 16 additions & 18 deletions crates/store/re_chunk_store/src/protobuf_conversions.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use re_protos::missing_field;
use re_protos::TypeConversionError;
use std::collections::BTreeMap;
use std::collections::BTreeSet;

use re_protos::TypeConversionError;

impl TryFrom<re_protos::common::v0::ComponentColumnSelector> for crate::ComponentColumnSelector {
type Error = TypeConversionError;

Expand All @@ -11,16 +11,16 @@ impl TryFrom<re_protos::common::v0::ComponentColumnSelector> for crate::Componen
) -> Result<Self, Self::Error> {
let entity_path = value
.entity_path
.ok_or(TypeConversionError::missing_field(
"rerun.common.v0.ComponentColumnSelector",
.ok_or(missing_field!(
re_protos::common::v0::ComponentColumnSelector,
"entity_path",
))?
.try_into()?;

let component_name = value
.component
.ok_or(TypeConversionError::missing_field(
"rerun.common.v0.ComponentColumnSelector",
.ok_or(missing_field!(
re_protos::common::v0::ComponentColumnSelector,
"component",
))?
.name;
Expand All @@ -36,8 +36,8 @@ impl TryFrom<re_protos::common::v0::TimeColumnSelector> for crate::TimeColumnSel
type Error = TypeConversionError;

fn try_from(value: re_protos::common::v0::TimeColumnSelector) -> Result<Self, Self::Error> {
let timeline = value.timeline.ok_or(TypeConversionError::missing_field(
"rerun.common.v0.TimeColumnSelector",
let timeline = value.timeline.ok_or(missing_field!(
re_protos::common::v0::TimeColumnSelector,
"timeline",
))?;

Expand All @@ -51,12 +51,10 @@ impl TryFrom<re_protos::common::v0::ColumnSelector> for crate::ColumnSelector {
type Error = TypeConversionError;

fn try_from(value: re_protos::common::v0::ColumnSelector) -> Result<Self, Self::Error> {
match value
.selector_type
.ok_or(TypeConversionError::missing_field(
"rerun.common.v0.ColumnSelector",
"selector_type",
))? {
match value.selector_type.ok_or(missing_field!(
re_protos::common::v0::ColumnSelector,
"selector_type",
))? {
re_protos::common::v0::column_selector::SelectorType::ComponentColumn(
component_column_selector,
) => {
Expand Down Expand Up @@ -115,8 +113,8 @@ impl TryFrom<re_protos::common::v0::ViewContents> for crate::ViewContentsSelecto
.map(|part| {
let entity_path: re_log_types::EntityPath = part
.path
.ok_or(TypeConversionError::missing_field(
"rerun.common.v0.ViewContentsPart",
.ok_or(missing_field!(
re_protos::common::v0::ViewContentsPart,
"path",
))?
.try_into()?;
Expand All @@ -139,8 +137,8 @@ impl TryFrom<re_protos::common::v0::Query> for crate::QueryExpression {
fn try_from(value: re_protos::common::v0::Query) -> Result<Self, Self::Error> {
let filtered_index = value
.filtered_index
.ok_or(TypeConversionError::missing_field(
"rerun.common.v0.Query",
.ok_or(missing_field!(
re_protos::common::v0::Query,
"filtered_index",
))?
.try_into()?;
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_chunk_store/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ impl ChunkStore {
pub fn from_rrd_filepath(
store_config: &ChunkStoreConfig,
path_to_rrd: impl AsRef<std::path::Path>,
version_policy: crate::VersionPolicy,
version_policy: re_log_encoding::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, Self>> {
let path_to_rrd = path_to_rrd.as_ref();

Expand Down Expand Up @@ -808,7 +808,7 @@ impl ChunkStore {
pub fn handle_from_rrd_filepath(
store_config: &ChunkStoreConfig,
path_to_rrd: impl AsRef<std::path::Path>,
version_policy: crate::VersionPolicy,
version_policy: re_log_encoding::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, ChunkStoreHandle>> {
Ok(
Self::from_rrd_filepath(store_config, path_to_rrd, version_policy)?
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_data_loader/src/loader_external.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ impl crate::DataLoader for ExternalLoader {
// streaming data to stdout.
let is_sending_data = Arc::new(AtomicBool::new(false));

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;
let stdout = std::io::BufReader::new(stdout);
match re_log_encoding::decoder::Decoder::new(version_policy, stdout) {
Ok(decoder) => {
Expand Down
10 changes: 5 additions & 5 deletions crates/store/re_data_loader/src/loader_rrd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl crate::DataLoader for RrdLoader {
"Loading rrd data from filesystem…",
);

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;

match extension.as_str() {
"rbl" => {
Expand Down Expand Up @@ -118,7 +118,7 @@ impl crate::DataLoader for RrdLoader {
return Err(crate::DataLoaderError::Incompatible(filepath));
}

let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;
let contents = std::io::Cursor::new(contents);
let decoder = match re_log_encoding::decoder::Decoder::new(version_policy, contents) {
Ok(decoder) => decoder,
Expand Down Expand Up @@ -308,7 +308,7 @@ impl RetryableFileReader {
mod tests {
use re_build_info::CrateVersion;
use re_chunk::RowId;
use re_log_encoding::{decoder, encoder::DroppableEncoder};
use re_log_encoding::{encoder::DroppableEncoder, VersionPolicy};
use re_log_types::{
ApplicationId, LogMsg, SetStoreInfo, StoreId, StoreInfo, StoreKind, StoreSource, Time,
};
Expand Down Expand Up @@ -341,7 +341,7 @@ mod tests {

let mut encoder = DroppableEncoder::new(
re_build_info::CrateVersion::LOCAL,
re_log_encoding::EncodingOptions::UNCOMPRESSED,
re_log_encoding::EncodingOptions::MSGPACK_UNCOMPRESSED,
rrd_file,
)
.unwrap();
Expand Down Expand Up @@ -372,7 +372,7 @@ mod tests {
encoder.flush_blocking().expect("failed to flush messages");

let reader = RetryableFileReader::new(&rrd_file_path).unwrap();
let mut decoder = Decoder::new(decoder::VersionPolicy::Warn, reader).unwrap();
let mut decoder = Decoder::new(VersionPolicy::Warn, reader).unwrap();

// we should be able to read 5 messages that we wrote
let decoded_messages = (0..5)
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_data_source/src/load_stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use re_smart_channel::Sender;
/// This fails synchronously iff the standard input stream could not be opened, otherwise errors
/// are handled asynchronously (as in: they're logged).
pub fn load_stdin(tx: Sender<LogMsg>) -> anyhow::Result<()> {
let version_policy = re_log_encoding::decoder::VersionPolicy::Warn;
let version_policy = re_log_encoding::VersionPolicy::Warn;

let stdin = std::io::BufReader::new(std::io::stdin());
let decoder = re_log_encoding::decoder::Decoder::new_concatenated(version_policy, stdin)?;
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_dataframe/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ all-features = true
[features]
default = []


[dependencies]
# Rerun dependencies:
re_chunk.workspace = true
re_chunk_store.workspace = true
re_log.workspace = true
re_log_encoding.workspace = true
re_log_types.workspace = true
re_query.workspace = true
re_tracing.workspace = true
re_types_core.workspace = true

# External dependencies:
anyhow.workspace = true
arrow2.workspace = true
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_dataframe/examples/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ use itertools::Itertools;

use re_dataframe::{
ChunkStoreConfig, EntityPathFilter, QueryEngine, QueryExpression, ResolvedTimeRange,
SparseFillStrategy, StoreKind, TimeInt, Timeline, VersionPolicy,
SparseFillStrategy, StoreKind, TimeInt, Timeline,
};
use re_log_encoding::VersionPolicy;

fn main() -> anyhow::Result<()> {
let args = std::env::args().collect_vec();
Expand Down
3 changes: 1 addition & 2 deletions crates/store/re_dataframe/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::BTreeMap;
use re_chunk::{EntityPath, TransportChunk};
use re_chunk_store::{
ChunkStore, ChunkStoreConfig, ChunkStoreHandle, ColumnDescriptor, QueryExpression,
VersionPolicy,
};
use re_log_types::{EntityPathFilter, StoreId};
use re_query::{QueryCache, QueryCacheHandle, StorageEngine, StorageEngineLike};
Expand Down Expand Up @@ -59,7 +58,7 @@ impl QueryEngine<StorageEngine> {
pub fn from_rrd_filepath(
store_config: &ChunkStoreConfig,
path_to_rrd: impl AsRef<std::path::Path>,
version_policy: VersionPolicy,
version_policy: re_log_encoding::VersionPolicy,
) -> anyhow::Result<BTreeMap<StoreId, Self>> {
Ok(
ChunkStore::handle_from_rrd_filepath(store_config, path_to_rrd, version_policy)?
Expand Down
3 changes: 1 addition & 2 deletions crates/store/re_dataframe/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ pub use self::external::re_chunk::{util::concatenate_record_batches, TransportCh
#[doc(no_inline)]
pub use self::external::re_chunk_store::{
ChunkStoreConfig, ChunkStoreHandle, ColumnSelector, ComponentColumnSelector, Index, IndexRange,
IndexValue, QueryExpression, SparseFillStrategy, TimeColumnSelector, VersionPolicy,
ViewContentsSelector,
IndexValue, QueryExpression, SparseFillStrategy, TimeColumnSelector, ViewContentsSelector,
};
#[doc(no_inline)]
pub use self::external::re_log_types::{
Expand Down
4 changes: 2 additions & 2 deletions crates/store/re_entity_db/examples/memory_usage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn log_messages() {

fn encode_log_msg(log_msg: &LogMsg) -> Vec<u8> {
let mut bytes = vec![];
let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
let encoding_options = re_log_encoding::EncodingOptions::MSGPACK_COMPRESSED;
re_log_encoding::encoder::encode_ref(
re_build_info::CrateVersion::LOCAL,
encoding_options,
Expand All @@ -78,7 +78,7 @@ fn log_messages() {
}

fn decode_log_msg(mut bytes: &[u8]) -> LogMsg {
let version_policy = re_log_encoding::decoder::VersionPolicy::Error;
let version_policy = re_log_encoding::VersionPolicy::Error;
let mut messages = re_log_encoding::decoder::Decoder::new(version_policy, &mut bytes)
.unwrap()
.collect::<Result<Vec<LogMsg>, _>>()
Expand Down
2 changes: 1 addition & 1 deletion crates/store/re_entity_db/src/store_bundle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use itertools::Itertools as _;

use crate::EntityDb;
use re_log_encoding::decoder::VersionPolicy;
use re_log_encoding::VersionPolicy;
use re_log_types::{StoreId, StoreKind};

#[derive(thiserror::Error, Debug)]
Expand Down
3 changes: 2 additions & 1 deletion crates/store/re_grpc_client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ async fn stream_recording_async(
.connect()
.await?;

StorageNodeClient::new(tonic_client)
// TODO(#8411): figure out the right size for this
StorageNodeClient::new(tonic_client).max_decoding_message_size(usize::MAX)
};

re_log::debug!("Fetching {recording_id}…");
Expand Down
Loading

0 comments on commit 0786fda

Please sign in to comment.