Skip to content

Commit

Permalink
feat(conductor, relayer)!: brotli compress data blobs (#1006)
Browse files Browse the repository at this point in the history
## Summary
Uses brotli level 5 to compress data posted to celestia in relayer, and
decompresses from read in conductor.

## Background
Data size is the largest component of DA costs, so compression can reduce
the cost of data posting. Based on research and testing, brotli level 5
provides fast enough compression with a good compression ratio for encoded
protobuf data.

## Changes
- compression with brotli in relayer
- decompression in conductor

## Testing
Updated the blackbox tests and ran a smoke test on the repo.

## Metrics
- `TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK` - the total sum of bytes which
are in the data field of blobs for a single sequencer block
- `COMPRESSION_RATIO_FOR_ASTRIA_BLOCK` - the compression ratio of data
for the sequencer block
  • Loading branch information
joroshiba authored Apr 25, 2024
1 parent 65a22ce commit 1398555
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 14 deletions.
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ name = "astria-conductor"

[dependencies]
astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["client", "serde"] }
astria-core = { path = "../astria-core", features = [
"client",
"serde",
"brotli",
] }
astria-eyre = { path = "../astria-eyre" }
celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" }
config = { package = "astria-config", path = "../astria-config" }
Expand Down Expand Up @@ -56,7 +60,11 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] }
moka = { version = "0.12.5", features = ["future"] }

