Skip to content

Commit

Permalink
feat(conductor, relayer)!: batch multiple Sequencer blocks to save on…
Browse files Browse the repository at this point in the history
… Celestia fees (#1045)

## Summary
Batch mutiple Sequencer blocks into single Celestia blobs to reduce fee
payments.

## Background
Until now, each Sequencer block was turned into multiple blobs (one for
overall sequencer block metadata, and one blob per rollup that had
transactiosn in the sequencer block). This wasn't as efficient as it
could be because the new compression scheme introduced in
#1006 can only come to bear with
more bytes to compress.

Relayer will collect sequencer blocks up to a total (compressed) size of
1MB (1/2 of the current max of 2MB that Celestia blocks can be).

## Changes
- Introduce protobuf messages
`astria.sequencerblock.v1alpha1.CelestiaHeaderList` and
`astria.sequencerblock.v1alpha1.CelestiaRollupDataList`
- Rename `astria.sequencerblock.v1alpha1.CelestiaRollupBlob` to
`astria.sequencerblock.v1alpha1.CelestiaRollupData`
- Rename `astria.sequencerblock.v1alpha1.CelestiaSequencerBlob` to
`astria.sequencerblock.v1alpha1.CelestiaHeader`
- Collect Sequencer Blocks into the `*List` protobuf messages before
posting them to Celestia (instead of splitting up each Sequencer block
into mutiple blobs and posting them one by one).

## Testing
Add unit tests around the next submission aggregation logic. Update
conductor blackbox tests.

## Metrics
+ `CELESTIA_PAYLOAD_CREATION_LATENCY`: histogram with microsecond units
to track the time it takes to create a payload of Celestia blobs
(encoding + compressing all protobufs)
+ metrics for reporting compression ratio and total compressed payload
size were moved from the payload/blob construction phase to the
submission phase.

## Breaking Changelist
- Relayer and Conductor write/read new protobuf messages to/from
Celestia.

## Related Issues
Closes #1042
Closes #1049
  • Loading branch information
SuperFluffy authored May 8, 2024
1 parent 0cbcfcd commit 00f6b13
Show file tree
Hide file tree
Showing 21 changed files with 1,686 additions and 1,075 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ base64-serde = "0.7.0"
bytes = "1"
celestia-tendermint = "0.32.1"
celestia-types = "0.1.1"
celestia-rpc = "0.1.1"
clap = "4"
ed25519-consensus = "2.1.0"
ethers = "2.0.11"
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-conductor/src/block_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{

use astria_core::sequencerblock::v1alpha1::{
block::FilteredSequencerBlock,
CelestiaSequencerBlob,
SubmittedMetadata,
};
use pin_project_lite::pin_project;
use sequencer_client::tendermint::block::Height;
Expand All @@ -21,7 +21,7 @@ impl GetSequencerHeight for FilteredSequencerBlock {
}
}

impl GetSequencerHeight for CelestiaSequencerBlob {
impl GetSequencerHeight for SubmittedMetadata {
fn get_height(&self) -> Height {
self.height()
}
Expand Down
6 changes: 3 additions & 3 deletions crates/astria-conductor/src/celestia/block_verifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ mod test {
primitive::v1::RollupId,
sequencerblock::v1alpha1::{
block::SequencerBlockHeader,
celestia::UncheckedCelestiaSequencerBlob,
celestia::UncheckedSubmittedMetadata,
},
};
use prost::Message as _;
Expand Down Expand Up @@ -343,7 +343,7 @@ mod test {
};
let header = SequencerBlockHeader::try_from_raw(header).unwrap();

let sequencer_blob = UncheckedCelestiaSequencerBlob {
let sequencer_blob = UncheckedSubmittedMetadata {
block_hash: [0u8; 32],
header,
rollup_ids: vec![],
Expand Down Expand Up @@ -388,7 +388,7 @@ mod test {
};
let header = SequencerBlockHeader::try_from_raw(header).unwrap();

let sequencer_blob = UncheckedCelestiaSequencerBlob {
let sequencer_blob = UncheckedSubmittedMetadata {
block_hash: [0u8; 32],
header,
rollup_ids: vec![rollup_id],
Expand Down
118 changes: 71 additions & 47 deletions crates/astria-conductor/src/celestia/convert.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
use astria_core::{
brotli::decompress_bytes,
generated::sequencerblock::v1alpha1::{
SubmittedMetadataList,
SubmittedRollupDataList,
},
sequencerblock::v1alpha1::{
CelestiaRollupBlob,
CelestiaSequencerBlob,
celestia::{
SubmittedMetadataError,
SubmittedRollupDataError,
},
SubmittedMetadata,
SubmittedRollupData,
},
};
use celestia_types::{
Expand Down Expand Up @@ -33,8 +41,8 @@ pub(super) fn decode_raw_blobs(
let mut converted_blobs = ConvertedBlobs::new(raw_blobs.celestia_height);
for blob in raw_blobs.header_blobs {
if blob.namespace == sequencer_namespace {
if let Some(header) = convert_header(&blob) {
converted_blobs.push_header(header);
if let Some(header_list) = convert_blob_to_header_list(&blob) {
converted_blobs.extend_from_header_list_if_well_formed(header_list);
}
} else {
warn!(
Expand All @@ -47,8 +55,8 @@ pub(super) fn decode_raw_blobs(

for blob in raw_blobs.rollup_blobs {
if blob.namespace == rollup_namespace {
if let Some(rollup) = convert_rollup(&blob) {
converted_blobs.push_rollup(rollup);
if let Some(rollup_list) = convert_blob_to_rollup_data_list(&blob) {
converted_blobs.extend_from_rollup_data_list_if_well_formed(rollup_list);
}
} else {
warn!(
Expand All @@ -61,45 +69,76 @@ pub(super) fn decode_raw_blobs(
converted_blobs
}

/// An unsorted [`CelestiaSequencerBlob`] and [`CelestiaRollupBlob`].
/// An unsorted [`SubmittedMetadata`] and [`SubmittedRollupData`].
pub(super) struct ConvertedBlobs {
celestia_height: u64,
header_blobs: Vec<CelestiaSequencerBlob>,
rollup_blobs: Vec<CelestiaRollupBlob>,
metadata: Vec<SubmittedMetadata>,
rollup_data: Vec<SubmittedRollupData>,
}

impl ConvertedBlobs {
pub(super) fn len_header_blobs(&self) -> usize {
self.header_blobs.len()
pub(super) fn len_headers(&self) -> usize {
self.metadata.len()
}

pub(super) fn len_rollup_blobs(&self) -> usize {
self.rollup_blobs.len()
pub(super) fn len_rollup_data_entries(&self) -> usize {
self.rollup_data.len()
}

pub(super) fn into_parts(self) -> (u64, Vec<CelestiaSequencerBlob>, Vec<CelestiaRollupBlob>) {
(self.celestia_height, self.header_blobs, self.rollup_blobs)
pub(super) fn into_parts(self) -> (u64, Vec<SubmittedMetadata>, Vec<SubmittedRollupData>) {
(self.celestia_height, self.metadata, self.rollup_data)
}

fn new(celestia_height: u64) -> Self {
Self {
celestia_height,
header_blobs: Vec::new(),
rollup_blobs: Vec::new(),
metadata: Vec::new(),
rollup_data: Vec::new(),
}
}

fn push_header(&mut self, header: CelestiaSequencerBlob) {
self.header_blobs.push(header);
fn push_header(&mut self, header: SubmittedMetadata) {
self.metadata.push(header);
}

fn push_rollup_data(&mut self, rollup: SubmittedRollupData) {
self.rollup_data.push(rollup);
}

fn push_rollup(&mut self, rollup: CelestiaRollupBlob) {
self.rollup_blobs.push(rollup);
fn extend_from_header_list_if_well_formed(&mut self, list: SubmittedMetadataList) {
let initial_len = self.metadata.len();
if let Err(err) = list.entries.into_iter().try_for_each(|raw| {
let header = SubmittedMetadata::try_from_raw(raw)?;
self.push_header(header);
Ok::<(), SubmittedMetadataError>(())
}) {
info!(
error = &err as &StdError,
"one header in {} was not well-formed; dropping all",
SubmittedMetadataList::full_name(),
);
self.metadata.truncate(initial_len);
}
}

fn extend_from_rollup_data_list_if_well_formed(&mut self, list: SubmittedRollupDataList) {
let initial_len = self.rollup_data.len();
if let Err(err) = list.entries.into_iter().try_for_each(|raw| {
let entry = SubmittedRollupData::try_from_raw(raw)?;
self.push_rollup_data(entry);
Ok::<(), SubmittedRollupDataError>(())
}) {
info!(
error = &err as &StdError,
"one entry in {} was not well-formed; dropping all",
SubmittedRollupDataList::full_name(),
);
self.rollup_data.truncate(initial_len);
}
}
}

fn convert_header(blob: &Blob) -> Option<CelestiaSequencerBlob> {
use astria_core::generated::sequencerblock::v1alpha1::CelestiaSequencerBlob as ProtoType;
fn convert_blob_to_header_list(blob: &Blob) -> Option<SubmittedMetadataList> {
let data = decompress_bytes(&blob.data)
.inspect_err(|err| {
info!(
Expand All @@ -108,27 +147,19 @@ fn convert_header(blob: &Blob) -> Option<CelestiaSequencerBlob> {
);
})
.ok()?;
let raw = ProtoType::decode(&*data)
let raw = SubmittedMetadataList::decode(&*data)
.inspect_err(|err| {
info!(
error = err as &StdError,
target = ProtoType::full_name(),
"failed decoding blob bytes as sequencer header; dropping the blob",
target = SubmittedMetadataList::full_name(),
"failed decoding blob bytes; dropping the blob",
);
})
.ok()?;
CelestiaSequencerBlob::try_from_raw(raw)
.inspect_err(|err| {
info!(
error = err as &StdError,
"failed verifying decoded sequencer header; dropping it"
);
})
.ok()
Some(raw)
}

fn convert_rollup(blob: &Blob) -> Option<CelestiaRollupBlob> {
use astria_core::generated::sequencerblock::v1alpha1::CelestiaRollupBlob as ProtoType;
fn convert_blob_to_rollup_data_list(blob: &Blob) -> Option<SubmittedRollupDataList> {
let data = decompress_bytes(&blob.data)
.inspect_err(|err| {
info!(
Expand All @@ -137,21 +168,14 @@ fn convert_rollup(blob: &Blob) -> Option<CelestiaRollupBlob> {
);
})
.ok()?;
let raw_blob = ProtoType::decode(&*data)
let raw = SubmittedRollupDataList::decode(&*data)
.inspect_err(|err| {
info!(
error = err as &StdError,
target = ProtoType::full_name(),
"failed decoding blob bytes as rollup element; dropping the blob",
target = SubmittedRollupDataList::full_name(),
"failed decoding blob bytes; dropping the blob",
);
})
.ok()?;
CelestiaRollupBlob::try_from_raw(raw_blob)
.inspect_err(|err| {
info!(
error = err as &StdError,
"failed verifying decoded rollup element; dropping it"
);
})
.ok()
Some(raw)
}
8 changes: 4 additions & 4 deletions crates/astria-conductor/src/celestia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use self::{
latest_height_stream::stream_latest_heights,
reconstruct::reconstruct_blocks_from_verified_blobs,
verify::{
verify_header_blobs,
verify_headers,
BlobVerifier,
},
};
Expand Down Expand Up @@ -516,12 +516,12 @@ impl FetchConvertVerifyAndReconstruct {
.wrap_err("encountered panic while decoding raw Celestia blobs")?;

info!(
number_of_header_blobs = decoded_blobs.len_header_blobs(),
number_of_rollup_blobs = decoded_blobs.len_rollup_blobs(),
number_of_header_blobs = decoded_blobs.len_headers(),
number_of_rollup_blobs = decoded_blobs.len_rollup_data_entries(),
"decoded Sequencer header and rollup info from raw Celestia blobs",
);

let verified_blobs = verify_header_blobs(blob_verifier, decoded_blobs).await;
let verified_blobs = verify_headers(blob_verifier, decoded_blobs).await;

info!(
number_of_verified_header_blobs = verified_blobs.len_header_blobs(),
Expand Down
14 changes: 7 additions & 7 deletions crates/astria-conductor/src/celestia/reconstruct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use std::collections::HashMap;
use astria_core::{
primitive::v1::RollupId,
sequencerblock::v1alpha1::{
CelestiaRollupBlob,
CelestiaSequencerBlob,
SubmittedMetadata,
SubmittedRollupData,
},
};
use telemetry::display::base64;
Expand Down Expand Up @@ -93,9 +93,9 @@ pub(super) fn reconstruct_blocks_from_verified_blobs(
}

fn remove_header_blob_matching_rollup_blob(
headers: &mut HashMap<[u8; 32], CelestiaSequencerBlob>,
rollup: &CelestiaRollupBlob,
) -> Option<CelestiaSequencerBlob> {
headers: &mut HashMap<[u8; 32], SubmittedMetadata>,
rollup: &SubmittedRollupData,
) -> Option<SubmittedMetadata> {
// chaining methods and returning () to use the ? operator and to not bind the value
headers
.get(&rollup.sequencer_block_hash())
Expand All @@ -106,8 +106,8 @@ fn remove_header_blob_matching_rollup_blob(
}

fn verify_rollup_blob_against_sequencer_blob(
rollup_blob: &CelestiaRollupBlob,
sequencer_blob: &CelestiaSequencerBlob,
rollup_blob: &SubmittedRollupData,
sequencer_blob: &SubmittedMetadata,
) -> bool {
rollup_blob
.proof()
Expand Down
Loading

0 comments on commit 00f6b13

Please sign in to comment.