diff --git a/Cargo.lock b/Cargo.lock index 931318673..ada919454 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4797,6 +4797,7 @@ dependencies = [ "alloy-transport", "anyhow", "clap", + "kona-derive", "kona-primitives", "kona-providers-alloy", "lazy_static", diff --git a/bin/client/src/l1/driver.rs b/bin/client/src/l1/driver.rs index fdda5b9f7..8568bbe1e 100644 --- a/bin/client/src/l1/driver.rs +++ b/bin/client/src/l1/driver.rs @@ -19,7 +19,7 @@ use kona_derive::{ AttributesQueue, BatchQueue, BatchStream, ChannelBank, ChannelReader, FrameQueue, L1Retrieval, L1Traversal, }, - traits::{BlobProvider, OriginProvider}, + traits::{BlobProvider, OriginProvider, Signal}, }; use kona_executor::{KonaHandleRegister, StatelessL2BlockExecutor}; use kona_mpt::{TrieHinter, TrieProvider}; @@ -229,12 +229,13 @@ where // Reset the pipeline to the initial L2 safe head and L1 origin, // and try again. self.pipeline - .reset( - self.l2_safe_head.block_info, - self.pipeline + .signal(Signal::Reset { + l2_safe_head: self.l2_safe_head, + l1_origin: self + .pipeline .origin() .ok_or_else(|| anyhow!("Missing L1 origin"))?, - ) + }) .await?; } PipelineErrorKind::Critical(_) => return Err(e.into()), diff --git a/crates/derive/src/pipeline/core.rs b/crates/derive/src/pipeline/core.rs index 1586aff36..9dd489c60 100644 --- a/crates/derive/src/pipeline/core.rs +++ b/crates/derive/src/pipeline/core.rs @@ -4,7 +4,7 @@ use super::{ NextAttributes, OriginAdvancer, OriginProvider, Pipeline, PipelineError, PipelineResult, ResettableStage, StepResult, }; -use crate::errors::PipelineErrorKind; +use crate::{errors::PipelineErrorKind, traits::Signal}; use alloc::{boxed::Box, collections::VecDeque, string::ToString, sync::Arc}; use async_trait::async_trait; use core::fmt::Debug; @@ -95,26 +95,30 @@ where /// /// The `l1_block_info` is the new L1 origin set in the [crate::stages::L1Traversal] /// stage. - async fn reset( - &mut self, - l2_block_info: BlockInfo, - l1_block_info: BlockInfo, - ) -> PipelineResult<()> { - let system_config = self - .l2_chain_provider - .system_config_by_number(l2_block_info.number, Arc::clone(&self.rollup_config)) - .await - .map_err(|e| PipelineError::Provider(e.to_string()).temp())?; - match self.attributes.reset(l1_block_info, &system_config).await { - Ok(()) => trace!(target: "pipeline", "Stages reset"), - Err(err) => { - if let PipelineErrorKind::Temporary(PipelineError::Eof) = err { - trace!(target: "pipeline", "Stages reset with EOF"); - } else { - error!(target: "pipeline", "Stage reset errored: {:?}", err); - return Err(err); + async fn signal(&mut self, signal: Signal) -> PipelineResult<()> { + match signal { + Signal::Reset { l2_safe_head, l1_origin } => { + let system_config = self + .l2_chain_provider + .system_config_by_number( + l2_safe_head.block_info.number, + Arc::clone(&self.rollup_config), + ) + .await + .map_err(|e| PipelineError::Provider(e.to_string()).temp())?; + match self.attributes.reset(l1_origin, &system_config).await { + Ok(()) => trace!(target: "pipeline", "Stages reset"), + Err(err) => { + if let PipelineErrorKind::Temporary(PipelineError::Eof) = err { + trace!(target: "pipeline", "Stages reset with EOF"); + } else { + error!(target: "pipeline", "Stage reset errored: {:?}", err); + return Err(err); + } + } } } + _ => unimplemented!("Signal not implemented"), } Ok(()) } diff --git a/crates/derive/src/traits/mod.rs b/crates/derive/src/traits/mod.rs index d4b01ff04..ebee59385 100644 --- a/crates/derive/src/traits/mod.rs +++ b/crates/derive/src/traits/mod.rs @@ -2,7 +2,7 @@ //! pipeline. mod pipeline; -pub use pipeline::{Pipeline, StepResult}; +pub use pipeline::{Pipeline, Signal, StepResult}; mod attributes; pub use attributes::{AttributesBuilder, NextAttributes}; diff --git a/crates/derive/src/traits/pipeline.rs b/crates/derive/src/traits/pipeline.rs index 3732d81eb..7c50a9ce8 100644 --- a/crates/derive/src/traits/pipeline.rs +++ b/crates/derive/src/traits/pipeline.rs @@ -21,6 +21,21 @@ pub enum StepResult { StepFailed(PipelineErrorKind), } +/// A signal to send to the pipeline. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[allow(clippy::large_enum_variant)] +pub enum Signal { + /// Reset the pipeline. + Reset { + /// The L2 safe head to reset to. + l2_safe_head: L2BlockInfo, + /// The L1 origin to reset to. + l1_origin: BlockInfo, + }, + /// Flush the currently active channel. + FlushChannel, +} + /// This trait defines the interface for interacting with the derivation pipeline. #[async_trait] pub trait Pipeline: OriginProvider + Iterator { @@ -28,7 +43,7 @@ pub trait Pipeline: OriginProvider + Iterator Option<&OptimismAttributesWithParent>; /// Resets the pipeline on the next [Pipeline::step] call. - async fn reset(&mut self, l2_block_info: BlockInfo, origin: BlockInfo) -> PipelineResult<()>; + async fn signal(&mut self, signal: Signal) -> PipelineResult<()>; /// Attempts to progress the pipeline. async fn step(&mut self, cursor: L2BlockInfo) -> StepResult; diff --git a/examples/trusted-sync/Cargo.toml b/examples/trusted-sync/Cargo.toml index c63eb6828..128c34d3d 100644 --- a/examples/trusted-sync/Cargo.toml +++ b/examples/trusted-sync/Cargo.toml @@ -11,6 +11,7 @@ homepage.workspace = true [dependencies] # Workspace +kona-derive.workspace = true kona-primitives = { workspace = true, features = ["serde"] } kona-providers-alloy = { workspace = true, features = ["metrics"] } diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index ccdc2b677..2c9c5df9a 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -1,5 +1,6 @@ use anyhow::Result; use clap::Parser; +use kona_derive::traits::Signal; use kona_providers_alloy::prelude::*; use std::sync::Arc; use superchain::ROLLUP_CONFIGS; @@ -142,7 +143,10 @@ async fn sync(cli: cli::Cli) -> Result<()> { continue; }; info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info); - if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await { + if let Err(e) = pipeline + .signal(Signal::Reset { l2_safe_head: c, l1_origin: l1_block_info }) + .await + { error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); continue; } @@ -169,7 +173,10 @@ async fn sync(cli: cli::Cli) -> Result<()> { continue; }; info!(target: LOG_TARGET, "Resetting pipeline with l1 block info: {:?}", l1_block_info); - if let Err(e) = pipeline.reset(c.block_info, l1_block_info).await { + if let Err(e) = pipeline + .signal(Signal::Reset { l2_safe_head: c, l1_origin: l1_block_info }) + .await + { error!(target: LOG_TARGET, "Failed to reset pipeline: {:?}", e); continue; } @@ -232,10 +239,12 @@ async fn sync(cli: cli::Cli) -> Result<()> { metrics::PIPELINE_STEPS.with_label_values(&["reset"]).inc(); warn!(target: "loop", "Resetting pipeline: {:?}", e); pipeline - .reset( - cursor.block_info, - pipeline.origin().ok_or(anyhow::anyhow!("Missing origin"))?, - ) + .signal(Signal::Reset { + l2_safe_head: cursor, + l1_origin: pipeline + .origin() + .ok_or(anyhow::anyhow!("Missing origin"))?, + }) .await?; } PipelineErrorKind::Critical(_) => {