[dev-dependencies]
astria-core = { path = "../astria-core", features = ["server", "test-utils"] }
astria-core = { path = "../astria-core", features = [
"server",
"test-utils",
"brotli",
] }
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
Expand Down
21 changes: 19 additions & 2 deletions crates/astria-conductor/src/celestia/convert.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use astria_core::brotli::decompress_bytes;
use celestia_client::{
celestia_types::{
nmt::Namespace,
Expand Down Expand Up @@ -97,7 +98,15 @@ impl ConvertedBlobs {

fn convert_header(blob: &Blob) -> Option<CelestiaSequencerBlob> {
use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType;
let raw = ProtoType::decode(&*blob.data)
let data = decompress_bytes(&blob.data)
.inspect_err(|err| {
info!(
error = err as &StdError,
"failed decompressing blob data; dropping the blob",
);
})
.ok()?;
let raw = ProtoType::decode(&*data)
.inspect_err(|err| {
info!(
error = err as &StdError,
Expand All @@ -118,7 +127,15 @@ fn convert_header(blob: &Blob) -> Option<CelestiaSequencerBlob> {

fn convert_rollup(blob: &Blob) -> Option<CelestiaRollupBlob> {
use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType;
let raw_blob = ProtoType::decode(&*blob.data)
let data = decompress_bytes(&blob.data)
.inspect_err(|err| {
info!(
error = err as &StdError,
"failed decompressing rollup blob data; dropping the blob",
);
})
.ok()?;
let raw_blob = ProtoType::decode(&*data)
.inspect_err(|err| {
info!(
error = err as &StdError,
Expand Down
10 changes: 8 additions & 2 deletions crates/astria-conductor/tests/blackbox/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use astria_conductor::{
Config,
};
use astria_core::{
brotli::compress_bytes,
generated::{
execution::v1alpha2::{
Block,
Expand All @@ -32,6 +33,7 @@ use sequencer_client::{
#[macro_use]
mod macros;
mod mock_grpc;
use astria_eyre;
pub use mock_grpc::MockGrpc;
use serde_json::json;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -476,17 +478,21 @@ pub struct Blobs {
pub fn make_blobs(height: u32) -> Blobs {
let (head, tail) = make_sequencer_block(height).into_celestia_blobs();

let raw_header = ::prost::Message::encode_to_vec(&head.into_raw());
let head_compressed = compress_bytes(&raw_header).unwrap();
let header = ::celestia_client::celestia_types::Blob::new(
::celestia_client::celestia_namespace_v0_from_bytes(crate::SEQUENCER_CHAIN_ID.as_bytes()),
::prost::Message::encode_to_vec(&head.into_raw()),
head_compressed,
)
.unwrap();

let mut rollup = Vec::new();
for elem in tail {
let raw_rollup = ::prost::Message::encode_to_vec(&elem.into_raw());
let rollup_compressed = compress_bytes(&raw_rollup).unwrap();
let blob = ::celestia_client::celestia_types::Blob::new(
::celestia_client::celestia_namespace_v0_from_rollup_id(crate::ROLLUP_ID),
::prost::Message::encode_to_vec(&elem.into_raw()),
rollup_compressed,
)
.unwrap();
rollup.push(blob);
Expand Down
2 changes: 2 additions & 0 deletions crates/astria-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ keywords = ["astria", "grpc", "rpc", "blockchain", "execution", "protobuf"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
brotli = { version = "5.0.0", optional = true }
pbjson = { version = "0.6.0", optional = true }

merkle = { package = "astria-merkle", path = "../astria-merkle" }
Expand Down Expand Up @@ -46,6 +47,7 @@ serde = ["dep:serde", "dep:pbjson", "dep:base64-serde"]
server = ["dep:tonic"]
test-utils = ["dep:rand"]
base64-serde = ["dep:base64-serde"]
brotli = ["dep:brotli"]

[dev-dependencies]
astria-core = { path = ".", features = ["serde"] }
Expand Down
53 changes: 53 additions & 0 deletions crates/astria-core/src/brotli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use std::io::Write as _;

use brotli::{
enc::BrotliEncoderParams,
CompressorWriter,
DecompressorWriter,
};

const BROTLI_BUFFER_SIZE: usize = 4096;

/// Brotli-decompresses the given bytes and returns the decompressed data.
///
/// # Errors
///
/// Returns an error if the input cannot be decompressed as a Brotli stream.
pub fn decompress_bytes(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    // Capacity is sized to the input: header blobs are small, frequent, and
    // compress poorly, so input-sized output is the expected common case and
    // avoids most reallocations.
    let mut decompressed = Vec::with_capacity(data.len());
    // The writer is a temporary, so it is dropped at the end of this statement,
    // before `decompressed` is returned. NOTE(review): assumes the brotli
    // writer flushes any remaining buffered output on drop — TODO confirm.
    DecompressorWriter::new(&mut decompressed, BROTLI_BUFFER_SIZE).write_all(data)?;
    Ok(decompressed)
}

/// Brotli-compresses the given bytes at quality level 5 and returns the
/// compressed data.
///
/// # Errors
///
/// Returns an error if compression fails.
pub fn compress_bytes(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
    let params = BrotliEncoderParams {
        quality: 5,
        size_hint: data.len(),
        ..Default::default()
    };
    // Capacity is sized to the input: header blobs are small, frequent, and
    // compress poorly, so input-sized output is the expected common case and
    // avoids most reallocations.
    let mut compressed = Vec::with_capacity(data.len());
    // The writer is a temporary, so it is dropped at the end of this statement,
    // before `compressed` is returned. NOTE(review): assumes the brotli writer
    // flushes any remaining buffered output on drop — TODO confirm.
    CompressorWriter::with_params(&mut compressed, BROTLI_BUFFER_SIZE, &params)
        .write_all(data)?;
    Ok(compressed)
}
2 changes: 2 additions & 0 deletions crates/astria-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub mod primitive;
pub mod protocol;
pub mod sequencerblock;

#[cfg(feature = "brotli")]
pub mod brotli;
#[cfg(feature = "serde")]
pub(crate) mod serde;

Expand Down
6 changes: 5 additions & 1 deletion crates/astria-sequencer-relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ tokio-util = { workspace = true }
tonic = { workspace = true }

astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["client", "serde"] }
astria-core = { path = "../astria-core", features = [
"client",
"serde",
"brotli",
] }
astria-eyre = { path = "../astria-eyre" }
celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" }
config = { package = "astria-config", path = "../astria-config" }
Expand Down
21 changes: 21 additions & 0 deletions crates/astria-sequencer-relayer/src/metrics_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ pub fn register() {
Unit::Seconds,
"The time it takes to submit a blob to Celestia"
);

describe_gauge!(
TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK,
Unit::Bytes,
"The size of all compressed data for all `blob.data`s in an Astria block"
);

describe_gauge!(
COMPRESSION_RATIO_FOR_ASTRIA_BLOCK,
Unit::Count,
"Ratio of uncompressed:compressed data size for all `blob.data`s in an Astria block"
);
}

// We configure buckets for manually, in order to ensure Prometheus metrics are structured as a
Expand Down Expand Up @@ -96,3 +108,12 @@ pub const SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT: &str = concat!(
env!("CARGO_CRATE_NAME"),
"_sequencer_height_fetch_failure_count",
);

/// Metric name for the gauge tracking the size of all compressed `blob.data`s
/// posted for a single Astria block (registered with `Unit::Bytes` in
/// `register`).
pub const TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK: &str = concat!(
    env!("CARGO_CRATE_NAME"),
    "_total_blob_data_size_for_astria_block"
);
/// Metric name for the gauge tracking the uncompressed:compressed ratio of all
/// `blob.data`s for a single Astria block (registered with `Unit::Count` in
/// `register`).
pub const COMPRESSION_RATIO_FOR_ASTRIA_BLOCK: &str = concat!(
    env!("CARGO_CRATE_NAME"),
    "_compression_ratio_for_astria_block"
);
44 changes: 37 additions & 7 deletions crates/astria-sequencer-relayer/src/relayer/write/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use astria_core::primitive::v1::RollupId;
use astria_core::{
brotli::compress_bytes,
primitive::v1::RollupId,
};
use astria_eyre::eyre::{
self,
WrapErr as _,
Expand All @@ -10,6 +13,9 @@ use celestia_client::celestia_types::{
use prost::Message as _;
use sequencer_client::SequencerBlock;
use tendermint::block::Height as SequencerHeight;
use tracing::debug;

use crate::metrics_init;

// allow: the signature is dictated by the `serde(serialize_with = ...)` attribute.
#[allow(clippy::trivially_copy_pass_by_ref)]
Expand Down Expand Up @@ -54,6 +60,8 @@ pub(super) struct Converted {

pub(super) fn convert(block: SequencerBlock) -> eyre::Result<Converted> {
let sequencer_height = block.height();
let mut total_data_uncompressed_size = 0;
let mut total_data_compressed_size = 0;

let (sequencer_blob, rollup_blobs) = block.into_celestia_blobs();
// Allocate extra space: one blob for the sequencer blob "header",
Expand All @@ -62,12 +70,14 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result<Converted> {
let sequencer_namespace = celestia_client::celestia_namespace_v0_from_str(
sequencer_blob.header().chain_id().as_str(),
);
let sequencer_blob_raw = sequencer_blob.into_raw().encode_to_vec();
total_data_uncompressed_size += sequencer_blob_raw.len();
let compressed_sequencer_blob_raw =
compress_bytes(&sequencer_blob_raw).wrap_err("failed compressing sequencer blob")?;
total_data_compressed_size += compressed_sequencer_blob_raw.len();

let header_blob = Blob::new(
sequencer_namespace,
sequencer_blob.into_raw().encode_to_vec(),
)
.wrap_err("failed creating head Celestia blob")?;
let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw)
.wrap_err("failed creating head Celestia blob")?;
blobs.push(header_blob);
let mut rollups = Vec::new();
for blob in rollup_blobs {
Expand All @@ -78,11 +88,31 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result<Converted> {
celestia_namespace: namespace,
sequencer_rollup_id: blob.rollup_id(),
};
let blob = Blob::new(namespace, blob.into_raw().encode_to_vec())
let raw_blob = blob.into_raw().encode_to_vec();
total_data_uncompressed_size += raw_blob.len();
let compressed_blob = compress_bytes(&raw_blob)
.wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?;
total_data_compressed_size += compressed_blob.len();
let blob = Blob::new(namespace, compressed_blob)
.wrap_err_with(|| format!("failed creating blob for rollup `{rollup_id}`"))?;
blobs.push(blob);
rollups.push(info);
}

// gauges require f64, it's okay if the metrics get messed up by overflow or precision loss
#[allow(clippy::cast_precision_loss)]
let compression_ratio = total_data_uncompressed_size as f64 / total_data_compressed_size as f64;
debug!(
sequencer_height = %sequencer_height,
total_data_compressed_size = total_data_compressed_size,
compression_ratio = compression_ratio,
"converted blocks into blobs with compressed data",
);
#[allow(clippy::cast_precision_loss)]
metrics::gauge!(metrics_init::TOTAL_BLOB_DATA_SIZE_FOR_ASTRIA_BLOCK)
.set(total_data_compressed_size as f64);
metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio);

Ok(Converted {
blobs,
info: ConversionInfo {
Expand Down

0 comments on commit 1398555

Please sign in to comment.