diff --git a/crates/derive/src/errors.rs b/crates/derive/src/errors.rs index 1e6579ceb..a168b31f2 100644 --- a/crates/derive/src/errors.rs +++ b/crates/derive/src/errors.rs @@ -127,6 +127,16 @@ pub enum ResetError { /// Attributes builder error variant, with [BuilderError]. #[error("Attributes builder error: {0}")] AttributesBuilder(#[from] BuilderError), + /// A Holocene activation temporary error. + #[error("Holocene activation error")] + HoloceneActivation, +} + +impl ResetError { + /// Wrap [self] as a [PipelineErrorKind::Reset]. + pub fn reset(self) -> PipelineErrorKind { + PipelineErrorKind::Reset(self) + } } /// A decoding error. diff --git a/crates/derive/src/stages/l1_traversal.rs b/crates/derive/src/stages/l1_traversal.rs index 2f0a6fab6..02897f17f 100644 --- a/crates/derive/src/stages/l1_traversal.rs +++ b/crates/derive/src/stages/l1_traversal.rs @@ -108,8 +108,19 @@ impl OriginAdvancer for L1Traversal { } crate::set!(ORIGIN_GAUGE, next_l1_origin.number as i64); + self.block = Some(next_l1_origin); self.done = false; + + // If the prev block is not holocene, but the next is, we need to flag this + // so the pipeline driver will reset the pipeline for holocene activation. + let prev_block_holocene = + self.rollup_config.is_holocene_active(self.block.unwrap().timestamp); + let next_block_holocene = self.rollup_config.is_holocene_active(next_l1_origin.timestamp); + if !prev_block_holocene && next_block_holocene { + return Err(ResetError::HoloceneActivation.reset()); + } + Ok(()) } } diff --git a/examples/trusted-sync/src/main.rs b/examples/trusted-sync/src/main.rs index da226785c..2986b711b 100644 --- a/examples/trusted-sync/src/main.rs +++ b/examples/trusted-sync/src/main.rs @@ -214,32 +214,40 @@ async fn sync(cli: cli::Cli) -> Result<()> { metrics::PIPELINE_STEPS.with_label_values(&["origin_advance"]).inc(); trace!(target: "loop", "Advanced origin"); } - StepResult::OriginAdvanceErr(e) => { - metrics::PIPELINE_STEPS.with_label_values(&["origin_advance_failure"]).inc(); - warn!(target: "loop", "Could not advance origin: {:?}", e); - } - StepResult::StepFailed(e) => match e { - PipelineErrorKind::Temporary(e) => { - if matches!(e, PipelineError::NotEnoughData) { - metrics::PIPELINE_STEPS.with_label_values(&["not_enough_data"]).inc(); - debug!(target: "loop", "Not enough data to step derivation pipeline"); - } - } - PipelineErrorKind::Reset(_) => { - metrics::PIPELINE_STEPS.with_label_values(&["reset"]).inc(); - warn!(target: "loop", "Resetting pipeline: {:?}", e); - pipeline - .reset( - cursor.block_info, - pipeline.origin().ok_or(anyhow::anyhow!("Missing origin"))?, - ) - .await?; + sr => { + if let StepResult::OriginAdvanceErr(ref e) = sr { + metrics::PIPELINE_STEPS.with_label_values(&["origin_advance_failure"]).inc(); + warn!(target: "loop", "Could not advance origin: {:?}", e); } - _ => { - metrics::PIPELINE_STEPS.with_label_values(&["failure"]).inc(); - error!(target: "loop", "Error stepping derivation pipeline: {:?}", e); + + match sr { + StepResult::PreparedAttributes | StepResult::AdvancedOrigin => {} + StepResult::OriginAdvanceErr(e) | StepResult::StepFailed(e) => match e { + PipelineErrorKind::Temporary(e) => { + if matches!(e, PipelineError::NotEnoughData) { + metrics::PIPELINE_STEPS + .with_label_values(&["not_enough_data"]) + .inc(); + debug!(target: "loop", "Not enough data to step derivation pipeline"); + } + } + PipelineErrorKind::Reset(_) => { + metrics::PIPELINE_STEPS.with_label_values(&["reset"]).inc(); + warn!(target: "loop", "Resetting pipeline: {:?}", e); + pipeline + .reset( + cursor.block_info, + pipeline.origin().ok_or(anyhow::anyhow!("Missing origin"))?, + ) + .await?; + } + PipelineErrorKind::Critical(_) => { + metrics::PIPELINE_STEPS.with_label_values(&["failure"]).inc(); + error!(target: "loop", "Error stepping derivation pipeline: {:?}", e); + } + }, } - }, + } } // Peek at the next prepared attributes and validate them.