From 2e2d1a6f9207dad4a921c48575d7e26cc68b6132 Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 21 Oct 2024 15:12:10 -0400 Subject: [PATCH 1/6] feat(derive): `BatchProvider` multiplexed stage --- bin/client/src/l1/driver.rs | 4 +- crates/derive-alloy/src/pipeline.rs | 4 +- crates/derive/src/pipeline/builder.rs | 12 +-- .../derive/src/stages/batch/batch_provider.rs | 80 +++++++++++++++++++ crates/derive/src/stages/batch/batch_queue.rs | 47 ++++++----- .../src/stages/batch/batch_validator.rs | 25 +++--- crates/derive/src/stages/batch/mod.rs | 9 ++- .../src/stages/channel/channel_provider.rs | 2 +- crates/derive/src/stages/mod.rs | 4 +- crates/derive/src/stages/multiplexed.rs | 40 +++++----- .../{batch_queue.rs => batch_provider.rs} | 12 +-- .../{channel_bank.rs => channel_provider.rs} | 0 crates/derive/src/test_utils/mod.rs | 14 ++-- crates/derive/src/test_utils/pipeline.rs | 13 +-- 14 files changed, 182 insertions(+), 84 deletions(-) create mode 100644 crates/derive/src/stages/batch/batch_provider.rs rename crates/derive/src/test_utils/{batch_queue.rs => batch_provider.rs} (87%) rename crates/derive/src/test_utils/{channel_bank.rs => channel_provider.rs} (100%) diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index e7804dbcf..73dffd3d8 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -16,7 +16,7 @@ use kona_derive::{ pipeline::{DerivationPipeline, Pipeline, PipelineBuilder, StepResult}, sources::EthereumDataSource, stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, + AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, traits::{ @@ -49,7 +49,7 @@ pub type OracleAttributesBuilder = /// An oracle-backed attributes queue for the derivation pipeline. pub type OracleAttributesQueue = AttributesQueue< - BatchQueue< + BatchProvider< BatchStream< ChannelReader< ChannelProvider< diff --git a/crates/derive-alloy/src/pipeline.rs b/crates/derive-alloy/src/pipeline.rs index 664c55756..d284b0a83 100644 --- a/crates/derive-alloy/src/pipeline.rs +++ b/crates/derive-alloy/src/pipeline.rs @@ -5,7 +5,7 @@ use kona_derive::{ pipeline::{DerivationPipeline, PipelineBuilder}, sources::EthereumDataSource, stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, + AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, }; @@ -34,7 +34,7 @@ pub type OnlineAttributesBuilder = /// An `online` attributes queue for the derivation pipeline. pub type OnlineAttributesQueue = AttributesQueue< - BatchQueue< + BatchProvider< BatchStream< ChannelReader< ChannelProvider>>>, diff --git a/crates/derive/src/pipeline/builder.rs b/crates/derive/src/pipeline/builder.rs index 75c75e1a1..8dc995412 100644 --- a/crates/derive/src/pipeline/builder.rs +++ b/crates/derive/src/pipeline/builder.rs @@ -3,7 +3,7 @@ use super::{AttributesBuilder, DataAvailabilityProvider, DerivationPipeline}; use crate::{ stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, + AttributesQueue, BatchProvider, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, traits::{ChainProvider, L2ChainProvider}, @@ -19,8 +19,8 @@ type FrameQueueStage = FrameQueue>; type ChannelProviderStage = ChannelProvider>; type ChannelReaderStage = ChannelReader>; type BatchStreamStage = BatchStream, T>; -type BatchQueueStage = BatchQueue, T>; -type AttributesQueueStage = AttributesQueue, B>; +type BatchProviderStage = BatchProvider, T>; +type AttributesQueueStage = AttributesQueue, B>; /// The `PipelineBuilder` constructs a [DerivationPipeline] using a builder pattern. #[derive(Debug)] @@ -137,10 +137,10 @@ where let channel_reader = ChannelReader::new(channel_provider, Arc::clone(&rollup_config)); let batch_stream = BatchStream::new(channel_reader, rollup_config.clone(), l2_chain_provider.clone()); - let batch_queue = - BatchQueue::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); + let batch_provider = + BatchProvider::new(rollup_config.clone(), batch_stream, l2_chain_provider.clone()); let attributes = - AttributesQueue::new(rollup_config.clone(), batch_queue, attributes_builder); + AttributesQueue::new(rollup_config.clone(), batch_provider, attributes_builder); // Create the pipeline. Self::new(attributes, rollup_config, l2_chain_provider) diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs new file mode 100644 index 000000000..e2a4e398a --- /dev/null +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -0,0 +1,80 @@ +//! This module contains the [BatchProvider] stage. + +use super::NextBatchProvider; +use crate::{ + batch::SingleBatch, + stages::{multiplexed::multiplexed_stage, AttributesProvider, BatchQueue, BatchValidator}, + traits::L2ChainProvider, +}; +use core::fmt::Debug; +use op_alloy_protocol::L2BlockInfo; + +multiplexed_stage!( + BatchProvider, + additional_fields: { + /// The L2 chain fetcher. + fetcher: F, + }, + stages: { + BatchValidator => is_holocene_active, + }, + default_stage: BatchQueue(fetcher) +); + +#[async_trait] +impl AttributesProvider for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug + Send, + F: L2ChainProvider + Clone + Send + Debug, +{ + fn is_last_in_span(&self) -> bool { + let Some(stage) = self.active_stage_ref() else { + return false; + }; + + match stage { + ActiveStage::BatchQueue(stage) => stage.is_last_in_span(), + ActiveStage::BatchValidator(stage) => stage.is_last_in_span(), + } + } + + async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { + match self.active_stage_mut()? { + ActiveStage::BatchQueue(stage) => stage.next_batch(parent).await, + ActiveStage::BatchValidator(stage) => stage.next_batch(parent).await, + } + } +} + +#[cfg(test)] +mod test { + use super::BatchProvider; + use crate::{ + stages::batch::batch_provider::ActiveStage, + test_utils::{TestL2ChainProvider, TestNextBatchProvider}, + }; + use op_alloy_genesis::RollupConfig; + use std::sync::Arc; + + #[test] + fn test_batch_provider_validator_active() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + let active_stage = batch_provider.active_stage_mut().unwrap(); + assert!(matches!(active_stage, ActiveStage::BatchValidator(_))); + } + + #[test] + fn test_batch_provider_batch_queue_active() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + let active_stage = batch_provider.active_stage_mut().unwrap(); + assert!(matches!(active_stage, ActiveStage::BatchQueue(_))); + } +} diff --git a/crates/derive/src/stages/batch/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs index cf32463e0..b82fe50cb 100644 --- a/crates/derive/src/stages/batch/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -78,6 +78,11 @@ where } } + /// Consumes [Self] and returns the inner previous stage. + pub fn into_prev(self) -> P { + self.prev + } + /// Pops the next batch from the current queued up span-batch cache. /// The parent is used to set the parent hash of the batch. /// The parent is verified when the batch is later validated. @@ -475,7 +480,7 @@ mod tests { use super::*; use crate::{ stages::channel::channel_reader::BatchReader, - test_utils::{CollectingLayer, TestBatchQueueProvider, TestL2ChainProvider, TraceStorage}, + test_utils::{CollectingLayer, TestL2ChainProvider, TestNextBatchProvider, TraceStorage}, }; use alloc::vec; use alloy_consensus::Header; @@ -500,7 +505,7 @@ mod tests { #[test] fn test_pop_next_batch() { let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(vec![]); + let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); @@ -514,7 +519,7 @@ mod tests { #[tokio::test] async fn test_batch_queue_reset() { let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(vec![]); + let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); bq.l1_blocks.push(BlockInfo::default()); @@ -535,7 +540,7 @@ mod tests { #[tokio::test] async fn test_batch_queue_flush() { let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(vec![]); + let mock = TestNextBatchProvider::new(vec![]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg.clone(), mock, fetcher); bq.l1_blocks.push(BlockInfo::default()); @@ -574,7 +579,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -605,7 +610,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -639,7 +644,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -674,7 +679,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -693,7 +698,7 @@ mod tests { async fn test_derive_next_batch_missing_origin() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(data); + let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); @@ -709,7 +714,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -730,7 +735,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -754,7 +759,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -778,7 +783,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -811,7 +816,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -852,7 +857,7 @@ mod tests { // Setup batch queue deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); @@ -891,7 +896,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -910,7 +915,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -926,7 +931,7 @@ mod tests { let mut reader = new_batch_reader(); let cfg = Arc::new(RollupConfig::default()); let batch = reader.next_batch(cfg.as_ref()).unwrap(); - let mock = TestBatchQueueProvider::new(vec![Ok(batch)]); + let mock = TestNextBatchProvider::new(vec![Ok(batch)]); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let res = bq.next_batch(L2BlockInfo::default()).await.unwrap_err(); @@ -942,7 +947,7 @@ mod tests { while let Some(batch) = reader.next_batch(cfg.as_ref()) { batch_vec.push(Ok(batch)); } - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo::default()); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); @@ -1012,7 +1017,7 @@ mod tests { tx.encode(&mut buf); let prefixed = [&[OpTxType::Deposit as u8], &buf[..]].concat(); second_batch_txs.insert(0, Bytes::copy_from_slice(&prefixed)); - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); let origin_check = b256!("8527cdb6f601acf9b483817abd1da92790c92b19000000000000000000000000"); mock.origin = Some(BlockInfo { @@ -1099,7 +1104,7 @@ mod tests { async fn test_batch_queue_empty_bytes() { let data = vec![Ok(Batch::Single(SingleBatch::default()))]; let cfg = Arc::new(RollupConfig::default()); - let mock = TestBatchQueueProvider::new(data); + let mock = TestNextBatchProvider::new(data); let fetcher = TestL2ChainProvider::default(); let mut bq = BatchQueue::new(cfg, mock, fetcher); let parent = L2BlockInfo::default(); diff --git a/crates/derive/src/stages/batch/batch_validator.rs b/crates/derive/src/stages/batch/batch_validator.rs index 278a80170..1c4c3447f 100644 --- a/crates/derive/src/stages/batch/batch_validator.rs +++ b/crates/derive/src/stages/batch/batch_validator.rs @@ -49,6 +49,11 @@ where Self { cfg, prev, origin: None, l1_blocks: Vec::new() } } + /// Consumes [Self] and returns the previous stage. + pub fn into_prev(self) -> P { + self.prev + } + /// Returns `true` if the pipeline origin is behind the parent origin. /// /// ## Takes @@ -332,7 +337,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_eof() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo::default()); let mut bv = BatchValidator::new(cfg, mock); bv.origin = Some(BlockInfo { number: 1, ..Default::default() }); @@ -347,7 +352,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_startup() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo::default()); let mut bv = BatchValidator::new(cfg, mock); @@ -376,7 +381,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_advance() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 2, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); @@ -405,7 +410,7 @@ mod test { #[tokio::test] async fn test_batch_validator_advance_epoch() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 2, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); @@ -436,7 +441,7 @@ mod test { #[tokio::test] async fn test_batch_validator_origin_behind_drain_prev() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new( + let mut mock = TestNextBatchProvider::new( (0..5).map(|_| Ok(Batch::Single(SingleBatch::default()))).collect(), ); mock.origin = Some(BlockInfo::default()); @@ -461,7 +466,7 @@ mod test { #[tokio::test] async fn test_batch_validator_l1_origin_mismatch() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![Ok(Batch::Single(SingleBatch::default()))]); + let mut mock = TestNextBatchProvider::new(vec![Ok(Batch::Single(SingleBatch::default()))]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); bv.origin = Some(BlockInfo::default()); @@ -481,7 +486,7 @@ mod test { #[tokio::test] async fn test_batch_validator_received_span_batch() { let cfg = Arc::new(RollupConfig::default()); - let mut mock = TestBatchQueueProvider::new(vec![Ok(Batch::Span(SpanBatch::default()))]); + let mut mock = TestNextBatchProvider::new(vec![Ok(Batch::Span(SpanBatch::default()))]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); bv.origin = Some(BlockInfo::default()); @@ -523,7 +528,7 @@ mod test { // Setup batch validator deps let batch_vec = vec![PipelineResult::Ok(Batch::Single(batch.clone()))]; - let mut mock = TestBatchQueueProvider::new(batch_vec); + let mut mock = TestNextBatchProvider::new(batch_vec); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); // Configure batch validator @@ -550,7 +555,7 @@ mod test { tracing_subscriber::Registry::default().with(layer).init(); let cfg = Arc::new(RollupConfig { seq_window_size: 5, ..Default::default() }); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); @@ -588,7 +593,7 @@ mod test { tracing_subscriber::Registry::default().with(layer).init(); let cfg = Arc::new(RollupConfig { seq_window_size: 5, ..Default::default() }); - let mut mock = TestBatchQueueProvider::new(vec![]); + let mut mock = TestNextBatchProvider::new(vec![]); mock.origin = Some(BlockInfo { number: 1, ..Default::default() }); let mut bv = BatchValidator::new(cfg, mock); diff --git a/crates/derive/src/stages/batch/mod.rs b/crates/derive/src/stages/batch/mod.rs index f6e7a01ba..6bf52b4d8 100644 --- a/crates/derive/src/stages/batch/mod.rs +++ b/crates/derive/src/stages/batch/mod.rs @@ -1,9 +1,9 @@ //! Contains stages pertaining to the processing of [Batch]es. //! -//! Sitting after the [ChannelReader] stage, the [BatchStream] and [BatchQueue] stages are +//! Sitting after the [ChannelReader] stage, the [BatchStream] and [BatchProvider] stages are //! responsible for validating and ordering the [Batch]es. The [BatchStream] stage is responsible -//! for streaming [SingleBatch]es from [SpanBatch]es, while the [BatchQueue] stage is responsible -//! for ordering the [Batch]es for the [AttributesQueue] stage. +//! for streaming [SingleBatch]es from [SpanBatch]es, while the [BatchProvider] stage is responsible +//! for ordering and validating the [Batch]es for the [AttributesQueue] stage. //! //! [Batch]: crate::batch::Batch //! [SingleBatch]: crate::batch::SingleBatch @@ -25,6 +25,9 @@ pub use batch_queue::BatchQueue; mod batch_validator; pub use batch_validator::BatchValidator; +mod batch_provider; +pub use batch_provider::BatchProvider; + /// Provides [Batch]es for the [BatchQueue] and [BatchValidator] stages. #[async_trait] pub trait NextBatchProvider { diff --git a/crates/derive/src/stages/channel/channel_provider.rs b/crates/derive/src/stages/channel/channel_provider.rs index 6d6ad94b9..49ca1d57f 100644 --- a/crates/derive/src/stages/channel/channel_provider.rs +++ b/crates/derive/src/stages/channel/channel_provider.rs @@ -6,7 +6,7 @@ use alloy_primitives::Bytes; use core::fmt::Debug; multiplexed_stage!( - ChannelProvider, + ChannelProvider stages: { ChannelAssembler => is_holocene_active, } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index 9ec070202..b5bceb949 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -31,7 +31,9 @@ pub use channel::{ }; mod batch; -pub use batch::{BatchQueue, BatchStream, BatchStreamProvider, BatchValidator, NextBatchProvider}; +pub use batch::{ + BatchProvider, BatchQueue, BatchStream, BatchStreamProvider, BatchValidator, NextBatchProvider, +}; mod attributes_queue; pub use attributes_queue::AttributesQueue; diff --git a/crates/derive/src/stages/multiplexed.rs b/crates/derive/src/stages/multiplexed.rs index 64527a7bf..1d7ab891b 100644 --- a/crates/derive/src/stages/multiplexed.rs +++ b/crates/derive/src/stages/multiplexed.rs @@ -65,17 +65,17 @@ pub enum MultiplexerError { /// [Debug]: core::fmt::Debug macro_rules! multiplexed_stage { ( - $provider_name:ident<$prev_type:ident$(, $provider_generic:ident: $($provider_generic_bound:ident)*)*>, + $provider_name:ident<$prev_type:ident$(, $provider_generic:ident: $($provider_generic_bound:ident)*)*>$(,)? $( additional_fields: { $(#[doc = $comment:expr])? $($field_name:ident: $field_type:ty,)+ } - )? + )?$(,)? stages: { - $($stage_name:ident$(($($input_name:ident$(,)?)+))? => $stage_condition:ident,)* - } - default_stage: $last_stage_name:ident$(($($last_input_name:ident$(,)?)+))? + $($stage_name:ident$(<$($stage_generic:ident$(, )?)+>)?$(($($input_name:ident$(,)?)+))? => $stage_condition:ident,)* + }$(,)? + default_stage: $last_stage_name:ident$(<$($last_stage_generic:ident$(, )?)+>)?$(($($last_input_name:ident$(,)?)+))? ) => { use $crate::{ pipeline::{OriginAdvancer, OriginProvider, SignalReceiver, Signal, PipelineError, PipelineResult}, @@ -88,15 +88,15 @@ macro_rules! multiplexed_stage { #[derive(Debug)] enum ActiveStage where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, { - $($stage_name($stage_name

,),)* - $last_stage_name($last_stage_name

), + $($stage_name($stage_name,),)* + $last_stage_name($last_stage_name), } - impl ActiveStage + impl ActiveStage where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, { /// Dissolves the active stage and returns the previous stage. pub(crate) fn into_prev(self) -> P { @@ -109,9 +109,9 @@ macro_rules! multiplexed_stage { #[doc = concat!("The ", stringify!($provider_name), " stage is responsible for multiplexing sub-stages.")] #[derive(Debug)] - pub struct $provider_name + pub struct $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, { /// The rollup configuration. cfg: alloc::sync::Arc, @@ -135,9 +135,9 @@ macro_rules! multiplexed_stage { )? } - impl $provider_name + impl $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, { /// Creates a new instance of the provider. pub const fn new(cfg: alloc::sync::Arc, prev: P$( $(, $field_name: $field_type)+ )?) -> Self { @@ -204,9 +204,9 @@ macro_rules! multiplexed_stage { } #[async_trait] - impl OriginAdvancer for $provider_name + impl OriginAdvancer for $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + core::fmt::Debug, { async fn advance_origin(&mut self) -> PipelineResult<()> { match self.active_stage_mut()? { @@ -216,9 +216,9 @@ macro_rules! multiplexed_stage { } } - impl OriginProvider for $provider_name + impl OriginProvider for $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, { fn origin(&self) -> Option { match self.active_stage_ref() { @@ -234,9 +234,9 @@ macro_rules! multiplexed_stage { } #[async_trait] - impl SignalReceiver for $provider_name + impl SignalReceiver for $provider_name where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + core::fmt::Debug, { async fn signal( &mut self, diff --git a/crates/derive/src/test_utils/batch_queue.rs b/crates/derive/src/test_utils/batch_provider.rs similarity index 87% rename from crates/derive/src/test_utils/batch_queue.rs rename to crates/derive/src/test_utils/batch_provider.rs index 396222a51..8cbdf3083 100644 --- a/crates/derive/src/test_utils/batch_queue.rs +++ b/crates/derive/src/test_utils/batch_provider.rs @@ -12,7 +12,7 @@ use op_alloy_protocol::{BlockInfo, L2BlockInfo}; /// A mock provider for the [BatchQueue] stage. #[derive(Debug, Default)] -pub struct TestBatchQueueProvider { +pub struct TestNextBatchProvider { /// The origin of the L1 block. pub origin: Option, /// A list of batches to return. @@ -23,21 +23,21 @@ pub struct TestBatchQueueProvider { pub reset: bool, } -impl TestBatchQueueProvider { +impl TestNextBatchProvider { /// Creates a new [MockBatchQueueProvider] with the given origin and batches. pub fn new(batches: Vec>) -> Self { Self { origin: Some(BlockInfo::default()), batches, flushed: false, reset: false } } } -impl OriginProvider for TestBatchQueueProvider { +impl OriginProvider for TestNextBatchProvider { fn origin(&self) -> Option { self.origin } } #[async_trait] -impl NextBatchProvider for TestBatchQueueProvider { +impl NextBatchProvider for TestNextBatchProvider { fn flush(&mut self) { self.flushed = true; } @@ -52,7 +52,7 @@ impl NextBatchProvider for TestBatchQueueProvider { } #[async_trait] -impl OriginAdvancer for TestBatchQueueProvider { +impl OriginAdvancer for TestNextBatchProvider { async fn advance_origin(&mut self) -> PipelineResult<()> { self.origin = self.origin.map(|mut origin| { origin.number += 1; @@ -63,7 +63,7 @@ impl OriginAdvancer for TestBatchQueueProvider { } #[async_trait] -impl SignalReceiver for TestBatchQueueProvider { +impl SignalReceiver for TestNextBatchProvider { async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { match signal { Signal::Reset { .. } => self.reset = true, diff --git a/crates/derive/src/test_utils/channel_bank.rs b/crates/derive/src/test_utils/channel_provider.rs similarity index 100% rename from crates/derive/src/test_utils/channel_bank.rs rename to crates/derive/src/test_utils/channel_provider.rs diff --git a/crates/derive/src/test_utils/mod.rs b/crates/derive/src/test_utils/mod.rs index 0a4e7d2d4..55bb9795f 100644 --- a/crates/derive/src/test_utils/mod.rs +++ b/crates/derive/src/test_utils/mod.rs @@ -2,9 +2,9 @@ mod pipeline; pub use pipeline::{ - new_test_pipeline, TestAttributesQueue, TestBatchQueue, TestBatchStream, TestChannelProvider, - TestChannelReader, TestFrameQueue, TestL1Retrieval, TestL1Traversal, TestNextAttributes, - TestPipeline, + new_test_pipeline, TestAttributesQueue, TestBatchProvider, TestBatchStream, + TestChannelProvider, TestChannelReader, TestFrameQueue, TestL1Retrieval, TestL1Traversal, + TestNextAttributes, TestPipeline, }; mod blob_provider; @@ -16,8 +16,8 @@ pub use chain_providers::{TestChainProvider, TestL2ChainProvider, TestProviderEr mod data_availability_provider; pub use data_availability_provider::{TestDAP, TestIter}; -mod batch_queue; -pub use batch_queue::TestBatchQueueProvider; +mod batch_provider; +pub use batch_provider::TestNextBatchProvider; mod attributes_queue; pub use attributes_queue::{ @@ -27,8 +27,8 @@ pub use attributes_queue::{ mod batch_stream; pub use batch_stream::TestBatchStreamProvider; -mod channel_bank; -pub use channel_bank::TestNextFrameProvider; +mod channel_provider; +pub use channel_provider::TestNextFrameProvider; mod channel_reader; pub use channel_reader::TestChannelReaderProvider; diff --git a/crates/derive/src/test_utils/pipeline.rs b/crates/derive/src/test_utils/pipeline.rs index da1edaafc..8e721a30c 100644 --- a/crates/derive/src/test_utils/pipeline.rs +++ b/crates/derive/src/test_utils/pipeline.rs @@ -1,7 +1,10 @@ //! Test Utilities for the [crate::pipeline::DerivationPipeline] //! as well as its stages and providers. -use crate::test_utils::{TestChainProvider, TestL2ChainProvider}; +use crate::{ + stages::BatchProvider, + test_utils::{TestChainProvider, TestL2ChainProvider}, +}; use alloc::{boxed::Box, sync::Arc}; use op_alloy_genesis::RollupConfig; use op_alloy_protocol::{BlockInfo, L2BlockInfo}; @@ -12,8 +15,8 @@ use crate::{ errors::PipelineError, pipeline::{DerivationPipeline, PipelineBuilder, PipelineResult}, stages::{ - AttributesQueue, BatchQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, - L1Retrieval, L1Traversal, + AttributesQueue, BatchStream, ChannelProvider, ChannelReader, FrameQueue, L1Retrieval, + L1Traversal, }, test_utils::{TestAttributesBuilder, TestDAP}, traits::{NextAttributes, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, @@ -77,10 +80,10 @@ pub type TestChannelReader = ChannelReader; pub type TestBatchStream = BatchStream; /// A [BatchQueue] using test providers and sources. -pub type TestBatchQueue = BatchQueue; +pub type TestBatchProvider = BatchProvider; /// An [AttributesQueue] using test providers and sources. -pub type TestAttributesQueue = AttributesQueue; +pub type TestAttributesQueue = AttributesQueue; /// A [DerivationPipeline] using test providers and sources. pub type TestPipeline = DerivationPipeline; From dc52b84292d01d1bf798a37e5885e664b3fb02f3 Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 21 Oct 2024 16:14:40 -0400 Subject: [PATCH 2/6] tests --- .../derive/src/stages/batch/batch_provider.rs | 84 +++++++++++++++++++ crates/derive/src/stages/batch/batch_queue.rs | 14 ++-- .../src/stages/batch/batch_validator.rs | 8 +- 3 files changed, 95 insertions(+), 11 deletions(-) diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs index e2a4e398a..c59ef76af 100644 --- a/crates/derive/src/stages/batch/batch_provider.rs +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -52,8 +52,10 @@ mod test { use crate::{ stages::batch::batch_provider::ActiveStage, test_utils::{TestL2ChainProvider, TestNextBatchProvider}, + traits::{OriginProvider, ResetSignal, SignalReceiver}, }; use op_alloy_genesis::RollupConfig; + use op_alloy_protocol::BlockInfo; use std::sync::Arc; #[test] @@ -77,4 +79,86 @@ mod test { let active_stage = batch_provider.active_stage_mut().unwrap(); assert!(matches!(active_stage, ActiveStage::BatchQueue(_))); } + + #[test] + fn test_batch_provider_transition_stage() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + let active_stage = batch_provider.active_stage_mut().unwrap(); + + // Update the L1 origin to Holocene activation. + let ActiveStage::BatchQueue(stage) = active_stage else { + panic!("Expected BatchQueue"); + }; + stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); + + // Transition to the BatchValidator stage. + let active_stage = batch_provider.active_stage_mut().unwrap(); + assert!(matches!(active_stage, ActiveStage::BatchValidator(_))); + + assert_eq!(batch_provider.origin().unwrap().number, 1); + } + + #[test] + fn test_batch_provider_transition_stage_backwards() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + let active_stage = batch_provider.active_stage_mut().unwrap(); + + // Update the L1 origin to Holocene activation. + let ActiveStage::BatchQueue(stage) = active_stage else { + panic!("Expected BatchQueue"); + }; + stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); + + // Transition to the BatchValidator stage. + let active_stage = batch_provider.active_stage_mut().unwrap(); + let ActiveStage::BatchValidator(stage) = active_stage else { + panic!("Expected ChannelBank"); + }; + + // Update the L1 origin to before Holocene activation, to simulate a re-org. + stage.prev.origin = Some(BlockInfo::default()); + + let active_stage = batch_provider.active_stage_mut().unwrap(); + assert!(matches!(active_stage, ActiveStage::BatchQueue(_))); + } + + #[tokio::test] + async fn test_batch_provider_reset_bq() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig::default()); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + // Reset the batch provider. + batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); + + let Ok(ActiveStage::BatchQueue(batch_queue)) = batch_provider.active_stage_mut() else { + panic!("Expected "); + }; + assert!(batch_queue.l1_blocks.len() == 1); + } + + #[tokio::test] + async fn test_batch_provider_reset_validator() { + let provider = TestNextBatchProvider::new(vec![]); + let l2_provider = TestL2ChainProvider::default(); + let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); + let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); + + // Reset the batch provider. + batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); + + let Ok(ActiveStage::BatchValidator(validator)) = batch_provider.active_stage_mut() else { + panic!("Expected BatchValidator"); + }; + assert!(validator.l1_blocks.len() == 1); + } } diff --git a/crates/derive/src/stages/batch/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs index b82fe50cb..b66e2c93e 100644 --- a/crates/derive/src/stages/batch/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -37,26 +37,26 @@ where BF: L2ChainProvider + Debug, { /// The rollup config. - cfg: Arc, + pub(crate) cfg: Arc, /// The previous stage of the derivation pipeline. - prev: P, + pub(crate) prev: P, /// The l1 block ref - origin: Option, + pub(crate) origin: Option, /// A consecutive, time-centric window of L1 Blocks. /// Every L1 origin of unsafe L2 Blocks must be included in this list. /// If every L2 Block corresponding to a single L1 Block becomes safe, /// the block is popped from this list. /// If new L2 Block's L1 origin is not included in this list, fetch and /// push it to the list. - l1_blocks: Vec, + pub(crate) l1_blocks: Vec, /// A set of batches in order from when we've seen them. - batches: Vec, + pub(crate) batches: Vec, /// A set of cached [SingleBatch]es derived from [SpanBatch]es. /// /// [SpanBatch]: crate::batch::SpanBatch - next_spans: Vec, + pub(crate) next_spans: Vec, /// Used to validate the batches. - fetcher: BF, + pub(crate) fetcher: BF, } impl BatchQueue diff --git a/crates/derive/src/stages/batch/batch_validator.rs b/crates/derive/src/stages/batch/batch_validator.rs index 1c4c3447f..a8bc38446 100644 --- a/crates/derive/src/stages/batch/batch_validator.rs +++ b/crates/derive/src/stages/batch/batch_validator.rs @@ -26,18 +26,18 @@ where P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, { /// The rollup configuration. - cfg: Arc, + pub(crate) cfg: Arc, /// The previous stage of the derivation pipeline. - prev: P, + pub(crate) prev: P, /// The L1 origin of the batch sequencer. - origin: Option, + pub(crate) origin: Option, /// A consecutive, time-centric window of L1 Blocks. /// Every L1 origin of unsafe L2 Blocks must be included in this list. /// If every L2 Block corresponding to a single L1 Block becomes safe, /// the block is popped from this list. /// If new L2 Block's L1 origin is not included in this list, fetch and /// push it to the list. - l1_blocks: Vec, + pub(crate) l1_blocks: Vec, } impl

BatchValidator

From 8b64e0b14ab86bc736d7256b9b8484699ec0144e Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 21 Oct 2024 18:58:59 -0400 Subject: [PATCH 3/6] =?UTF-8?q?monolithic=20mux=20macro=20->=20viking=20fu?= =?UTF-8?q?neral=20=E2=9B=B5=F0=9F=94=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crates/derive/src/errors.rs | 8 +- .../derive/src/stages/batch/batch_provider.rs | 230 ++++++++++++---- .../src/stages/channel/channel_provider.rs | 241 +++++++++++++---- crates/derive/src/stages/mod.rs | 4 - crates/derive/src/stages/multiplexed.rs | 254 ------------------ 5 files changed, 369 insertions(+), 368 deletions(-) delete mode 100644 crates/derive/src/stages/multiplexed.rs diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index 006285f3a..48219025e 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -1,9 +1,6 @@ //! This module contains derivation errors thrown within the pipeline. -use crate::{ - batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS}, - stages::MultiplexerError, -}; +use crate::batch::{SpanBatchError, MAX_SPAN_BATCH_ELEMENTS}; use alloc::string::String; use alloy_eips::BlockNumHash; use alloy_primitives::B256; @@ -116,9 +113,6 @@ pub enum PipelineError { /// [PipelineEncodingError] variant. #[error("Decode error: {0}")] BadEncoding(#[from] PipelineEncodingError), - /// A multiplexer stage error. - #[error("Multiplexer error: {0}")] - Multiplexer(#[from] MultiplexerError), /// Provider error variant. #[error("Blob provider error: {0}")] Provider(String), diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs index c59ef76af..c343a774e 100644 --- a/crates/derive/src/stages/batch/batch_provider.rs +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -3,23 +3,149 @@ use super::NextBatchProvider; use crate::{ batch::SingleBatch, - stages::{multiplexed::multiplexed_stage, AttributesProvider, BatchQueue, BatchValidator}, - traits::L2ChainProvider, + errors::{PipelineError, PipelineResult}, + stages::{AttributesProvider, BatchQueue, BatchValidator}, + traits::{L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, }; +use alloc::{boxed::Box, sync::Arc}; +use async_trait::async_trait; use core::fmt::Debug; -use op_alloy_protocol::L2BlockInfo; - -multiplexed_stage!( - BatchProvider, - additional_fields: { - /// The L2 chain fetcher. - fetcher: F, - }, - stages: { - BatchValidator => is_holocene_active, - }, - default_stage: BatchQueue(fetcher) -); +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::{BlockInfo, L2BlockInfo}; + +/// The [BatchProvider] stage is a mux between the [BatchQueue] and [BatchValidator] stages. +/// +/// Rules: +/// When Holocene is not active, the [BatchQueue] is used. +/// When Holocene is active, the [BatchValidator] is used. +/// +/// When transitioning between the two stages, the mux will reset the active stage, but +/// retain `l1_blocks`. +#[derive(Debug)] +pub struct BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + F: L2ChainProvider + Clone + Debug, +{ + /// The rollup configuration. + cfg: Arc, + /// The L2 chain provider. + provider: F, + /// The previous stage of the derivation pipeline. + /// + /// If this is set to [None], the multiplexer has been activated and the active stage + /// owns the previous stage. + /// + /// Must be [None] if `batch_queue` or `batch_validator` is [Some]. + prev: Option

, + /// The batch queue stage of the provider. + /// + /// Must be [None] if `prev` or `batch_validator` is [Some]. + batch_queue: Option>, + /// The channel assembler stage of the provider. + /// + /// Must be [None] if `prev` or `batch_queue` is [Some]. + batch_validator: Option>, +} + +impl BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + F: L2ChainProvider + Clone + Debug, +{ + /// Creates a new [BatchProvider] with the given configuration and previous stage. + pub const fn new(cfg: Arc, prev: P, provider: F) -> Self { + Self { cfg, provider, prev: Some(prev), batch_queue: None, batch_validator: None } + } + + /// Attempts to update the active stage of the mux. + pub(crate) fn attempt_update(&mut self) -> PipelineResult<()> { + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + if let Some(prev) = self.prev.take() { + // On the first call to `attempt_update`, we need to determine the active stage to + // initialize the mux with. + if self.cfg.is_holocene_active(origin.timestamp) { + self.batch_validator = Some(BatchValidator::new(self.cfg.clone(), prev)); + } else { + self.batch_queue = + Some(BatchQueue::new(self.cfg.clone(), prev, self.provider.clone())); + } + } else if self.batch_queue.is_some() && self.cfg.is_holocene_active(origin.timestamp) { + // If the batch queue is active and Holocene is also active, transition to the batch + // validator. + let batch_queue = self.batch_queue.take().expect("Must have channel bank"); + let mut bv = BatchValidator::new(self.cfg.clone(), batch_queue.prev); + bv.l1_blocks = batch_queue.l1_blocks; + self.batch_validator = Some(bv); + } else if self.batch_validator.is_some() && !self.cfg.is_holocene_active(origin.timestamp) { + // If the batch validator is active, and Holocene is not active, it indicates an L1 + // reorg around Holocene activation. Transition back to the batch queue + // until Holocene re-activates. + let batch_validator = self.batch_validator.take().expect("Must have batch validator"); + let mut bq = + BatchQueue::new(self.cfg.clone(), batch_validator.prev, self.provider.clone()); + bq.l1_blocks = batch_validator.l1_blocks; + self.batch_queue = Some(bq); + } + Ok(()) + } +} + +#[async_trait] +impl OriginAdvancer for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + F: L2ChainProvider + Clone + Send + Debug, +{ + async fn advance_origin(&mut self) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(batch_validator) = self.batch_validator.as_mut() { + batch_validator.advance_origin().await + } else if let Some(batch_queue) = self.batch_queue.as_mut() { + batch_queue.advance_origin().await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} + +impl OriginProvider for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, + F: L2ChainProvider + Clone + Debug, +{ + fn origin(&self) -> Option { + self.batch_validator.as_ref().map_or_else( + || { + self.batch_queue.as_ref().map_or_else( + || self.prev.as_ref().and_then(|prev| prev.origin()), + |batch_queue| batch_queue.origin(), + ) + }, + |batch_validator| batch_validator.origin(), + ) + } +} + +#[async_trait] +impl SignalReceiver for BatchProvider +where + P: NextBatchProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, + F: L2ChainProvider + Clone + Send + Debug, +{ + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(batch_validator) = self.batch_validator.as_mut() { + batch_validator.signal(signal).await + } else if let Some(batch_queue) = self.batch_queue.as_mut() { + batch_queue.signal(signal).await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} #[async_trait] impl AttributesProvider for BatchProvider @@ -28,20 +154,21 @@ where F: L2ChainProvider + Clone + Send + Debug, { fn is_last_in_span(&self) -> bool { - let Some(stage) = self.active_stage_ref() else { - return false; - }; - - match stage { - ActiveStage::BatchQueue(stage) => stage.is_last_in_span(), - ActiveStage::BatchValidator(stage) => stage.is_last_in_span(), - } + self.batch_validator.as_ref().map_or_else( + || self.batch_queue.as_ref().map_or(false, |batch_queue| batch_queue.is_last_in_span()), + |batch_validator| batch_validator.is_last_in_span(), + ) } async fn next_batch(&mut self, parent: L2BlockInfo) -> PipelineResult { - match self.active_stage_mut()? { - ActiveStage::BatchQueue(stage) => stage.next_batch(parent).await, - ActiveStage::BatchValidator(stage) => stage.next_batch(parent).await, + self.attempt_update()?; + + if let Some(batch_validator) = self.batch_validator.as_mut() { + batch_validator.next_batch(parent).await + } else if let Some(batch_queue) = self.batch_queue.as_mut() { + batch_queue.next_batch(parent).await + } else { + Err(PipelineError::NotEnoughData.temp()) } } } @@ -50,7 +177,6 @@ where mod test { use super::BatchProvider; use crate::{ - stages::batch::batch_provider::ActiveStage, test_utils::{TestL2ChainProvider, TestNextBatchProvider}, traits::{OriginProvider, ResetSignal, SignalReceiver}, }; @@ -65,8 +191,10 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); - let active_stage = batch_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::BatchValidator(_))); + assert!(batch_provider.attempt_update().is_ok()); + assert!(batch_provider.prev.is_none()); + assert!(batch_provider.batch_queue.is_none()); + assert!(batch_provider.batch_validator.is_some()); } #[test] @@ -76,8 +204,10 @@ mod test { let cfg = Arc::new(RollupConfig::default()); let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); - let active_stage = batch_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::BatchQueue(_))); + assert!(batch_provider.attempt_update().is_ok()); + assert!(batch_provider.prev.is_none()); + assert!(batch_provider.batch_queue.is_some()); + assert!(batch_provider.batch_validator.is_none()); } #[test] @@ -87,17 +217,18 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); - let active_stage = batch_provider.active_stage_mut().unwrap(); + batch_provider.attempt_update().unwrap(); // Update the L1 origin to Holocene activation. - let ActiveStage::BatchQueue(stage) = active_stage else { + let Some(ref mut stage) = batch_provider.batch_queue else { panic!("Expected BatchQueue"); }; stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); // Transition to the BatchValidator stage. - let active_stage = batch_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::BatchValidator(_))); + batch_provider.attempt_update().unwrap(); + assert!(batch_provider.batch_queue.is_none()); + assert!(batch_provider.batch_validator.is_some()); assert_eq!(batch_provider.origin().unwrap().number, 1); } @@ -109,25 +240,28 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); let mut batch_provider = BatchProvider::new(cfg, provider, l2_provider); - let active_stage = batch_provider.active_stage_mut().unwrap(); + batch_provider.attempt_update().unwrap(); // Update the L1 origin to Holocene activation. - let ActiveStage::BatchQueue(stage) = active_stage else { + let Some(ref mut stage) = batch_provider.batch_queue else { panic!("Expected BatchQueue"); }; stage.prev.origin = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); // Transition to the BatchValidator stage. - let active_stage = batch_provider.active_stage_mut().unwrap(); - let ActiveStage::BatchValidator(stage) = active_stage else { - panic!("Expected ChannelBank"); - }; + batch_provider.attempt_update().unwrap(); + assert!(batch_provider.batch_queue.is_none()); + assert!(batch_provider.batch_validator.is_some()); // Update the L1 origin to before Holocene activation, to simulate a re-org. + let Some(ref mut stage) = batch_provider.batch_validator else { + panic!("Expected BatchValidator"); + }; stage.prev.origin = Some(BlockInfo::default()); - let active_stage = batch_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::BatchQueue(_))); + batch_provider.attempt_update().unwrap(); + assert!(batch_provider.batch_queue.is_some()); + assert!(batch_provider.batch_validator.is_none()); } #[tokio::test] @@ -140,10 +274,10 @@ mod test { // Reset the batch provider. batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); - let Ok(ActiveStage::BatchQueue(batch_queue)) = batch_provider.active_stage_mut() else { - panic!("Expected "); + let Some(bq) = batch_provider.batch_queue else { + panic!("Expected BatchQueue"); }; - assert!(batch_queue.l1_blocks.len() == 1); + assert!(bq.l1_blocks.len() == 1); } #[tokio::test] @@ -156,9 +290,9 @@ mod test { // Reset the batch provider. batch_provider.signal(ResetSignal::default().signal()).await.unwrap(); - let Ok(ActiveStage::BatchValidator(validator)) = batch_provider.active_stage_mut() else { + let Some(bv) = batch_provider.batch_validator else { panic!("Expected BatchValidator"); }; - assert!(validator.l1_blocks.len() == 1); + assert!(bv.l1_blocks.len() == 1); } } diff --git a/crates/derive/src/stages/channel/channel_provider.rs b/crates/derive/src/stages/channel/channel_provider.rs index 49ca1d57f..75bbcf399 100644 --- a/crates/derive/src/stages/channel/channel_provider.rs +++ b/crates/derive/src/stages/channel/channel_provider.rs @@ -1,17 +1,137 @@ //! This module contains the [ChannelProvider] stage. use super::{ChannelAssembler, ChannelBank, ChannelReaderProvider, NextFrameProvider}; -use crate::stages::multiplexed::multiplexed_stage; +use crate::{ + errors::{PipelineError, PipelineResult}, + traits::{OriginAdvancer, OriginProvider, Signal, SignalReceiver}, +}; +use alloc::{boxed::Box, sync::Arc}; use alloy_primitives::Bytes; +use async_trait::async_trait; use core::fmt::Debug; +use op_alloy_genesis::RollupConfig; +use op_alloy_protocol::BlockInfo; + +/// The [ChannelProvider] stage is a mux between the [ChannelBank] and [ChannelAssembler] stages. +/// +/// Rules: +/// When Holocene is not active, the [ChannelBank] is used. +/// When Holocene is active, the [ChannelAssembler] is used. +#[derive(Debug)] +pub struct ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + /// The rollup configuration. + cfg: Arc, + /// The previous stage of the derivation pipeline. + /// + /// If this is set to [None], the multiplexer has been activated and the active stage + /// owns the previous stage. + /// + /// Must be [None] if `channel_bank` or `channel_assembler` is [Some]. + prev: Option

, + /// The channel bank stage of the provider. + /// + /// Must be [None] if `prev` or `channel_assembler` is [Some]. + channel_bank: Option>, + /// The channel assembler stage of the provider. + /// + /// Must be [None] if `prev` or `channel_bank` is [Some]. + channel_assembler: Option>, +} -multiplexed_stage!( - ChannelProvider - stages: { - ChannelAssembler => is_holocene_active, +impl

ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + /// Creates a new [ChannelProvider] with the given configuration and previous stage. + pub const fn new(cfg: Arc, prev: P) -> Self { + Self { cfg, prev: Some(prev), channel_bank: None, channel_assembler: None } } - default_stage: ChannelBank -); + + /// Attempts to update the active stage of the mux. + pub(crate) fn attempt_update(&mut self) -> PipelineResult<()> { + let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; + if let Some(prev) = self.prev.take() { + // On the first call to `attempt_update`, we need to determine the active stage to + // initialize the mux with. + if self.cfg.is_holocene_active(origin.timestamp) { + self.channel_assembler = Some(ChannelAssembler::new(self.cfg.clone(), prev)); + } else { + self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), prev)); + } + } else if self.channel_bank.is_some() && self.cfg.is_holocene_active(origin.timestamp) { + // If the channel bank is active and Holocene is also active, transition to the channel + // assembler. + let channel_bank = self.channel_bank.take().expect("Must have channel bank"); + self.channel_assembler = + Some(ChannelAssembler::new(self.cfg.clone(), channel_bank.prev)); + } else if self.channel_assembler.is_some() && !self.cfg.is_holocene_active(origin.timestamp) + { + // If the channel assembler is active, and Holocene is not active, it indicates an L1 + // reorg around Holocene activation. Transition back to the channel bank + // until Holocene re-activates. + let channel_assembler = + self.channel_assembler.take().expect("Must have channel assembler"); + self.channel_bank = Some(ChannelBank::new(self.cfg.clone(), channel_assembler.prev)); + } + Ok(()) + } +} + +#[async_trait] +impl

OriginAdvancer for ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn advance_origin(&mut self) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(channel_assembler) = self.channel_assembler.as_mut() { + channel_assembler.advance_origin().await + } else if let Some(channel_bank) = self.channel_bank.as_mut() { + channel_bank.advance_origin().await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} + +impl

OriginProvider for ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Debug, +{ + fn origin(&self) -> Option { + self.channel_assembler.as_ref().map_or_else( + || { + self.channel_bank.as_ref().map_or_else( + || self.prev.as_ref().and_then(|prev| prev.origin()), + |channel_bank| channel_bank.origin(), + ) + }, + |channel_assembler| channel_assembler.origin(), + ) + } +} + +#[async_trait] +impl

SignalReceiver for ChannelProvider

+where + P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, +{ + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + self.attempt_update()?; + + if let Some(channel_assembler) = self.channel_assembler.as_mut() { + channel_assembler.signal(signal).await + } else if let Some(channel_bank) = self.channel_bank.as_mut() { + channel_bank.signal(signal).await + } else { + Err(PipelineError::NotEnoughData.temp()) + } + } +} #[async_trait] impl

