diff --git a/Cargo.lock b/Cargo.lock index 3a536075cf..9349e7f0c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -68,6 +68,21 @@ dependencies = [ "memchr", ] +[[package]] +name = "alloc-no-stdlib" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc7bb162ec39d46ab1ca8c77bf72e890535becd1751bb45f64c597edb4c8c6b3" + +[[package]] +name = "alloc-stdlib" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94fb8275041c72129eb51b7d0322c29b8387a0386127718b096429201a5d6ece" +dependencies = [ + "alloc-no-stdlib", +] + [[package]] name = "allocator-api2" version = "0.2.16" @@ -653,6 +668,7 @@ dependencies = [ "astria-merkle", "base64 0.21.7", "base64-serde", + "brotli", "bytes", "celestia-tendermint", "ed25519-consensus", @@ -1337,6 +1353,27 @@ dependencies = [ "syn_derive", ] +[[package]] +name = "brotli" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19483b140a7ac7174d34b5a581b406c64f84da5409d3e09cf4fff604f9270e67" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", + "brotli-decompressor", +] + +[[package]] +name = "brotli-decompressor" +version = "4.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6221fe77a248b9117d431ad93761222e1cf8ff282d9d1d5d9f53d6299a1cf76" +dependencies = [ + "alloc-no-stdlib", + "alloc-stdlib", +] + [[package]] name = "bs58" version = "0.5.1" diff --git a/crates/astria-conductor/Cargo.toml b/crates/astria-conductor/Cargo.toml index e3a130bbca..6a1770b59b 100644 --- a/crates/astria-conductor/Cargo.toml +++ b/crates/astria-conductor/Cargo.toml @@ -13,7 +13,11 @@ name = "astria-conductor" [dependencies] astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["client", "serde"] } +astria-core = { path = "../astria-core", features = [ + "client", + "serde", + "brotli", +] } astria-eyre = { path = "../astria-eyre" } celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" } config = { package = "astria-config", path = "../astria-config" } @@ -56,7 +60,11 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] } moka = { version = "0.12.5", features = ["future"] } [dev-dependencies] -astria-core = { path = "../astria-core", features = ["server", "test-utils"] } +astria-core = { path = "../astria-core", features = [ + "server", + "test-utils", + "brotli", +] } astria-grpc-mock = { path = "../astria-grpc-mock" } config = { package = "astria-config", path = "../astria-config", features = [ "tests", diff --git a/crates/astria-conductor/src/celestia/convert.rs b/crates/astria-conductor/src/celestia/convert.rs index 10b7ee7462..76e1e03984 100644 --- a/crates/astria-conductor/src/celestia/convert.rs +++ b/crates/astria-conductor/src/celestia/convert.rs @@ -1,3 +1,4 @@ +use astria_core::brotli::decompress_bytes; use celestia_client::{ celestia_types::{ nmt::Namespace, @@ -97,7 +98,15 @@ impl ConvertedBlobs { fn convert_header(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType; - let raw = ProtoType::decode(&*blob.data) + let data = decompress_bytes(&blob.data) + .inspect_err(|err| { + info!( + error = err as &StdError, + "failed decompressing blob data; dropping the blob", + ); + }) + .ok()?; + let raw = ProtoType::decode(&*data) .inspect_err(|err| { info!( error = err as &StdError, @@ -118,7 +127,15 @@ fn convert_header(blob: &Blob) -> Option { fn convert_rollup(blob: &Blob) -> Option { use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType; - let raw_blob = ProtoType::decode(&*blob.data) + let data = decompress_bytes(&blob.data) + .inspect_err(|err| { + info!( + error = err as &StdError, + "failed decompressing rollup blob data; dropping the blob", + ); + }) + .ok()?; + let raw_blob = ProtoType::decode(&*data) .inspect_err(|err| { info!( error = err as &StdError, diff --git a/crates/astria-conductor/tests/blackbox/helpers/mod.rs b/crates/astria-conductor/tests/blackbox/helpers/mod.rs index f1ecf4989a..032693b2a8 100644 --- a/crates/astria-conductor/tests/blackbox/helpers/mod.rs +++ b/crates/astria-conductor/tests/blackbox/helpers/mod.rs @@ -6,6 +6,7 @@ use astria_conductor::{ Config, }; use astria_core::{ + brotli::compress_bytes, generated::{ execution::v1alpha2::{ Block, @@ -32,6 +33,7 @@ use sequencer_client::{ #[macro_use] mod macros; mod mock_grpc; +use astria_eyre; pub use mock_grpc::MockGrpc; use serde_json::json; use tokio::task::JoinHandle; @@ -476,17 +478,21 @@ pub struct Blobs { pub fn make_blobs(height: u32) -> Blobs { let (head, tail) = make_sequencer_block(height).into_celestia_blobs(); + let raw_header = ::prost::Message::encode_to_vec(&head.into_raw()); + let head_compressed = compress_bytes(&raw_header).unwrap(); let header = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_bytes(crate::SEQUENCER_CHAIN_ID.as_bytes()), - ::prost::Message::encode_to_vec(&head.into_raw()), + head_compressed, ) .unwrap(); let mut rollup = Vec::new(); for elem in tail { + let raw_rollup = ::prost::Message::encode_to_vec(&elem.into_raw()); + let rollup_compressed = compress_bytes(&raw_rollup).unwrap(); let blob = ::celestia_client::celestia_types::Blob::new( ::celestia_client::celestia_namespace_v0_from_rollup_id(crate::ROLLUP_ID), - ::prost::Message::encode_to_vec(&elem.into_raw()), + rollup_compressed, ) .unwrap(); rollup.push(blob); diff --git a/crates/astria-core/Cargo.toml b/crates/astria-core/Cargo.toml index f3be1f5daf..856e11ca42 100644 --- a/crates/astria-core/Cargo.toml +++ b/crates/astria-core/Cargo.toml @@ -16,6 +16,7 @@ keywords = ["astria", "grpc", "rpc", "blockchain", "execution", "protobuf"] # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +brotli = { version = "5.0.0", optional = true } pbjson = { version = "0.6.0", optional = true } merkle = { package = "astria-merkle", path = "../astria-merkle" } @@ -46,6 +47,7 @@ serde = ["dep:serde", "dep:pbjson", "dep:base64-serde"] server = ["dep:tonic"] test-utils = ["dep:rand"] base64-serde = ["dep:base64-serde"] +brotli = ["dep:brotli"] [dev-dependencies] astria-core = { path = ".", features = ["serde"] } diff --git a/crates/astria-core/src/brotli.rs b/crates/astria-core/src/brotli.rs new file mode 100644 index 0000000000..211e75bc4d --- /dev/null +++ b/crates/astria-core/src/brotli.rs @@ -0,0 +1,53 @@ +use std::io::Write as _; + +use brotli::{ + enc::BrotliEncoderParams, + CompressorWriter, + DecompressorWriter, +}; + +const BROTLI_BUFFER_SIZE: usize = 4096; + +/// Decompresses the given bytes using the Brotli algorithm. +/// +/// Returns the decompressed bytes. +/// +/// # Errors +/// +/// Returns an error if the decompression fails. +pub fn decompress_bytes(data: &[u8]) -> Result, std::io::Error> { + // Header blobs are small and occur frequently with low compression, capacity based on expecting + // those to be the most common case and reduce allocations. + let mut output = Vec::with_capacity(data.len()); + { + let mut decompressor = DecompressorWriter::new(&mut output, BROTLI_BUFFER_SIZE); + decompressor.write_all(data)?; + } + + Ok(output) +} + +/// Compresses the given bytes using the Brotli algorithm at setting 5. +/// +/// Returns the compressed bytes. +/// +/// # Errors +/// +/// Returns an error if the compression fails. +pub fn compress_bytes(data: &[u8]) -> Result, std::io::Error> { + let compression_params = BrotliEncoderParams { + quality: 5, + size_hint: data.len(), + ..Default::default() + }; + // Header blobs are small and occur frequently with low compression, capacity based on expecting + // those to be the most common case and reduce allocations. + let mut output = Vec::with_capacity(data.len()); + { + let mut compressor = + CompressorWriter::with_params(&mut output, BROTLI_BUFFER_SIZE, &compression_params); + compressor.write_all(data)?; + } + + Ok(output) +} diff --git a/crates/astria-core/src/lib.rs b/crates/astria-core/src/lib.rs index cc1f875ef9..81bef23801 100644 --- a/crates/astria-core/src/lib.rs +++ b/crates/astria-core/src/lib.rs @@ -11,6 +11,8 @@ pub mod primitive; pub mod protocol; pub mod sequencerblock; +#[cfg(feature = "brotli")] +pub mod brotli; #[cfg(feature = "serde")] pub(crate) mod serde; diff --git a/crates/astria-sequencer-relayer/Cargo.toml b/crates/astria-sequencer-relayer/Cargo.toml index d8ef64fda9..920c6d6b36 100644 --- a/crates/astria-sequencer-relayer/Cargo.toml +++ b/crates/astria-sequencer-relayer/Cargo.toml @@ -47,7 +47,11 @@ tokio-util = { workspace = true } tonic = { workspace = true } astria-build-info = { path = "../astria-build-info", features = ["runtime"] } -astria-core = { path = "../astria-core", features = ["client", "serde"] } +astria-core = { path = "../astria-core", features = [ + "client", + "serde", + "brotli", +] } astria-eyre = { path = "../astria-eyre" } celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" } config = { package = "astria-config", path = "../astria-config" } diff --git a/crates/astria-sequencer-relayer/src/metrics_init.rs b/crates/astria-sequencer-relayer/src/metrics_init.rs index 2b47666c6a..a07dc3f598 100644 --- a/crates/astria-sequencer-relayer/src/metrics_init.rs +++ b/crates/astria-sequencer-relayer/src/metrics_init.rs @@ -61,6 +61,18 @@ pub fn register() { Unit::Seconds, "The time it takes to submit a blob to Celestia" ); + + describe_gauge!( + TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK, + Unit::Bytes, + "The size of all compressed data for all `blob.data`s in an Astria block" + ); + + describe_gauge!( + COMPRESSION_RATIO_FOR_ASTRIA_BLOCK, + Unit::Count, + "Ratio of uncompressed:compressed data size for all `blob.data`s in an Astria block" + ); } // We configure buckets for manually, in order to ensure Prometheus metrics are structured as a @@ -96,3 +108,12 @@ pub const SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT: &str = concat!( env!("CARGO_CRATE_NAME"), "_sequencer_height_fetch_failure_count", ); + +pub const TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK: &str = concat!( + env!("CARGO_CRATE_NAME"), + "_total_blob_data_size_for_astria_block" +); +pub const COMPRESSION_RATIO_FOR_ASTRIA_BLOCK: &str = concat!( + env!("CARGO_CRATE_NAME"), + "_compression_ratio_for_astria_block" +); diff --git a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs index f1bc80d1e0..db6dafae57 100644 --- a/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs +++ b/crates/astria-sequencer-relayer/src/relayer/write/conversion.rs @@ -1,4 +1,7 @@ -use astria_core::primitive::v1::RollupId; +use astria_core::{ + brotli::compress_bytes, + primitive::v1::RollupId, +}; use astria_eyre::eyre::{ self, WrapErr as _, @@ -10,6 +13,9 @@ use celestia_client::celestia_types::{ use prost::Message as _; use sequencer_client::SequencerBlock; use tendermint::block::Height as SequencerHeight; +use tracing::debug; + +use crate::metrics_init; // allow: the signature is dictated by the `serde(serialize_with = ...)` attribute. #[allow(clippy::trivially_copy_pass_by_ref)] @@ -54,6 +60,8 @@ pub(super) struct Converted { pub(super) fn convert(block: SequencerBlock) -> eyre::Result { let sequencer_height = block.height(); + let mut total_data_uncompressed_size = 0; + let mut total_data_compressed_size = 0; let (sequencer_blob, rollup_blobs) = block.into_celestia_blobs(); // Allocate extra space: one blob for the sequencer blob "header", @@ -62,12 +70,14 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { let sequencer_namespace = celestia_client::celestia_namespace_v0_from_str( sequencer_blob.header().chain_id().as_str(), ); + let sequencer_blob_raw = sequencer_blob.into_raw().encode_to_vec(); + total_data_uncompressed_size += sequencer_blob_raw.len(); + let compressed_sequencer_blob_raw = + compress_bytes(&sequencer_blob_raw).wrap_err("failed compressing sequencer blob")?; + total_data_compressed_size += compressed_sequencer_blob_raw.len(); - let header_blob = Blob::new( - sequencer_namespace, - sequencer_blob.into_raw().encode_to_vec(), - ) - .wrap_err("failed creating head Celestia blob")?; + let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw) + .wrap_err("failed creating head Celestia blob")?; blobs.push(header_blob); let mut rollups = Vec::new(); for blob in rollup_blobs { @@ -78,11 +88,31 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result { celestia_namespace: namespace, sequencer_rollup_id: blob.rollup_id(), }; - let blob = Blob::new(namespace, blob.into_raw().encode_to_vec()) + let raw_blob = blob.into_raw().encode_to_vec(); + total_data_uncompressed_size += raw_blob.len(); + let compressed_blob = compress_bytes(&raw_blob) + .wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?; + total_data_compressed_size += compressed_blob.len(); + let blob = Blob::new(namespace, compressed_blob) .wrap_err_with(|| format!("failed creating blob for rollup `{rollup_id}`"))?; blobs.push(blob); rollups.push(info); } + + // gauges require f64, it's okay if the metrics get messed up by overflow or precision loss + #[allow(clippy::cast_precision_loss)] + let compression_ratio = total_data_uncompressed_size as f64 / total_data_compressed_size as f64; + debug!( + sequencer_height = %sequencer_height, + total_data_compressed_size = total_data_compressed_size, + compression_ratio = compression_ratio, + "converted blocks into blobs with compressed data", + ); + #[allow(clippy::cast_precision_loss)] + metrics::gauge!(metrics_init::TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK) + .set(total_data_compressed_size as f64); + metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio); + Ok(Converted { blobs, info: ConversionInfo {