Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(conductor, relayer)!: brotli compress data blobs #1006

Merged
merged 15 commits into from
Apr 25, 2024
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
environment: BUF
needs: run_checker
if: github.event_name != 'merge_group' && needs.run_checker.outputs.run_release_proto == 'true' && github.repository_owner == 'astriaorg'
if: github.event_name != 'merge_group' && (github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == 'astriaorg/astria') && needs.run_checker.outputs.run_release_proto == 'true'
steps:
- uses: actions/checkout@v4
- uses: bufbuild/buf-setup-action@v1
Expand Down
37 changes: 37 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions crates/astria-conductor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ name = "astria-conductor"

[dependencies]
astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["client", "serde"] }
astria-core = { path = "../astria-core", features = ["client", "serde", "brotli"] }
astria-eyre = { path = "../astria-eyre" }
celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" }
config = { package = "astria-config", path = "../astria-config" }
Expand Down Expand Up @@ -56,7 +56,7 @@ tracing-futures = { version = "0.2.5", features = ["futures-03"] }
moka = { version = "0.12.5", features = ["future"] }

[dev-dependencies]
astria-core = { path = "../astria-core", features = ["server", "test-utils"] }
astria-core = { path = "../astria-core", features = ["server", "test-utils", "brotli"] }
astria-grpc-mock = { path = "../astria-grpc-mock" }
config = { package = "astria-config", path = "../astria-config", features = [
"tests",
Expand Down
21 changes: 19 additions & 2 deletions crates/astria-conductor/src/celestia/convert.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use astria_core::brotli::decompress_bytes;
use celestia_client::{
celestia_types::{
nmt::Namespace,
Expand Down Expand Up @@ -97,7 +98,15 @@ impl ConvertedBlobs {

fn convert_header(blob: &Blob) -> Option<CelestiaSequencerBlob> {
use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType;
let raw = ProtoType::decode(&*blob.data)
let data = decompress_bytes(&blob.data)
.inspect_err(|err| {
info!(
error = err as &StdError,
"failed decompressing blob data; dropping the blob",
);
})
.ok()?;
let raw = ProtoType::decode(&*data)
.inspect_err(|err| {
info!(
error = err as &StdError,
Expand All @@ -118,7 +127,15 @@ fn convert_header(blob: &Blob) -> Option<CelestiaSequencerBlob> {

fn convert_rollup(blob: &Blob) -> Option<CelestiaRollupBlob> {
use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType;
let raw_blob = ProtoType::decode(&*blob.data)
let data = decompress_bytes(&blob.data)
.inspect_err(|err| {
info!(
error = err as &StdError,
"failed decompressing rollup blob data; dropping the blob",
);
})
.ok()?;
let raw_blob = ProtoType::decode(&*data)
.inspect_err(|err| {
info!(
error = err as &StdError,
Expand Down
10 changes: 8 additions & 2 deletions crates/astria-conductor/tests/blackbox/helpers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use astria_core::{
sequencerblock::v1alpha1::FilteredSequencerBlock,
},
primitive::v1::RollupId,
brotli::compress_bytes,
};
use bytes::Bytes;
use celestia_client::celestia_types::{
Expand All @@ -32,6 +33,7 @@ use sequencer_client::{
#[macro_use]
mod macros;
mod mock_grpc;
use astria_eyre;
pub use mock_grpc::MockGrpc;
use serde_json::json;
use tokio::task::JoinHandle;
Expand Down Expand Up @@ -449,17 +451,21 @@ pub struct Blobs {
pub fn make_blobs(height: u32) -> Blobs {
let (head, tail) = make_sequencer_block(height).into_celestia_blobs();

let raw_header = ::prost::Message::encode_to_vec(&head.into_raw());
let head_compressed = compress_bytes(&raw_header).unwrap();
let header = ::celestia_client::celestia_types::Blob::new(
::celestia_client::celestia_namespace_v0_from_bytes(crate::SEQUENCER_CHAIN_ID.as_bytes()),
::prost::Message::encode_to_vec(&head.into_raw()),
head_compressed,
)
.unwrap();

let mut rollup = Vec::new();
for elem in tail {
let raw_rollup = ::prost::Message::encode_to_vec(&elem.into_raw());
let rollup_compressed = compress_bytes(&raw_rollup).unwrap();
let blob = ::celestia_client::celestia_types::Blob::new(
::celestia_client::celestia_namespace_v0_from_rollup_id(crate::ROLLUP_ID),
::prost::Message::encode_to_vec(&elem.into_raw()),
rollup_compressed,
)
.unwrap();
rollup.push(blob);
Expand Down
2 changes: 2 additions & 0 deletions crates/astria-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ keywords = ["astria", "grpc", "rpc", "blockchain", "execution", "protobuf"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
brotli = { version = "5.0.0", optional = true }
pbjson = { version = "0.6.0", optional = true }

merkle = { package = "astria-merkle", path = "../astria-merkle" }
Expand Down Expand Up @@ -46,6 +47,7 @@ serde = ["dep:serde", "dep:pbjson", "dep:base64-serde"]
server = ["dep:tonic"]
test-utils = ["dep:rand"]
base64-serde = ["dep:base64-serde"]
brotli = ["dep:brotli"]

[dev-dependencies]
astria-core = { path = ".", features = ["serde"] }
Expand Down
49 changes: 49 additions & 0 deletions crates/astria-core/src/brotli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::io::Write as _;

use brotli::{
enc::BrotliEncoderParams,
CompressorWriter,
DecompressorWriter,
};

const BROTLI_BUFFER_SIZE: usize = 4096;

/// Decompresses the given bytes using the Brotli algorithm.
///
/// Returns the decompressed bytes.
///
/// # Errors
///
/// Returns an error if the decompression fails.
pub fn decompress_bytes(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE);
{
let mut decompressor = DecompressorWriter::new(&mut output, BROTLI_BUFFER_SIZE);
decompressor.write_all(data)?;
}

Ok(output)
}

/// Compresses the given bytes using the Brotli algorithm at setting 5.
///
/// Returns the compressed bytes.
///
/// # Errors
///
/// Returns an error if the compression fails.
pub fn compress_bytes(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
let compression_params = BrotliEncoderParams {
quality: 5,
size_hint: data.len(),
..Default::default()
};
let mut output = Vec::with_capacity(BROTLI_BUFFER_SIZE);
{
let mut compressor =
CompressorWriter::with_params(&mut output, BROTLI_BUFFER_SIZE, &compression_params);
compressor.write_all(data)?;
}

Ok(output)
}
2 changes: 2 additions & 0 deletions crates/astria-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ pub mod sequencerblock;

#[cfg(feature = "serde")]
pub(crate) mod serde;
#[cfg(feature = "brotli")]
pub mod brotli;

/// A trait to convert from raw decoded protobuf types to idiomatic astria types.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/astria-sequencer-relayer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ tokio-util = { workspace = true }
tonic = { workspace = true }

astria-build-info = { path = "../astria-build-info", features = ["runtime"] }
astria-core = { path = "../astria-core", features = ["client", "serde"] }
astria-core = { path = "../astria-core", features = ["client", "serde", "brotli"] }
astria-eyre = { path = "../astria-eyre" }
celestia-client = { package = "astria-celestia-client", path = "../astria-celestia-client" }
config = { package = "astria-config", path = "../astria-config" }
Expand Down
19 changes: 19 additions & 0 deletions crates/astria-sequencer-relayer/src/metrics_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ pub fn register() {
Unit::Seconds,
"The time it takes to submit a blob to Celestia"
);

describe_gauge!(
TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK,
Unit::Bytes,
"The total size of the compressed data blob for a Astria blob"
);

describe_gauge!(
COMPRESSION_RATIO_FOR_ASTRIA_BLOCK,
Unit::Count,
"The ratio of the uncompressed to compressed data size for Astria data in a block"
);
}

// We configure buckets for manually, in order to ensure Prometheus metrics are structured as a
Expand Down Expand Up @@ -96,3 +108,10 @@ pub const SEQUENCER_HEIGHT_FETCH_FAILURE_COUNT: &str = concat!(
env!("CARGO_CRATE_NAME"),
"_sequencer_height_fetch_failure_count",
);

pub const TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK: &str =
concat!(env!("CARGO_CRATE_NAME"), "_total_astria_blob_data_size");
joroshiba marked this conversation as resolved.
Show resolved Hide resolved
pub const COMPRESSION_RATIO_FOR_ASTRIA_BLOCK: &str = concat!(
env!("CARGO_CRATE_NAME"),
"_compression_ratio_for_astria_block"
);
44 changes: 37 additions & 7 deletions crates/astria-sequencer-relayer/src/relayer/write/conversion.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use astria_core::primitive::v1::RollupId;
use astria_core::{
primitive::v1::RollupId,
brotli::compress_bytes,
};
use astria_eyre::eyre::{
self,
WrapErr as _,
Expand All @@ -10,6 +13,9 @@ use celestia_client::celestia_types::{
use prost::Message as _;
use sequencer_client::SequencerBlock;
use tendermint::block::Height as SequencerHeight;
use tracing::info;

use crate::metrics_init;

// allow: the signature is dictated by the `serde(serialize_with = ...)` attribute.
#[allow(clippy::trivially_copy_pass_by_ref)]
Expand Down Expand Up @@ -54,6 +60,8 @@ pub(super) struct Converted {

pub(super) fn convert(block: SequencerBlock) -> eyre::Result<Converted> {
let sequencer_height = block.height();
let mut total_data_uncompressed_size = 0;
let mut total_data_compressed_size = 0;

let (sequencer_blob, rollup_blobs) = block.into_celestia_blobs();
// Allocate extra space: one blob for the sequencer blob "header",
Expand All @@ -62,12 +70,14 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result<Converted> {
let sequencer_namespace = celestia_client::celestia_namespace_v0_from_str(
sequencer_blob.header().chain_id().as_str(),
);
let sequencer_blob_raw = sequencer_blob.into_raw().encode_to_vec();
total_data_uncompressed_size += sequencer_blob_raw.len();
let compressed_sequencer_blob_raw =
compress_bytes(&sequencer_blob_raw).wrap_err("failed compressing sequencer blob")?;
total_data_compressed_size += compressed_sequencer_blob_raw.len();

let header_blob = Blob::new(
sequencer_namespace,
sequencer_blob.into_raw().encode_to_vec(),
)
.wrap_err("failed creating head Celestia blob")?;
let header_blob = Blob::new(sequencer_namespace, compressed_sequencer_blob_raw)
.wrap_err("failed creating head Celestia blob")?;
blobs.push(header_blob);
let mut rollups = Vec::new();
for blob in rollup_blobs {
Expand All @@ -78,11 +88,31 @@ pub(super) fn convert(block: SequencerBlock) -> eyre::Result<Converted> {
celestia_namespace: namespace,
sequencer_rollup_id: blob.rollup_id(),
};
let blob = Blob::new(namespace, blob.into_raw().encode_to_vec())
let raw_blob = blob.into_raw().encode_to_vec();
total_data_uncompressed_size += raw_blob.len();
let compressed_blob = compress_bytes(&raw_blob)
.wrap_err_with(|| format!("failed compressing rollup `{rollup_id}`"))?;
total_data_compressed_size += compressed_blob.len();
let blob = Blob::new(namespace, compressed_blob)
.wrap_err_with(|| format!("failed creating blob for rollup `{rollup_id}`"))?;
blobs.push(blob);
rollups.push(info);
}

let compression_ratio = total_data_uncompressed_size / total_data_compressed_size;
info!(
joroshiba marked this conversation as resolved.
Show resolved Hide resolved
sequencer_height = %sequencer_height,
total_data_compressed_size = total_data_compressed_size,
compression_ratio = compression_ratio,
"converted blocks into blobs with compressed data",
);
// gauges require f64, it's okay if the metrics get messed up by overflow or precision loss
#[allow(clippy::cast_precision_loss)]
metrics::gauge!(metrics_init::TOTAL_ASTRIA_BLOB_DATA_SIZE_FOR_BLOCK)
.set(total_data_compressed_size as f64);
#[allow(clippy::cast_precision_loss)]
metrics::gauge!(metrics_init::COMPRESSION_RATIO_FOR_ASTRIA_BLOCK).set(compression_ratio as f64);

Ok(Converted {
blobs,
info: ConversionInfo {
Expand Down
Loading