ChannelReaderProvider for ChannelProvider

@@ -19,16 +139,21 @@ where P: NextFrameProvider + OriginAdvancer + OriginProvider + SignalReceiver + Send + Debug, { async fn next_data(&mut self) -> PipelineResult> { - match self.active_stage_mut()? { - ActiveStage::ChannelAssembler(stage) => stage.next_data().await, - ActiveStage::ChannelBank(stage) => stage.next_data().await, + self.attempt_update()?; + + if let Some(channel_assembler) = self.channel_assembler.as_mut() { + channel_assembler.next_data().await + } else if let Some(channel_bank) = self.channel_bank.as_mut() { + channel_bank.next_data().await + } else { + Err(PipelineError::NotEnoughData.temp()) } } } #[cfg(test)] mod test { - use super::{ActiveStage, ChannelProvider}; + use super::ChannelProvider; use crate::{ prelude::{OriginProvider, PipelineError}, stages::ChannelReaderProvider, @@ -45,8 +170,10 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(0), ..Default::default() }); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelAssembler(_))); + assert!(channel_provider.attempt_update().is_ok()); + assert!(channel_provider.prev.is_none()); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); } #[test] @@ -55,8 +182,10 @@ mod test { let cfg = Arc::new(RollupConfig::default()); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelBank(_))); + assert!(channel_provider.attempt_update().is_ok()); + assert!(channel_provider.prev.is_none()); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); } #[test] @@ -66,19 +195,20 @@ mod test { let mut channel_provider = ChannelProvider::new(cfg, provider); // Assert the multiplexer hasn't been initialized. - assert!(channel_provider.active_stage.is_none()); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_none()); assert!(channel_provider.prev.is_some()); // Load in the active stage. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelBank(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); + assert!(channel_provider.prev.is_none()); // Ensure the active stage is retained on the second call. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelBank(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); + assert!(channel_provider.prev.is_none()); } #[test] @@ -88,19 +218,20 @@ mod test { let mut channel_provider = ChannelProvider::new(cfg, provider); // Assert the multiplexer hasn't been initialized. - assert!(channel_provider.active_stage.is_none()); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_none()); assert!(channel_provider.prev.is_some()); // Load in the active stage. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelAssembler(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); + assert!(channel_provider.prev.is_none()); // Ensure the active stage is retained on the second call. - assert!(matches!( - channel_provider.active_stage_mut().unwrap(), - ActiveStage::ChannelAssembler(_) - )); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); + assert!(channel_provider.prev.is_none()); } #[test] @@ -109,17 +240,18 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); + channel_provider.attempt_update().unwrap(); // Update the L1 origin to Holocene activation. - let ActiveStage::ChannelBank(stage) = active_stage else { + let Some(ref mut stage) = channel_provider.channel_bank else { panic!("Expected ChannelBank"); }; stage.prev.block_info = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); // Transition to the ChannelAssembler stage. - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelAssembler(_))); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); assert_eq!(channel_provider.origin().unwrap().number, 1); } @@ -130,25 +262,28 @@ mod test { let cfg = Arc::new(RollupConfig { holocene_time: Some(2), ..Default::default() }); let mut channel_provider = ChannelProvider::new(cfg, provider); - let active_stage = channel_provider.active_stage_mut().unwrap(); + channel_provider.attempt_update().unwrap(); // Update the L1 origin to Holocene activation. - let ActiveStage::ChannelBank(stage) = active_stage else { + let Some(ref mut stage) = channel_provider.channel_bank else { panic!("Expected ChannelBank"); }; stage.prev.block_info = Some(BlockInfo { number: 1, timestamp: 2, ..Default::default() }); // Transition to the ChannelAssembler stage. - let active_stage = channel_provider.active_stage_mut().unwrap(); - let ActiveStage::ChannelAssembler(stage) = active_stage else { - panic!("Expected ChannelBank"); - }; + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_none()); + assert!(channel_provider.channel_assembler.is_some()); // Update the L1 origin to before Holocene activation, to simulate a re-org. + let Some(ref mut stage) = channel_provider.channel_assembler else { + panic!("Expected ChannelAssembler"); + }; stage.prev.block_info = Some(BlockInfo::default()); - let active_stage = channel_provider.active_stage_mut().unwrap(); - assert!(matches!(active_stage, ActiveStage::ChannelBank(_))); + channel_provider.attempt_update().unwrap(); + assert!(channel_provider.channel_bank.is_some()); + assert!(channel_provider.channel_assembler.is_none()); } #[tokio::test] @@ -166,7 +301,7 @@ mod test { channel_provider.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp() ); - let Ok(ActiveStage::ChannelBank(channel_bank)) = channel_provider.active_stage_mut() else { + let Some(channel_bank) = channel_provider.channel_bank.as_mut() else { panic!("Expected ChannelBank"); }; // Ensure a channel is in the queue. @@ -176,7 +311,7 @@ mod test { channel_provider.signal(ResetSignal::default().signal()).await.unwrap(); // Ensure the channel queue is empty after reset. - let Ok(ActiveStage::ChannelBank(channel_bank)) = channel_provider.active_stage_mut() else { + let Some(channel_bank) = channel_provider.channel_bank.as_mut() else { panic!("Expected ChannelBank"); }; assert!(channel_bank.channel_queue.is_empty()); @@ -197,10 +332,8 @@ mod test { channel_provider.next_data().await.unwrap_err(), PipelineError::NotEnoughData.temp() ); - let Ok(ActiveStage::ChannelAssembler(channel_assembler)) = - channel_provider.active_stage_mut() - else { - panic!("Expected ChannelBank"); + let Some(channel_assembler) = channel_provider.channel_assembler.as_mut() else { + panic!("Expected ChannelAssembler"); }; // Ensure a channel is being built. assert!(channel_assembler.channel.is_some()); @@ -209,10 +342,8 @@ mod test { channel_provider.signal(ResetSignal::default().signal()).await.unwrap(); // Ensure the channel assembler is empty after reset. - let Ok(ActiveStage::ChannelAssembler(channel_assembler)) = - channel_provider.active_stage_mut() - else { - panic!("Expected ChannelBank"); + let Some(channel_assembler) = channel_provider.channel_assembler.as_mut() else { + panic!("Expected ChannelAssembler"); }; assert!(channel_assembler.channel.is_none()); } diff --git a/crates/derive/src/stages/mod.rs b/crates/derive/src/stages/mod.rs index b5bceb949..cec0b6341 100644 --- a/crates/derive/src/stages/mod.rs +++ b/crates/derive/src/stages/mod.rs @@ -38,9 +38,5 @@ pub use batch::{ mod attributes_queue; pub use attributes_queue::AttributesQueue; -#[macro_use] -mod multiplexed; -pub use multiplexed::MultiplexerError; - mod utils; pub use utils::decompress_brotli; diff --git a/crates/derive/src/stages/multiplexed.rs b/crates/derive/src/stages/multiplexed.rs deleted file mode 100644 index 1d7ab891b..000000000 --- a/crates/derive/src/stages/multiplexed.rs +++ /dev/null @@ -1,254 +0,0 @@ -//! Contains the [multiplexed_stage] macro. - -use thiserror::Error; - -/// An error type for the multiplexer stages. -#[derive(Error, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub enum MultiplexerError { - /// Thrown when the multiplexer has not yet been activated, but a sub-stage is being accessed. - #[error("The multiplexer has not been activated.")] - NotActivated, -} - -/// The [multiplexed_stage] macro generates a new stage that swaps its underlying functionality -/// depending on the active hardfork. -/// -/// By default, the stage struct generated by this macro: -/// - Implements [OriginAdvancer], [OriginProvider], and [SignalReceiver]. -/// - Contains an enum that represents the active stage, in the `stages` key. -/// - Activates stages based on the conditions provided in the `stages` key. -/// -/// When a new fork with a stage definition activates, relative to the pipeline origin, the active -/// stage is dissolved and the ownership of the previous stage is transferred to the new stage. -/// -/// Stage requirements: -/// - The previous stage must implement [OriginAdvancer], [OriginProvider], [SignalReceiver], and -/// [Debug]. -/// - The stages must implement an `into_prev` method that returns the owned previous stage. -/// -/// ## Example Usage -/// ```rust,ignore -/// multiplexed_stage!( -/// MyStage, -/// stages: { -/// EcotoneStage => is_ecotone_active, -/// } -/// default_stage: BedrockStage -/// ); -/// ``` -/// -/// To add additional fields to the multiplexer stage, that must be passed to the `new` function of -/// the multiplexer and sub-stages: -/// ```rust,ignore -/// multiplexed_stage!( -/// MyStage, -/// additional_fields: { -/// /// The number of blocks to wait before advancing the origin. -/// block_wait: u64, -/// } -/// stages: { -/// EcotoneStage(block_wait) => is_ecotone_active, -/// } -/// default_stage: BedrockStage -/// ); -/// -/// // -- snip -- -/// -/// let cfg = Arc::new(RollupConfig::default()); -/// let prev = MyPrevStage::default(); -/// MyStage::new(cfg.clone(), prev, 10); -/// ``` -/// -/// [OriginAdvancer]: crate::pipeline::OriginAdvancer -/// [OriginProvider]: crate::pipeline::OriginProvider -/// [SignalReceiver]: crate::pipeline::SignalReceiver -/// [Debug]: core::fmt::Debug -macro_rules! multiplexed_stage { - ( - $provider_name:ident<$prev_type:ident$(, $provider_generic:ident: $($provider_generic_bound:ident)*)*>$(,)? - $( - additional_fields: { - $(#[doc = $comment:expr])? - $($field_name:ident: $field_type:ty,)+ - } - )?$(,)? - stages: { - $($stage_name:ident$(<$($stage_generic:ident$(, )?)+>)?$(($($input_name:ident$(,)?)+))? => $stage_condition:ident,)* - }$(,)? - default_stage: $last_stage_name:ident$(<$($last_stage_generic:ident$(, )?)+>)?$(($($last_input_name:ident$(,)?)+))? - ) => { - use $crate::{ - pipeline::{OriginAdvancer, OriginProvider, SignalReceiver, Signal, PipelineError, PipelineResult}, - stages::MultiplexerError - }; - use async_trait::async_trait; - use alloc::boxed::Box; - - #[doc = concat!("The active stage of the ", stringify!($provider_name), ".")] - #[derive(Debug)] - enum ActiveStage - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, - { - $($stage_name($stage_name,),)* - $last_stage_name($last_stage_name), - } - - impl ActiveStage - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, - { - /// Dissolves the active stage and returns the previous stage. - pub(crate) fn into_prev(self) -> P { - match self { - $(ActiveStage::$stage_name(stage) => stage.into_prev(),)* - ActiveStage::$last_stage_name(stage) => stage.into_prev(), - } - } - } - - #[doc = concat!("The ", stringify!($provider_name), " stage is responsible for multiplexing sub-stages.")] - #[derive(Debug)] - pub struct $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, - { - /// The rollup configuration. - cfg: alloc::sync::Arc, - /// The previous stage of the derivation pipeline. - /// - /// If this is set to [None], the multiplexer has been activated and the active stage - /// owns the previous stage. - /// - /// Must be [None] if `active_stage` is [Some]. - prev: Option

, - /// The active stage of the provider. - /// - /// If this is set to [None], the multiplexer has not been activated and the previous - /// stage is owned by the multiplexer. - /// - /// Must be [None] if `prev` is [Some]. - active_stage: Option>, - $( - $(#[doc = $comment])? - $($field_name: $field_type,)+ - )? - } - - impl $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, - { - /// Creates a new instance of the provider. - pub const fn new(cfg: alloc::sync::Arc, prev: P$( $(, $field_name: $field_type)+ )?) -> Self { - Self { - cfg, - prev: Some(prev), - active_stage: None, - $( - $($field_name,)+ - )? - } - } - - #[doc = concat!("Returns a mutable ref to the active stage of the ", stringify!($provider_name), ".")] - const fn active_stage_ref(&self) -> Option<&ActiveStage> { - self.active_stage.as_ref() - } - - #[doc = concat!("Returns a mutable ref to the active stage of the ", stringify!($provider_name), ".")] - fn active_stage_mut(&mut self) -> PipelineResult<&mut ActiveStage> { - // If the multiplexer has not been activated, activate the correct stage. - if let Some(prev) = self.prev.take() { - let origin = prev.origin().ok_or(PipelineError::MissingOrigin.crit())?; - - self.active_stage = Some( - $(if self.cfg.$stage_condition(origin.timestamp) { - ActiveStage::$stage_name($stage_name::new(self.cfg.clone(), prev$($(, self.$input_name.clone())*)?)) - } else)* { - ActiveStage::$last_stage_name($last_stage_name::new(self.cfg.clone(), prev$($(, self.$last_input_name.clone())*)?)) - } - ); - return self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()); - } else { - // Otherwise, check if the active stage should be changed. - let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; - let active_stage = self.active_stage.take().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit())?; - - // If a new stage has activated, transfer ownership of the previous stage to the new stage to - // re-link the pipeline at runtime. - $(if self.cfg.$stage_condition(origin.timestamp) { - // If the correct stage is already active, return it. - if matches!(active_stage, ActiveStage::$stage_name(_)) { - self.active_stage = Some(active_stage); - return self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()); - } - - // Otherwise, dissolve the active stage and create a new one, granting ownership of - // the previous stage to the new stage. - let prev = active_stage.into_prev(); - self.active_stage = Some(ActiveStage::$stage_name($stage_name::new(self.cfg.clone(), prev$($(, self.$input_name.clone())*)?))); - } else)* { - // If the correct stage is already active, return it. - if matches!(active_stage, ActiveStage::$last_stage_name(_)) { - self.active_stage = Some(active_stage); - return self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()); - } - - self.active_stage = Some(ActiveStage::$last_stage_name($last_stage_name::new(self.cfg.clone(), active_stage.into_prev()$($(, self.$last_input_name.clone())*)?))); - } - } - - self.active_stage.as_mut().ok_or(PipelineError::from(MultiplexerError::NotActivated).crit()) - } - } - - #[async_trait] - impl OriginAdvancer for $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + core::fmt::Debug, - { - async fn advance_origin(&mut self) -> PipelineResult<()> { - match self.active_stage_mut()? { - $(ActiveStage::$stage_name(stage) => stage.advance_origin().await,)* - ActiveStage::$last_stage_name(stage) => stage.advance_origin().await, - } - } - } - - impl OriginProvider for $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + core::fmt::Debug, - { - fn origin(&self) -> Option { - match self.active_stage_ref() { - Some(stage) => { - match stage { - $(ActiveStage::$stage_name(stage) => stage.origin(),)* - ActiveStage::$last_stage_name(stage) => stage.origin(), - } - } - None => self.prev.as_ref().map(|prev| prev.origin()).flatten(), - } - } - } - - #[async_trait] - impl SignalReceiver for $provider_name - where - P: $prev_type + OriginAdvancer + OriginProvider + SignalReceiver + Send + core::fmt::Debug, - { - async fn signal( - &mut self, - signal: Signal, - ) -> PipelineResult<()> { - match self.active_stage_mut()? { - $(ActiveStage::$stage_name(stage) => stage.signal(signal).await,)* - ActiveStage::$last_stage_name(stage) => stage.signal(signal).await, - } - } - } - } -} - -pub(crate) use multiplexed_stage; From a8e496fe6c8bc094f9920309c2c8be6da0d21ee8 Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 21 Oct 2024 19:03:37 -0400 Subject: [PATCH 4/6] rebase --- crates/derive/src/stages/batch/batch_provider.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs index c343a774e..cb6f8519a 100644 --- a/crates/derive/src/stages/batch/batch_provider.rs +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -4,8 +4,10 @@ use super::NextBatchProvider; use crate::{ batch::SingleBatch, errors::{PipelineError, PipelineResult}, - stages::{AttributesProvider, BatchQueue, BatchValidator}, - traits::{L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver}, + stages::{BatchQueue, BatchValidator}, + traits::{ + AttributesProvider, L2ChainProvider, OriginAdvancer, OriginProvider, Signal, SignalReceiver, + }, }; use alloc::{boxed::Box, sync::Arc}; use async_trait::async_trait; From 3990307ccbc05fc7397892f12511bf75ebaae016 Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 21 Oct 2024 19:16:42 -0400 Subject: [PATCH 5/6] updates --- crates/derive/src/stages/batch/batch_provider.rs | 4 ++-- crates/derive/src/stages/batch/batch_validator.rs | 8 +++----- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/crates/derive/src/stages/batch/batch_provider.rs b/crates/derive/src/stages/batch/batch_provider.rs index cb6f8519a..b480a9e96 100644 --- a/crates/derive/src/stages/batch/batch_provider.rs +++ b/crates/derive/src/stages/batch/batch_provider.rs @@ -44,7 +44,7 @@ where /// /// Must be [None] if `prev` or `batch_validator` is [Some]. batch_queue: Option>, - /// The channel assembler stage of the provider. + /// The batch validator stage of the provider. /// /// Must be [None] if `prev` or `batch_queue` is [Some]. batch_validator: Option>, @@ -75,7 +75,7 @@ where } else if self.batch_queue.is_some() && self.cfg.is_holocene_active(origin.timestamp) { // If the batch queue is active and Holocene is also active, transition to the batch // validator. - let batch_queue = self.batch_queue.take().expect("Must have channel bank"); + let batch_queue = self.batch_queue.take().expect("Must have batch queue"); let mut bv = BatchValidator::new(self.cfg.clone(), batch_queue.prev); bv.l1_blocks = batch_queue.l1_blocks; self.batch_validator = Some(bv); diff --git a/crates/derive/src/stages/batch/batch_validator.rs b/crates/derive/src/stages/batch/batch_validator.rs index a8bc38446..62212c449 100644 --- a/crates/derive/src/stages/batch/batch_validator.rs +++ b/crates/derive/src/stages/batch/batch_validator.rs @@ -319,12 +319,10 @@ mod test { use super::BatchValidator; use crate::{ batch::{Batch, SingleBatch, SpanBatch}, - errors::{PipelineErrorKind, ResetError}, - pipeline::{PipelineResult, SignalReceiver}, - prelude::PipelineError, + errors::{PipelineError, PipelineErrorKind, PipelineResult, ResetError}, stages::NextBatchProvider, - test_utils::{CollectingLayer, TestBatchQueueProvider, TraceStorage}, - traits::{AttributesProvider, OriginAdvancer, ResetSignal, Signal}, + test_utils::{CollectingLayer, TestNextBatchProvider, TraceStorage}, + traits::{AttributesProvider, OriginAdvancer, ResetSignal, Signal, SignalReceiver}, }; use alloc::sync::Arc; use alloy_eips::{BlockNumHash, NumHash}; From bc9421863d72aee95aeda89cc96cfb14c63e7f05 Mon Sep 17 00:00:00 2001 From: clabby Date: Mon, 21 Oct 2024 19:25:16 -0400 Subject: [PATCH 6/6] remove `into_prev` --- crates/derive/src/stages/batch/batch_queue.rs | 5 ----- .../derive/src/stages/batch/batch_validator.rs | 5 ----- .../src/stages/channel/channel_assembler.rs | 5 ----- .../derive/src/stages/channel/channel_bank.rs | 17 ----------------- 4 files changed, 32 deletions(-) diff --git a/crates/derive/src/stages/batch/batch_queue.rs b/crates/derive/src/stages/batch/batch_queue.rs index b66e2c93e..0a320f367 100644 --- a/crates/derive/src/stages/batch/batch_queue.rs +++ b/crates/derive/src/stages/batch/batch_queue.rs @@ -78,11 +78,6 @@ where } } - /// Consumes [Self] and returns the inner previous stage. - pub fn into_prev(self) -> P { - self.prev - } - /// Pops the next batch from the current queued up span-batch cache. /// The parent is used to set the parent hash of the batch. /// The parent is verified when the batch is later validated. diff --git a/crates/derive/src/stages/batch/batch_validator.rs b/crates/derive/src/stages/batch/batch_validator.rs index 62212c449..5b05fc376 100644 --- a/crates/derive/src/stages/batch/batch_validator.rs +++ b/crates/derive/src/stages/batch/batch_validator.rs @@ -49,11 +49,6 @@ where Self { cfg, prev, origin: None, l1_blocks: Vec::new() } } - /// Consumes [Self] and returns the previous stage. - pub fn into_prev(self) -> P { - self.prev - } - /// Returns `true` if the pipeline origin is behind the parent origin. /// /// ## Takes diff --git a/crates/derive/src/stages/channel/channel_assembler.rs b/crates/derive/src/stages/channel/channel_assembler.rs index 2e48de481..3ee827f72 100644 --- a/crates/derive/src/stages/channel/channel_assembler.rs +++ b/crates/derive/src/stages/channel/channel_assembler.rs @@ -44,11 +44,6 @@ where Self { cfg, prev, channel: None } } - /// Consumes [Self] and returns the previous stage. - pub fn into_prev(self) -> P { - self.prev - } - /// Returns whether or not the channel currently being assembled has timed out. pub fn is_timed_out(&self) -> PipelineResult { let origin = self.origin().ok_or(PipelineError::MissingOrigin.crit())?; diff --git a/crates/derive/src/stages/channel/channel_bank.rs b/crates/derive/src/stages/channel/channel_bank.rs index 7f2b05c6b..ded3a6b9d 100644 --- a/crates/derive/src/stages/channel/channel_bank.rs +++ b/crates/derive/src/stages/channel/channel_bank.rs @@ -56,11 +56,6 @@ where Self { cfg, channels: HashMap::new(), channel_queue: VecDeque::new(), prev } } - /// Consumes [Self] and returns the previous stage. - pub fn into_prev(self) -> P { - self.prev - } - /// Returns the size of the channel bank by accumulating over all channels. pub fn size(&self) -> usize { self.channels.iter().fold(0, |acc, (_, c)| acc + c.size()) @@ -284,18 +279,6 @@ mod tests { use tracing::Level; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - #[test] - fn test_channel_bank_into_prev() { - let frames = [crate::frame!(0xFF, 0, vec![0xDD; 50], true)]; - let mock = TestNextFrameProvider::new(frames.into_iter().map(Ok).collect()); - let cfg = Arc::new(RollupConfig::default()); - let channel_bank = ChannelBank::new(cfg, mock); - - let prev = channel_bank.into_prev(); - assert_eq!(prev.origin(), Some(BlockInfo::default())); - assert_eq!(prev.data.len(), 1) - } - #[test] fn test_try_read_channel_at_index_missing_channel() { let mock = TestNextFrameProvider::new(vec![]);