From fb1ce3049741fc21436d5062463ef87b0d44c34d Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 20:50:41 -0700 Subject: [PATCH 01/14] feat(conductor, relayer): brotli compress data blobs --- Cargo.lock | 38 +++++++++++++++++++ Cargo.toml | 1 + crates/astria-conductor/Cargo.toml | 1 + .../astria-conductor/src/celestia/convert.rs | 31 ++++++++++++++- .../tests/blackbox/helpers/mod.rs | 32 +++++++++++++++- crates/astria-sequencer-relayer/Cargo.toml | 1 + .../src/relayer/write/conversion.rs | 35 ++++++++++++++--- 7 files changed, 129 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 352d47b3c9..b085cd3b09 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -600,6 +615,7 @@ dependencies = [ "astria-telemetry", "async-trait", "base64 0.21.7", + "brotli", "bytes", "chrono", "ed25519-consensus", @@ -821,6 +837,7 @@ dependencies = [ "axum", "base64 0.21.7", "base64-serde", + "brotli", "dirs", "ed25519-consensus", "futures", @@ -1327,6 +1344,27 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "brotli" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19483b140a7ac7174d34b5a581b406c64f84da5409d3e09cf4fff604f9270e67" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6221fe77a248b9117d431ad93761222e1cf8ff282d9d1d5d9f53d6299a1cf76" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index 764dd0a545..7c4ffdc4e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ async-trait = "0.1.52" axum = "0.6.16" base64 = "0.21" base64-serde = "0.7.0" +brotli = "5.0.0" bytes = "1" celestia-tendermint = "0.32.1" clap = "4" diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index ae3d8c106a..c7c110703f 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -26,6 +26,7 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur ] } base64 = { workspace = true } +brotli = { workspace = true } bytes = { workspace = true } ed25519-consensus = { workspace = true } futures = { workspace = true } diff --git a/crates/astria-conductor/src/celestia/convert.rs b/crates/astria-conductor/src/celestia/convert.rs index 10b7ee7462..3052a76bf2 100644 --- a/crates/astria-conductor/src/celestia/convert.rs +++ b/crates/astria-conductor/src/celestia/convert.rs @@ -97,7 +97,15 @@ impl ConvertedBlobs { fn convert_header(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType; - let raw = ProtoType::decode(&*blob.data) + let data = brotli_decompress_blob_data(blob.data.to_vec()) + .inspect_err(|err| { + info!( + error = err as &StdError, + "failed decompressing blob data; dropping the blob", + ); + }) + .ok()?; + let raw = ProtoType::decode(&*data) .inspect_err(|err| { info!( error = err as &StdError, @@ -118,7 +126,15 @@ fn convert_header(blob: &Blob) -> Option { fn convert_rollup(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType; - let raw_blob = ProtoType::decode(&*blob.data) + let data = brotli_decompress_blob_data(blob.data.to_vec()) + .inspect_err(|err| { + info!( + error = err as &StdError, + "failed decompressing rollup blob data; dropping the blob", + ); + }) + .ok()?; + let raw_blob = ProtoType::decode(&*data) .inspect_err(|err| { info!( error = err as &StdError, @@ -136,3 +152,14 @@ fn convert_rollup(blob: &Blob) -> Option { }) .ok() } + +fn brotli_decompress_blob_data(data: Vec) -> Result, std::io::Error> { + use std::io::Write as _; + let mut output = Vec::new(); + { + let mut decompressor = brotli::DecompressorWriter::new(&mut output, data.len()); + decompressor.write_all(data.as_slice())?; + } + + Ok(output) +} diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 3173bad497..2561d3fac7 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -16,6 +16,7 @@ use astria_core::{ }, primitive::v1::RollupId, }; +use brotli::enc::BrotliEncoderParams; use bytes::Bytes; use celestia_client::celestia_types::{ nmt::Namespace, @@ -32,6 +33,10 @@ use sequencer_client::{ #[macro_use] mod macros; mod mock_grpc; +use astria_eyre::{ + eyre, + eyre::Context, +}; pub use mock_grpc::MockGrpc; use serde_json::json; use tokio::task::JoinHandle; @@ -449,17 +454,21 @@ pub struct Blobs { pub fn make_blobs(height: u32) -> Blobs { let (head, tail) = make_sequencer_block(height).into_celestia_blobs(); + let raw_header = ::prost::Message::encode_to_vec(&head.into_raw()); + let head_compressed = brotli_compressed_bytes(raw_header).unwrap(); let header = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_bytes(crate::SEQUENCER_CHAIN_ID.as_bytes()), - ::prost::Message::encode_to_vec(&head.into_raw()), + head_compressed, ) .unwrap(); let mut rollup = Vec::new(); for elem in tail { + let raw_rollup = ::prost::Message::encode_to_vec(&elem.into_raw()); + let rollup_compressed = brotli_compressed_bytes(raw_rollup).unwrap(); let blob = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_rollup_id(crate::ROLLUP_ID), - ::prost::Message::encode_to_vec(&elem.into_raw()), + rollup_compressed, ) .unwrap(); rollup.push(blob); @@ -495,6 +504,25 @@ fn validator() -> tendermint::validator::Info { } } +fn brotli_compressed_bytes(data: Vec) -> eyre::Result> { + use std::io::Write as _; + let compression_params = BrotliEncoderParams { + quality: 5, + size_hint: data.len(), + ..Default::default() + }; + let mut output = Vec::new(); + { + let mut compressor = + brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); + compressor + .write_all(&data) + .wrap_err("failed compressing data")?; + } + + Ok(output) +} + #[must_use] pub fn make_commit(height: u32) -> tendermint::block::Commit { let signing_key = signing_key(); diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index 0178197ffe..ad1e3ed9c8 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -22,6 +22,7 @@ zeroize = { version = "1.6.0", features = ["zeroize_derive"] } axum = { workspace = true } base64 = { workspace = true } base64-serde = { workspace = true } +brotli = { workspace = true } ed25519-consensus = { workspace = true } futures = { workspace = true } hex = { workspace = true, features = ["serde"] } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index f1bc80d1e0..61147c3972 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -3,6 +3,7 @@ use astria_eyre::eyre::{ self, WrapErr as _, }; +use brotli::enc::BrotliEncoderParams; use celestia_client::celestia_types::{ nmt::Namespace, Blob, @@ -62,12 +63,12 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { let sequencer_namespace = celestia_client::celestia_namespace_v0_from_str( sequencer_blob.header().chain_id().as_str(), ); + let sequencer_blob_raw = sequencer_blob.into_raw(); + let compressed_sequencer_blob_raw = brotli_compressed_bytes(sequencer_blob_raw.encode_to_vec()) + .wrap_err("failed compressing sequencer blob")?; - let header_blob = Blob::new( - sequencer_namespace, - sequencer_blob.into_raw().encode_to_vec(), - ) - .wrap_err("failed creating head Celestia blob")?; + let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw) + .wrap_err("failed creating head Celestia blob")?; blobs.push(header_blob); let mut rollups = Vec::new(); for blob in rollup_blobs { @@ -78,7 +79,10 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { celestia_namespace: namespace, sequencer_rollup_id: blob.rollup_id(), }; - let blob = Blob::new(namespace, blob.into_raw().encode_to_vec()) + let raw_blob = blob.into_raw(); + let compressed_blob = brotli_compressed_bytes(raw_blob.encode_to_vec()) + .wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?; + let blob = Blob::new(namespace, compressed_blob) .wrap_err_with(|| format!("failed creating blob for rollup `{rollup_id}`"))?; blobs.push(blob); rollups.push(info); @@ -92,3 +96,22 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { }, }) } + +fn brotli_compressed_bytes(data: Vec) -> eyre::Result> { + use std::io::Write as _; + let compression_params = BrotliEncoderParams { + quality: 5, + size_hint: data.len(), + ..Default::default() + }; + let mut output = Vec::new(); + { + let mut compressor = + brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); + compressor + .write_all(&data) + .wrap_err("failed compressing data")?; + } + + Ok(output) +} From 91a52d045ff5dd1e44e051e46530f2c6c6314cb2 Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 21:05:44 -0700 Subject: [PATCH 02/14] clippy --- crates/astria-conductor/src/celestia/convert.rs | 8 ++++---- crates/astria-conductor/tests/blackbox/helpers/mod.rs | 8 ++++---- .../src/relayer/write/conversion.rs | 8 ++++---- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/crates/astria-conductor/src/celestia/convert.rs b/crates/astria-conductor/src/celestia/convert.rs index 3052a76bf2..2e08eca7f5 100644 --- a/crates/astria-conductor/src/celestia/convert.rs +++ b/crates/astria-conductor/src/celestia/convert.rs @@ -97,7 +97,7 @@ impl ConvertedBlobs { fn convert_header(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType; - let data = brotli_decompress_blob_data(blob.data.to_vec()) + let data = brotli_decompress_blob_data(&blob.data) .inspect_err(|err| { info!( error = err as &StdError, @@ -126,7 +126,7 @@ fn convert_header(blob: &Blob) -> Option { fn convert_rollup(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType; - let data = brotli_decompress_blob_data(blob.data.to_vec()) + let data = brotli_decompress_blob_data(&blob.data) .inspect_err(|err| { info!( error = err as &StdError, @@ -153,12 +153,12 @@ fn convert_rollup(blob: &Blob) -> Option { .ok() } -fn brotli_decompress_blob_data(data: Vec) -> Result, std::io::Error> { +fn brotli_decompress_blob_data(data: &[u8]) -> Result, std::io::Error> { use std::io::Write as _; let mut output = Vec::new(); { let mut decompressor = brotli::DecompressorWriter::new(&mut output, data.len()); - decompressor.write_all(data.as_slice())?; + decompressor.write_all(data)?; } Ok(output) diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 2561d3fac7..12e61644a3 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -455,7 +455,7 @@ pub fn make_blobs(height: u32) -> Blobs { let (head, tail) = make_sequencer_block(height).into_celestia_blobs(); let raw_header = ::prost::Message::encode_to_vec(&head.into_raw()); - let head_compressed = brotli_compressed_bytes(raw_header).unwrap(); + let head_compressed = brotli_compressed_bytes(&raw_header).unwrap(); let header = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_bytes(crate::SEQUENCER_CHAIN_ID.as_bytes()), head_compressed, @@ -465,7 +465,7 @@ pub fn make_blobs(height: u32) -> Blobs { let mut rollup = Vec::new(); for elem in tail { let raw_rollup = ::prost::Message::encode_to_vec(&elem.into_raw()); - let rollup_compressed = brotli_compressed_bytes(raw_rollup).unwrap(); + let rollup_compressed = brotli_compressed_bytes(&raw_rollup).unwrap(); let blob = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_rollup_id(crate::ROLLUP_ID), rollup_compressed, @@ -504,7 +504,7 @@ fn validator() -> tendermint::validator::Info { } } -fn brotli_compressed_bytes(data: Vec) -> eyre::Result> { +fn brotli_compressed_bytes(data: &[u8]) -> eyre::Result> { use std::io::Write as _; let compression_params = BrotliEncoderParams { quality: 5, @@ -516,7 +516,7 @@ fn brotli_compressed_bytes(data: Vec) -> eyre::Result> { let mut compressor = brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); compressor - .write_all(&data) + .write_all(data) .wrap_err("failed compressing data")?; } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index 61147c3972..3498fbb22e 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -64,7 +64,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { sequencer_blob.header().chain_id().as_str(), ); let sequencer_blob_raw = sequencer_blob.into_raw(); - let compressed_sequencer_blob_raw = brotli_compressed_bytes(sequencer_blob_raw.encode_to_vec()) + let compressed_sequencer_blob_raw = brotli_compressed_bytes(&sequencer_blob_raw.encode_to_vec()) .wrap_err("failed compressing sequencer blob")?; let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw) @@ -80,7 +80,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { sequencer_rollup_id: blob.rollup_id(), }; let raw_blob = blob.into_raw(); - let compressed_blob = brotli_compressed_bytes(raw_blob.encode_to_vec()) + let compressed_blob = brotli_compressed_bytes(&raw_blob.encode_to_vec()) .wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?; let blob = Blob::new(namespace, compressed_blob) .wrap_err_with(|| format!("failed creating blob for rollup `{rollup_id}`"))?; @@ -97,7 +97,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { }) } -fn brotli_compressed_bytes(data: Vec) -> eyre::Result> { +fn brotli_compressed_bytes(data: &[u8]) -> eyre::Result> { use std::io::Write as _; let compression_params = BrotliEncoderParams { quality: 5, @@ -109,7 +109,7 @@ fn brotli_compressed_bytes(data: Vec) -> eyre::Result> { let mut compressor = brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); compressor - .write_all(&data) + .write_all(data) .wrap_err("failed compressing data")?; } From 64453495d23936d4bedb12379dbacbff27c3a67a Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 21:25:38 -0700 Subject: [PATCH 03/14] add metrics --- .../src/metrics_init.rs | 19 ++++++++++++++ .../src/relayer/write/conversion.rs | 26 ++++++++++++++++--- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/crates/astria-sequencer-relayer/src/metrics_init.rs b/crates/astria-sequencer-relayer/src/metrics_init.rs index 2b47666c6a..bc0dc23c97 100644 --- a/crates/astria-sequencer-relayer/src/metrics_init.rs +++ b/crates/astria-sequencer-relayer/src/metrics_init.rs @@ -61,6 +61,18 @@ pub fn register() { Unit::Seconds, "The time it takes to submit a blob to Celestia" ); + + describe_gauge!( + TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK, + Unit::Bytes, + "The total size of the compressed data blob for a Astria blob" + ); + + describe_gauge!( + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, + Unit::Count, + "The ratio of the uncompressed to compressed data size for Astria data in a block" + ); } // We configure buckets for manually, in order to ensure Prometheus metrics are structured as a @@ -96,3 +108,10 @@ pub const SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT: &str = concat!( env!("CARGO_CRATE_NAME"), "_sequencer_height_fetch_failure_count", ); + +pub const TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK: &str = + concat!(env!("CARGO_CRATE_NAME"), "_total_astria_blob_data_size"); +pub const COMPRESSION_RATIO_FOR_ASTRIA_BLOCK: &str = concat!( + env!("CARGO_CRATE_NAME"), + "_compression_ratio_for_astria_block" +); diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index 3498fbb22e..c7c9af2515 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -12,6 +12,8 @@ use prost::Message as _; use sequencer_client::SequencerBlock; use tendermint::block::Height as SequencerHeight; +use crate::metrics_init; + // allow: the signature is dictated by the `serde(serialize_with = ...)` attribute. #[allow(clippy::trivially_copy_pass_by_ref)] fn serialize_height(height: &SequencerHeight, serializer: S) -> Result @@ -55,6 +57,8 @@ pub(super) struct Converted { pub(super) fn convert(block: SequencerBlock) -> eyre::Result { let sequencer_height = block.height(); + let mut total_data_uncompressed_size = 0; + let mut total_data_compressed_size = 0; let (sequencer_blob, rollup_blobs) = block.into_celestia_blobs(); // Allocate extra space: one blob for the sequencer blob "header", @@ -63,9 +67,11 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { let sequencer_namespace = celestia_client::celestia_namespace_v0_from_str( sequencer_blob.header().chain_id().as_str(), ); - let sequencer_blob_raw = sequencer_blob.into_raw(); - let compressed_sequencer_blob_raw = brotli_compressed_bytes(&sequencer_blob_raw.encode_to_vec()) + let sequencer_blob_raw = sequencer_blob.into_raw().encode_to_vec(); + total_data_uncompressed_size += sequencer_blob_raw.len(); + let compressed_sequencer_blob_raw = brotli_compressed_bytes(&sequencer_blob_raw) .wrap_err("failed compressing sequencer blob")?; + total_data_compressed_size += compressed_sequencer_blob_raw.len(); let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw) .wrap_err("failed creating head Celestia blob")?; @@ -79,14 +85,26 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { celestia_namespace: namespace, sequencer_rollup_id: blob.rollup_id(), }; - let raw_blob = blob.into_raw(); - let compressed_blob = brotli_compressed_bytes(&raw_blob.encode_to_vec()) + let raw_blob = blob.into_raw().encode_to_vec(); + total_data_uncompressed_size += raw_blob.len(); + let compressed_blob = brotli_compressed_bytes(&raw_blob) .wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?; + total_data_compressed_size += compressed_blob.len(); let blob = Blob::new(namespace, compressed_blob) .wrap_err_with(|| format!("failed creating blob for rollup `{rollup_id}`"))?; blobs.push(blob); rollups.push(info); } + + // gauges require f64, it's okay if the metrics get messed up by overflow or precision loss + #[allow(clippy::cast_precision_loss)] + metrics::gauge!(metrics_init::TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK) + .set(total_data_compressed_size as f64); + #[allow(clippy::cast_precision_loss)] + let compression_ratio: f64 = + total_data_compressed_size as f64 / total_data_uncompressed_size as f64; + metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio); + Ok(Converted { blobs, info: ConversionInfo { From 135d9abc8cbbc2a91c588a47ea088eb5038856b8 Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 21:32:21 -0700 Subject: [PATCH 04/14] add log --- .../src/relayer/write/conversion.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index c7c9af2515..33efc92ade 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -11,6 +11,7 @@ use celestia_client::celestia_types::{ use prost::Message as _; use sequencer_client::SequencerBlock; use tendermint::block::Height as SequencerHeight; +use tracing::info; use crate::metrics_init; @@ -95,15 +96,23 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { blobs.push(blob); rollups.push(info); } - + + let compression_ratio = + total_data_uncompressed_size / total_data_compressed_size; + info!( + sequencer_height = sequencer_height, + total_data_compressed_size = total_data_compressed_size, + compression_ratio = compression_ratio, + "converted blocks into blobs with compressed data", + ); // gauges require f64, it's okay if the metrics get messed up by overflow or precision loss #[allow(clippy::cast_precision_loss)] metrics::gauge!(metrics_init::TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK) .set(total_data_compressed_size as f64); #[allow(clippy::cast_precision_loss)] - let compression_ratio: f64 = - total_data_compressed_size as f64 / total_data_uncompressed_size as f64; - metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio); + metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio as f64); + + Ok(Converted { blobs, From 449cc19f84666481bf68929f5c919b7bcba4cc3e Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 21:32:47 -0700 Subject: [PATCH 05/14] lint --- .../src/relayer/write/conversion.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index 33efc92ade..a7cb2f1307 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -97,8 +97,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { rollups.push(info); } - let compression_ratio = - total_data_uncompressed_size / total_data_compressed_size; + let compression_ratio = total_data_uncompressed_size / total_data_compressed_size; info!( sequencer_height = sequencer_height, total_data_compressed_size = total_data_compressed_size, @@ -111,9 +110,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { .set(total_data_compressed_size as f64); #[allow(clippy::cast_precision_loss)] metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio as f64); - - - + Ok(Converted { blobs, info: ConversionInfo { From e8d287e2aae0195f5be84112cd155d8d32cd4361 Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 21:38:37 -0700 Subject: [PATCH 06/14] fix --- crates/astria-conductor/tests/blackbox/helpers/mod.rs | 10 +++------- .../src/relayer/write/conversion.rs | 2 +- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 12e61644a3..5c63f2fc2e 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -33,10 +33,7 @@ use sequencer_client::{ #[macro_use] mod macros; mod mock_grpc; -use astria_eyre::{ - eyre, - eyre::Context, -}; +use astria_eyre; pub use mock_grpc::MockGrpc; use serde_json::json; use tokio::task::JoinHandle; @@ -504,7 +501,7 @@ fn validator() -> tendermint::validator::Info { } } -fn brotli_compressed_bytes(data: &[u8]) -> eyre::Result> { +fn brotli_compressed_bytes(data: &[u8]) -> Result, std::io::Error> { use std::io::Write as _; let compression_params = BrotliEncoderParams { quality: 5, @@ -516,8 +513,7 @@ fn brotli_compressed_bytes(data: &[u8]) -> eyre::Result> { let mut compressor = brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); compressor - .write_all(data) - .wrap_err("failed compressing data")?; + .write_all(data)?; } Ok(output) diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index a7cb2f1307..8ee78c26a1 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -99,7 +99,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { let compression_ratio = total_data_uncompressed_size / total_data_compressed_size; info!( - sequencer_height = sequencer_height, + sequencer_height = %sequencer_height, total_data_compressed_size = total_data_compressed_size, compression_ratio = compression_ratio, "converted blocks into blobs with compressed data", From ca510f1e484a885f5f9717f5be996fa55fd480a4 Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Wed, 24 Apr 2024 21:39:59 -0700 Subject: [PATCH 07/14] lint --- crates/astria-conductor/tests/blackbox/helpers/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 5c63f2fc2e..4257236aba 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -512,8 +512,7 @@ fn brotli_compressed_bytes(data: &[u8]) -> Result, std::io::Error> { { let mut compressor = brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); - compressor - .write_all(data)?; + compressor.write_all(data)?; } Ok(output) From c21f93f0b0c74eea26df7d12deaf934c20b36c3a Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 12:34:00 -0700 Subject: [PATCH 08/14] move into core, feature gated for reuse --- .github/workflows/release.yml | 2 +- Cargo.lock | 3 +- Cargo.toml | 1 - crates/astria-conductor/Cargo.toml | 5 +- .../astria-conductor/src/celestia/convert.rs | 16 ++---- .../tests/blackbox/helpers/mod.rs | 23 ++------- crates/astria-core/Cargo.toml | 2 + crates/astria-core/src/brotli.rs | 49 +++++++++++++++++++ crates/astria-core/src/lib.rs | 2 + crates/astria-sequencer-relayer/Cargo.toml | 3 +- .../src/relayer/write/conversion.rs | 31 +++--------- 11 files changed, 71 insertions(+), 66 deletions(-) create mode 100644 crates/astria-core/src/brotli.rs diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 4be563804c..608c241fe6 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest environment: BUF needs: run_checker - if: github.event_name != 'merge_group' && needs.run_checker.outputs.run_release_proto == 'true' && github.repository_owner == 'astriaorg' + if: github.event_name != 'merge_group' && (github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == 'astriaorg/astria') && needs.run_checker.outputs.run_release_proto == 'true' steps: - uses: actions/checkout@v4 - uses: bufbuild/buf-setup-action@v1 diff --git a/Cargo.lock b/Cargo.lock index b085cd3b09..6e7e048a01 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -615,7 +615,6 @@ dependencies = [ "astria-telemetry", "async-trait", "base64 0.21.7", - "brotli", "bytes", "chrono", "ed25519-consensus", @@ -669,6 +668,7 @@ dependencies = [ "astria-merkle", "base64 0.21.7", "base64-serde", + "brotli", "bytes", "celestia-tendermint", "ed25519-consensus", @@ -837,7 +837,6 @@ dependencies = [ "axum", "base64 0.21.7", "base64-serde", - "brotli", "dirs", "ed25519-consensus", "futures", diff --git a/Cargo.toml b/Cargo.toml index 7c4ffdc4e2..764dd0a545 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,6 @@ async-trait = "0.1.52" axum = "0.6.16" base64 = "0.21" base64-serde = "0.7.0" -brotli = "5.0.0" bytes = "1" celestia-tendermint = "0.32.1" clap = "4" diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index c7c110703f..5667725d4c 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -13,7 +13,7 @@ name = "astria-conductor" [dependencies] astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["client", "serde"] } +astria-core = { path = "../astria-core", features = ["client", "serde", "brotli"] } astria-eyre = { path = "../astria-eyre" } celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" } config = { package = "astria-config", path = "../astria-config" } @@ -26,7 +26,6 @@ telemetry = { package = "astria-telemetry", path = "../astria-telemetry", featur ] } base64 = { workspace = true } -brotli = { workspace = true } bytes = { workspace = true } ed25519-consensus = { workspace = true } futures = { workspace = true } @@ -57,7 +56,7 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] } moka = { version = "0.12.5", features = ["future"] } [dev-dependencies] -astria-core = { path = "../astria-core", features = ["server", "test-utils"] } +astria-core = { path = "../astria-core", features = ["server", "test-utils", "brotli"] } astria-grpc-mock = { path = "../astria-grpc-mock" } config = { package = "astria-config", path = "../astria-config", features = [ "tests", diff --git a/crates/astria-conductor/src/celestia/convert.rs b/crates/astria-conductor/src/celestia/convert.rs index 2e08eca7f5..76e1e03984 100644 --- a/crates/astria-conductor/src/celestia/convert.rs +++ b/crates/astria-conductor/src/celestia/convert.rs @@ -1,3 +1,4 @@ +use astria_core::brotli::decompress_bytes; use celestia_client::{ celestia_types::{ nmt::Namespace, @@ -97,7 +98,7 @@ impl ConvertedBlobs { fn convert_header(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType; - let data = brotli_decompress_blob_data(&blob.data) + let data = decompress_bytes(&blob.data) .inspect_err(|err| { info!( error = err as &StdError, @@ -126,7 +127,7 @@ fn convert_header(blob: &Blob) -> Option { fn convert_rollup(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType; - let data = brotli_decompress_blob_data(&blob.data) + let data = decompress_bytes(&blob.data) .inspect_err(|err| { info!( error = err as &StdError, @@ -152,14 +153,3 @@ fn convert_rollup(blob: &Blob) -> Option { }) .ok() } - -fn brotli_decompress_blob_data(data: &[u8]) -> Result, std::io::Error> { - use std::io::Write as _; - let mut output = Vec::new(); - { - let mut decompressor = brotli::DecompressorWriter::new(&mut output, data.len()); - decompressor.write_all(data)?; - } - - Ok(output) -} diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 4257236aba..2347b359c6 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -15,8 +15,8 @@ use astria_core::{ sequencerblock::v1alpha1::FilteredSequencerBlock, }, primitive::v1::RollupId, + brotli::compress_bytes, }; -use brotli::enc::BrotliEncoderParams; use bytes::Bytes; use celestia_client::celestia_types::{ nmt::Namespace, @@ -452,7 +452,7 @@ pub fn make_blobs(height: u32) -> Blobs { let (head, tail) = make_sequencer_block(height).into_celestia_blobs(); let raw_header = ::prost::Message::encode_to_vec(&head.into_raw()); - let head_compressed = brotli_compressed_bytes(&raw_header).unwrap(); + let head_compressed = compress_bytes(&raw_header).unwrap(); let header = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_bytes(crate::SEQUENCER_CHAIN_ID.as_bytes()), head_compressed, @@ -462,7 +462,7 @@ pub fn make_blobs(height: u32) -> Blobs { let mut rollup = Vec::new(); for elem in tail { let raw_rollup = ::prost::Message::encode_to_vec(&elem.into_raw()); - let rollup_compressed = brotli_compressed_bytes(&raw_rollup).unwrap(); + let rollup_compressed = compress_bytes(&raw_rollup).unwrap(); let blob = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_rollup_id(crate::ROLLUP_ID), rollup_compressed, @@ -501,23 +501,6 @@ fn validator() -> tendermint::validator::Info { } } -fn brotli_compressed_bytes(data: &[u8]) -> Result, std::io::Error> { - use std::io::Write as _; - let compression_params = BrotliEncoderParams { - quality: 5, - size_hint: data.len(), - ..Default::default() - }; - let mut output = Vec::new(); - { - let mut compressor = - brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); - compressor.write_all(data)?; - } - - Ok(output) -} - #[must_use] pub fn make_commit(height: u32) -> tendermint::block::Commit { let signing_key = signing_key(); diff --git a/crates/astria-core/Cargo.toml b/crates/astria-core/Cargo.toml index f3be1f5daf..05b875d83a 100644 --- a/crates/astria-core/Cargo.toml +++ b/crates/astria-core/Cargo.toml @@ -16,6 +16,7 @@ keywords = ["astria", "grpc", "rpc", "blockchain", "execution", "protobuf"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +brotli = { version = "5.0.0", optional = true } pbjson = { version = "0.6.0", optional = true } merkle = { package = "astria-merkle", path = "../astria-merkle" } @@ -46,6 +47,7 @@ serde = ["dep:serde", "dep:pbjson", "dep:base64-serde"] server = ["dep:tonic"] test-utils = ["dep:rand"] base64-serde = ["dep:base64-serde"] +brotli = ["dep:brotli"] [dev-dependencies] astria-core = { path = ".", features = ["serde"] } diff --git a/crates/astria-core/src/brotli.rs b/crates/astria-core/src/brotli.rs new file mode 100644 index 0000000000..bde243e94a --- /dev/null +++ b/crates/astria-core/src/brotli.rs @@ -0,0 +1,49 @@ +use std::io::Write as _; + +use brotli::{ + enc::BrotliEncoderParams, + CompressorWriter, + DecompressorWriter, +}; + +const BROTLI_BUFFER_SIZE: usize = 4096; + +/// Decompresses the given bytes using the Brotli algorithm. +/// +/// Returns the decompressed bytes. +/// +/// # Errors +/// +/// Returns an error if the decompression fails. +pub fn decompress_bytes(data: &[u8]) -> Result, std::io::Error> { + let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE); + { + let mut decompressor = DecompressorWriter::new(&mut output, BROTLI_BUFFER_SIZE); + decompressor.write_all(data)?; + } + + Ok(output) +} + +/// Compresses the given bytes using the Brotli algorithm at setting 5. +/// +/// Returns the compressed bytes. +/// +/// # Errors +/// +/// Returns an error if the compression fails. +pub fn compress_bytes(data: &[u8]) -> Result, std::io::Error> { + let compression_params = BrotliEncoderParams { + quality: 5, + size_hint: data.len(), + ..Default::default() + }; + let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE); + { + let mut compressor = + CompressorWriter::with_params(&mut output, BROTLI_BUFFER_SIZE, &compression_params); + compressor.write_all(data)?; + } + + Ok(output) +} diff --git a/crates/astria-core/src/lib.rs b/crates/astria-core/src/lib.rs index cc1f875ef9..7dd37e60a8 100644 --- a/crates/astria-core/src/lib.rs +++ b/crates/astria-core/src/lib.rs @@ -13,6 +13,8 @@ pub mod sequencerblock; #[cfg(feature = "serde")] pub(crate) mod serde; +#[cfg(feature = "brotli")] +pub mod brotli; /// A trait to convert from raw decoded protobuf types to idiomatic astria types. /// diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index ad1e3ed9c8..0d05a0cd7c 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -22,7 +22,6 @@ zeroize = { version = "1.6.0", features = ["zeroize_derive"] } axum = { workspace = true } base64 = { workspace = true } base64-serde = { workspace = true } -brotli = { workspace = true } ed25519-consensus = { workspace = true } futures = { workspace = true } hex = { workspace = true, features = ["serde"] } @@ -44,7 +43,7 @@ tokio-util = { workspace = true } tonic = { workspace = true } astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["client", "serde"] } +astria-core = { path = "../astria-core", features = ["client", "serde", "brotli"] } astria-eyre = { path = "../astria-eyre" } celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" } config = { package = "astria-config", path = "../astria-config" } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index 8ee78c26a1..035db68085 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -1,9 +1,11 @@ -use astria_core::primitive::v1::RollupId; +use astria_core::{ + primitive::v1::RollupId, + brotli::compress_bytes, +}; use astria_eyre::eyre::{ self, WrapErr as _, }; -use brotli::enc::BrotliEncoderParams; use celestia_client::celestia_types::{ nmt::Namespace, Blob, @@ -70,8 +72,8 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { ); let sequencer_blob_raw = sequencer_blob.into_raw().encode_to_vec(); total_data_uncompressed_size += sequencer_blob_raw.len(); - let compressed_sequencer_blob_raw = brotli_compressed_bytes(&sequencer_blob_raw) - .wrap_err("failed compressing sequencer blob")?; + let compressed_sequencer_blob_raw = + compress_bytes(&sequencer_blob_raw).wrap_err("failed compressing sequencer blob")?; total_data_compressed_size += compressed_sequencer_blob_raw.len(); let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw) @@ -88,7 +90,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { }; let raw_blob = blob.into_raw().encode_to_vec(); total_data_uncompressed_size += raw_blob.len(); - let compressed_blob = brotli_compressed_bytes(&raw_blob) + let compressed_blob = compress_bytes(&raw_blob) .wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?; total_data_compressed_size += compressed_blob.len(); let blob = Blob::new(namespace, compressed_blob) @@ -120,22 +122,3 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { }, }) } - -fn brotli_compressed_bytes(data: &[u8]) -> eyre::Result> { - use std::io::Write as _; - let compression_params = BrotliEncoderParams { - quality: 5, - size_hint: data.len(), - ..Default::default() - }; - let mut output = Vec::new(); - { - let mut compressor = - brotli::CompressorWriter::with_params(&mut output, 4096, &compression_params); - compressor - .write_all(data) - .wrap_err("failed compressing data")?; - } - - Ok(output) -} From 5387020ba0542cc3ad9c25ab86c8731304c5fbcc Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 12:37:36 -0700 Subject: [PATCH 09/14] lint --- crates/astria-conductor/Cargo.toml | 12 ++++++++++-- .../astria-conductor/tests/blackbox/helpers/mod.rs | 2 +- crates/astria-core/Cargo.toml | 2 +- crates/astria-core/src/brotli.rs | 4 ++-- crates/astria-core/src/lib.rs | 4 ++-- crates/astria-sequencer-relayer/Cargo.toml | 6 +++++- .../src/relayer/write/conversion.rs | 2 +- 7 files changed, 22 insertions(+), 10 deletions(-) diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index 5667725d4c..c6084dad44 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -13,7 +13,11 @@ name = "astria-conductor" [dependencies] astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["client", "serde", "brotli"] } +astria-core = { path = "../astria-core", features = [ + "client", + "serde", + "brotli", +] } astria-eyre = { path = "../astria-eyre" } celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" } config = { package = "astria-config", path = "../astria-config" } @@ -56,7 +60,11 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] } moka = { version = "0.12.5", features = ["future"] } [dev-dependencies] -astria-core = { path = "../astria-core", features = ["server", "test-utils", "brotli"] } +astria-core = { path = "../astria-core", features = [ + "server", + "test-utils", + "brotli", +] } astria-grpc-mock = { path = "../astria-grpc-mock" } config = { package = "astria-config", path = "../astria-config", features = [ "tests", diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index 2347b359c6..9b6f7bb587 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -6,6 +6,7 @@ use astria_conductor::{ Config, }; use astria_core::{ + brotli::compress_bytes, generated::{ execution::v1alpha2::{ Block, @@ -15,7 +16,6 @@ use astria_core::{ sequencerblock::v1alpha1::FilteredSequencerBlock, }, primitive::v1::RollupId, - brotli::compress_bytes, }; use bytes::Bytes; use celestia_client::celestia_types::{ diff --git a/crates/astria-core/Cargo.toml b/crates/astria-core/Cargo.toml index 05b875d83a..856e11ca42 100644 --- a/crates/astria-core/Cargo.toml +++ b/crates/astria-core/Cargo.toml @@ -16,7 +16,7 @@ keywords = ["astria", "grpc", "rpc", "blockchain", "execution", "protobuf"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -brotli = { version = "5.0.0", optional = true } +brotli = { version = "5.0.0", optional = true } pbjson = { version = "0.6.0", optional = true } merkle = { package = "astria-merkle", path = "../astria-merkle" } diff --git a/crates/astria-core/src/brotli.rs b/crates/astria-core/src/brotli.rs index bde243e94a..1e7d7c67e4 100644 --- a/crates/astria-core/src/brotli.rs +++ b/crates/astria-core/src/brotli.rs @@ -11,9 +11,9 @@ const BROTLI_BUFFER_SIZE: usize = 4096; /// Decompresses the given bytes using the Brotli algorithm. /// /// Returns the decompressed bytes. -/// +/// /// # Errors -/// +/// /// Returns an error if the decompression fails. pub fn decompress_bytes(data: &[u8]) -> Result, std::io::Error> { let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE); diff --git a/crates/astria-core/src/lib.rs b/crates/astria-core/src/lib.rs index 7dd37e60a8..81bef23801 100644 --- a/crates/astria-core/src/lib.rs +++ b/crates/astria-core/src/lib.rs @@ -11,10 +11,10 @@ pub mod primitive; pub mod protocol; pub mod sequencerblock; -#[cfg(feature = "serde")] -pub(crate) mod serde; #[cfg(feature = "brotli")] pub mod brotli; +#[cfg(feature = "serde")] +pub(crate) mod serde; /// A trait to convert from raw decoded protobuf types to idiomatic astria types. /// diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index 0d05a0cd7c..690c771ea7 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -43,7 +43,11 @@ tokio-util = { workspace = true } tonic = { workspace = true } astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["client", "serde", "brotli"] } +astria-core = { path = "../astria-core", features = [ + "client", + "serde", + "brotli", +] } astria-eyre = { path = "../astria-eyre" } celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" } config = { package = "astria-config", path = "../astria-config" } diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index 035db68085..ed9dead1bd 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -1,6 +1,6 @@ use astria_core::{ - primitive::v1::RollupId, brotli::compress_bytes, + primitive::v1::RollupId, }; use astria_eyre::eyre::{ self, From e205bee9ca2f0ecc790311bb01a642ed8d05a0b0 Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 12:39:05 -0700 Subject: [PATCH 10/14] rollback workflow changes --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 608c241fe6..4be563804c 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest environment: BUF needs: run_checker - if: github.event_name != 'merge_group' && (github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == 'astriaorg/astria') && needs.run_checker.outputs.run_release_proto == 'true' + if: github.event_name != 'merge_group' && needs.run_checker.outputs.run_release_proto == 'true' && github.repository_owner == 'astriaorg' steps: - uses: actions/checkout@v4 - uses: bufbuild/buf-setup-action@v1 From d8f9e6893d402a5cef90479ce4e3c09039a5eba0 Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 12:51:07 -0700 Subject: [PATCH 11/14] metric review updates --- crates/astria-sequencer-relayer/src/metrics_init.rs | 12 +++++++----- .../src/relayer/write/conversion.rs | 10 +++++----- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/crates/astria-sequencer-relayer/src/metrics_init.rs b/crates/astria-sequencer-relayer/src/metrics_init.rs index bc0dc23c97..a07dc3f598 100644 --- a/crates/astria-sequencer-relayer/src/metrics_init.rs +++ b/crates/astria-sequencer-relayer/src/metrics_init.rs @@ -63,15 +63,15 @@ pub fn register() { ); describe_gauge!( - TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK, + TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK, Unit::Bytes, - "The total size of the compressed data blob for a Astria blob" + "The size of all compressed data for all `blob.data`s in an Astria block" ); describe_gauge!( COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, Unit::Count, - "The ratio of the uncompressed to compressed data size for Astria data in a block" + "Ratio of uncompressed:compressed data size for all `blob.data`s in an Astria block" ); } @@ -109,8 +109,10 @@ pub const SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT: &str = concat!( "_sequencer_height_fetch_failure_count", ); -pub const TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK: &str = - concat!(env!("CARGO_CRATE_NAME"), "_total_astria_blob_data_size"); +pub const TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK: &str = concat!( + env!("CARGO_CRATE_NAME"), + "_total_blob_data_size_for_astria_block" +); pub const COMPRESSION_RATIO_FOR_ASTRIA_BLOCK: &str = concat!( env!("CARGO_CRATE_NAME"), "_compression_ratio_for_astria_block" diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index ed9dead1bd..1946a1ff75 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -99,19 +99,19 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { rollups.push(info); } - let compression_ratio = total_data_uncompressed_size / total_data_compressed_size; + // gauges require f64, it's okay if the metrics get messed up by overflow or precision loss + #[allow(clippy::cast_precision_loss)] + let compression_ratio = total_data_uncompressed_size as f64 / total_data_compressed_size as f64; info!( sequencer_height = %sequencer_height, total_data_compressed_size = total_data_compressed_size, compression_ratio = compression_ratio, "converted blocks into blobs with compressed data", ); - // gauges require f64, it's okay if the metrics get messed up by overflow or precision loss #[allow(clippy::cast_precision_loss)] - metrics::gauge!(metrics_init::TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK) + metrics::gauge!(metrics_init::TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK) .set(total_data_compressed_size as f64); - #[allow(clippy::cast_precision_loss)] - metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio as f64); + metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio); Ok(Converted { blobs, From fa1bd0d1c278ab280ecdc0a27b6fac00f5c449eb Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 13:05:26 -0700 Subject: [PATCH 12/14] edit capacity --- crates/astria-core/src/brotli.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/astria-core/src/brotli.rs b/crates/astria-core/src/brotli.rs index 1e7d7c67e4..b8f6dc42b2 100644 --- a/crates/astria-core/src/brotli.rs +++ b/crates/astria-core/src/brotli.rs @@ -16,7 +16,9 @@ const BROTLI_BUFFER_SIZE: usize = 4096; /// /// Returns an error if the decompression fails. pub fn decompress_bytes(data: &[u8]) -> Result, std::io::Error> { - let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE); + // Capacity based on expecting best potential compression ratio of 8x (based on benchmarks) + // only would need to resize 2 times to reach worst case. + let mut output = Vec::with_capacity(data.len() * 2); { let mut decompressor = DecompressorWriter::new(&mut output, BROTLI_BUFFER_SIZE); decompressor.write_all(data)?; @@ -38,7 +40,9 @@ pub fn compress_bytes(data: &[u8]) -> Result, std::io::Error> { size_hint: data.len(), ..Default::default() }; - let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE); + // Capacity based on expecting best potential compression ratio of 8x (based on benchmarks) + // only would need to resize 2 times to reach worst case. + let mut output = Vec::with_capacity(data.len() / 8); { let mut compressor = CompressorWriter::with_params(&mut output, BROTLI_BUFFER_SIZE, &compression_params); From 2e92c568d1ef35f32337d892afee7f658b01b67d Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 15:18:12 -0700 Subject: [PATCH 13/14] updated capacity --- crates/astria-core/src/brotli.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/astria-core/src/brotli.rs b/crates/astria-core/src/brotli.rs index b8f6dc42b2..211e75bc4d 100644 --- a/crates/astria-core/src/brotli.rs +++ b/crates/astria-core/src/brotli.rs @@ -16,9 +16,9 @@ const BROTLI_BUFFER_SIZE: usize = 4096; /// /// Returns an error if the decompression fails. pub fn decompress_bytes(data: &[u8]) -> Result, std::io::Error> { - // Capacity based on expecting best potential compression ratio of 8x (based on benchmarks) - // only would need to resize 2 times to reach worst case. - let mut output = Vec::with_capacity(data.len() * 2); + // Header blobs are small and occur frequently with low compression, capacity based on expecting + // those to be the most common case and reduce allocations. + let mut output = Vec::with_capacity(data.len()); { let mut decompressor = DecompressorWriter::new(&mut output, BROTLI_BUFFER_SIZE); decompressor.write_all(data)?; @@ -40,9 +40,9 @@ pub fn compress_bytes(data: &[u8]) -> Result, std::io::Error> { size_hint: data.len(), ..Default::default() }; - // Capacity based on expecting best potential compression ratio of 8x (based on benchmarks) - // only would need to resize 2 times to reach worst case. - let mut output = Vec::with_capacity(data.len() / 8); + // Header blobs are small and occur frequently with low compression, capacity based on expecting + // those to be the most common case and reduce allocations. + let mut output = Vec::with_capacity(data.len()); { let mut compressor = CompressorWriter::with_params(&mut output, BROTLI_BUFFER_SIZE, &compression_params); From 658a4b36c386782354f69a66345831ce6fcea4ca Mon Sep 17 00:00:00 2001 From: Jordan Oroshiba Date: Thu, 25 Apr 2024 15:19:25 -0700 Subject: [PATCH 14/14] debug log to decrease noise --- .../astria-sequencer-relayer/src/relayer/write/conversion.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index 1946a1ff75..db6dafae57 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -13,7 +13,7 @@ use celestia_client::celestia_types::{ use prost::Message as _; use sequencer_client::SequencerBlock; use tendermint::block::Height as SequencerHeight; -use tracing::info; +use tracing::debug; use crate::metrics_init; @@ -102,7 +102,7 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { // gauges require f64, it's okay if the metrics get messed up by overflow or precision loss #[allow(clippy::cast_precision_loss)] let compression_ratio = total_data_uncompressed_size as f64 / total_data_compressed_size as f64; - info!( + debug!( sequencer_height = %sequencer_height, total_data_compressed_size = total_data_compressed_size, compression_ratio = compression_ratio,