Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(driver,client): Pipeline Cursor Refactor #798

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bin/client/src/kona.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use kona_client::{
executor::KonaExecutorConstructor,
l1::{OracleBlobProvider, OracleL1ChainProvider, OraclePipeline},
l2::OracleL2ChainProvider,
sync::SyncStart,
sync::new_pipeline_cursor,
BootInfo, CachingOracle,
};
use kona_common::io;
Expand Down Expand Up @@ -72,7 +72,7 @@ fn main() -> Result<(), String> {

// Create a new derivation driver with the given boot information and oracle.

let Ok(sync_start) = SyncStart::from(
let Ok(cursor) = new_pipeline_cursor(
oracle.clone(),
&boot,
&mut l1_provider.clone(),
Expand All @@ -87,7 +87,7 @@ fn main() -> Result<(), String> {
let cfg = Arc::new(boot.rollup_config.clone());
let pipeline = OraclePipeline::new(
cfg.clone(),
sync_start.clone(),
cursor.clone(),
oracle.clone(),
beacon,
l1_provider.clone(),
Expand All @@ -99,7 +99,7 @@ fn main() -> Result<(), String> {
l2_provider,
fpvm_handle_register,
);
let mut driver = Driver::new(sync_start.cursor, executor, pipeline);
let mut driver = Driver::new(cursor, executor, pipeline);

// Run the derivation pipeline until we are able to produce the output root of the claimed
// L2 block.
Expand Down
10 changes: 4 additions & 6 deletions bin/client/src/l1/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@ use kona_derive::{
traits::{BlobProvider, OriginProvider, Pipeline, SignalReceiver},
types::{PipelineResult, Signal, StepResult},
};
use kona_driver::DriverPipeline;
use kona_driver::{DriverPipeline, PipelineCursor};
use kona_preimage::CommsClient;
use op_alloy_genesis::{RollupConfig, SystemConfig};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};
use op_alloy_rpc_types_engine::OpAttributesWithParent;

use crate::{
l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, sync::SyncStart, FlushableCache,
};
use crate::{l1::OracleL1ChainProvider, l2::OracleL2ChainProvider, FlushableCache};

/// An oracle-backed derivation pipeline.
pub type OracleDerivationPipeline<O, B> = DerivationPipeline<
Expand Down Expand Up @@ -76,7 +74,7 @@ where
/// Constructs a new oracle-backed derivation pipeline.
pub fn new(
cfg: Arc<RollupConfig>,
sync_start: SyncStart,
sync_start: PipelineCursor,
caching_oracle: Arc<O>,
blob_provider: B,
chain_provider: OracleL1ChainProvider<O>,
Expand All @@ -95,7 +93,7 @@ where
.l2_chain_provider(l2_chain_provider)
.chain_provider(chain_provider)
.builder(attributes)
.origin(sync_start.origin)
.origin(sync_start.origin())
.build();
Self { pipeline, caching_oracle }
}
Expand Down
107 changes: 45 additions & 62 deletions bin/client/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,72 +8,55 @@ use alloc::sync::Arc;
use alloy_consensus::Sealed;
use core::fmt::Debug;
use kona_derive::traits::ChainProvider;
use kona_driver::SyncCursor;
use kona_driver::{PipelineCursor, TipCursor};
use kona_mpt::TrieProvider;
use kona_preimage::{CommsClient, PreimageKey, PreimageKeyType};
use op_alloy_protocol::{BatchValidationProvider, BlockInfo};
use op_alloy_protocol::BatchValidationProvider;

/// Sync Start
#[derive(Debug, Clone)]
pub struct SyncStart {
/// The l1 origin block info.
pub origin: BlockInfo,
/// The sync cursor used for the derivation driver.
pub cursor: SyncCursor,
}

impl SyncStart {
/// Constructs the [`SyncStart`] from the caching oracle, boot info, and providers.
pub async fn from<O>(
caching_oracle: Arc<O>,
boot_info: &BootInfo,
chain_provider: &mut OracleL1ChainProvider<O>,
l2_chain_provider: &mut OracleL2ChainProvider<O>,
) -> Result<Self, OracleProviderError>
where
O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug,
{
// Find the initial safe head, based off of the starting L2 block number in the boot info.
caching_oracle
.write(
&HintType::StartingL2Output
.encode_with(&[boot_info.agreed_l2_output_root.as_ref()]),
)
.await
.map_err(OracleProviderError::Preimage)?;
let mut output_preimage = [0u8; 128];
caching_oracle
.get_exact(
PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256),
&mut output_preimage,
)
.await
.map_err(OracleProviderError::Preimage)?;

let safe_hash =
output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?;
let safe_header = l2_chain_provider.header_by_hash(safe_hash)?;
let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?;
let l1_origin =
chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?;
/// Constructs a [`PipelineCursor`] from the caching oracle, boot info, and providers.
pub async fn new_pipeline_cursor<O>(
caching_oracle: Arc<O>,
boot_info: &BootInfo,
chain_provider: &mut OracleL1ChainProvider<O>,
l2_chain_provider: &mut OracleL2ChainProvider<O>,
) -> Result<PipelineCursor, OracleProviderError>
where
O: CommsClient + FlushableCache + FlushableCache + Send + Sync + Debug,
{
// Find the initial safe head, based off of the starting L2 block number in the boot info.
caching_oracle
.write(&HintType::StartingL2Output.encode_with(&[boot_info.agreed_l2_output_root.as_ref()]))
.await
.map_err(OracleProviderError::Preimage)?;
let mut output_preimage = [0u8; 128];
caching_oracle
.get_exact(
PreimageKey::new(*boot_info.agreed_l2_output_root, PreimageKeyType::Keccak256),
&mut output_preimage,
)
.await
.map_err(OracleProviderError::Preimage)?;

// Construct the sync cursor for the pipeline driver.
let cursor = SyncCursor::new(
safe_head_info,
Sealed::new_unchecked(safe_header, safe_hash),
boot_info.agreed_l2_output_root,
);
let safe_hash =
output_preimage[96..128].try_into().map_err(OracleProviderError::SliceConversion)?;
let safe_header = l2_chain_provider.header_by_hash(safe_hash)?;
let safe_head_info = l2_chain_provider.l2_block_info_by_number(safe_header.number).await?;
let l1_origin = chain_provider.block_info_by_number(safe_head_info.l1_origin.number).await?;

// Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is
// captured.
let channel_timeout =
boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp);
let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout);
if l1_origin_number < boot_info.rollup_config.genesis.l1.number {
l1_origin_number = boot_info.rollup_config.genesis.l1.number;
}
let origin = chain_provider.block_info_by_number(l1_origin_number).await?;

Ok(Self { origin, cursor })
// Walk back the starting L1 block by `channel_timeout` to ensure that the full channel is
// captured.
let channel_timeout =
boot_info.rollup_config.channel_timeout(safe_head_info.block_info.timestamp);
let mut l1_origin_number = l1_origin.number.saturating_sub(channel_timeout);
if l1_origin_number < boot_info.rollup_config.genesis.l1.number {
l1_origin_number = boot_info.rollup_config.genesis.l1.number;
}
let origin = chain_provider.block_info_by_number(l1_origin_number).await?;

// Construct the cursor.
let safe_header = Sealed::new_unchecked(safe_header, safe_hash);
let mut cursor = PipelineCursor::new(channel_timeout, origin);
let tip = TipCursor::new(safe_head_info, safe_header, boot_info.agreed_l2_output_root);
cursor.advance(origin, tip);
Ok(cursor)
}
25 changes: 16 additions & 9 deletions crates/driver/src/core.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//! The driver of the Derivation Pipeline.
//! The driver of the kona derivation pipeline.

use alloc::vec::Vec;
use alloy_consensus::{BlockBody, Sealable};
Expand All @@ -16,7 +16,10 @@
use op_alloy_rpc_types_engine::OpAttributesWithParent;
use tracing::{error, info, warn};

use crate::{DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, SyncCursor};
use crate::{
DriverError, DriverPipeline, DriverResult, Executor, ExecutorConstructor, PipelineCursor,
TipCursor,
};

/// The Rollup Driver entrypoint.
#[derive(Debug)]
Expand All @@ -34,7 +37,7 @@
/// A pipeline abstraction.
pipeline: DP,
/// Cursor to keep track of the L2 tip
cursor: SyncCursor,
cursor: PipelineCursor,
/// Executor constructor.
executor: EC,
}
Expand All @@ -47,7 +50,7 @@
P: Pipeline + SignalReceiver + Send + Sync + Debug,
{
/// Creates a new [Driver].
pub const fn new(cursor: SyncCursor, executor: EC, pipeline: DP) -> Self {
pub const fn new(cursor: PipelineCursor, executor: EC, pipeline: DP) -> Self {

Check warning on line 53 in crates/driver/src/core.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/core.rs#L53

Added line #L53 was not covered by tests
Self {
_marker: core::marker::PhantomData,
_marker2: core::marker::PhantomData,
Expand Down Expand Up @@ -161,14 +164,18 @@
},
};

// Update the safe head.
self.cursor.l2_safe_head = L2BlockInfo::from_block_and_genesis(
// Get the pipeline origin and update the cursor.
let origin = self.pipeline.origin().ok_or(PipelineError::MissingOrigin.crit())?;
let l2_info = L2BlockInfo::from_block_and_genesis(

Check warning on line 169 in crates/driver/src/core.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/core.rs#L168-L169

Added lines #L168 - L169 were not covered by tests
&block,
&self.pipeline.rollup_config().genesis,
)?;
self.cursor.l2_safe_head_header = header.clone().seal_slow();
self.cursor.l2_safe_head_output_root =
executor.compute_output_root().map_err(DriverError::Executor)?;
let cursor = TipCursor::new(
l2_info,
header.clone().seal_slow(),
executor.compute_output_root().map_err(DriverError::Executor)?,

Check warning on line 176 in crates/driver/src/core.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/core.rs#L173-L176

Added lines #L173 - L176 were not covered by tests
);
self.cursor.advance(origin, cursor);

Check warning on line 178 in crates/driver/src/core.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/core.rs#L178

Added line #L178 was not covered by tests
}
}
}
121 changes: 96 additions & 25 deletions crates/driver/src/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,113 @@
//! Contains the cursor for the derivation driver.
//! Contains the cursor for the derivation pipeline.

use crate::TipCursor;
use alloc::collections::{btree_map::BTreeMap, vec_deque::VecDeque};
use alloy_consensus::{Header, Sealed};
use alloy_primitives::B256;
use op_alloy_protocol::L2BlockInfo;
use alloy_primitives::{map::HashMap, B256};
use op_alloy_protocol::{BlockInfo, L2BlockInfo};

/// A cursor that keeps track of the L2 tip block.
/// A cursor that tracks the pipeline tip.
#[derive(Debug, Clone)]
pub struct SyncCursor {
/// The current L2 safe head.
pub l2_safe_head: L2BlockInfo,
/// The header of the L2 safe head.
pub l2_safe_head_header: Sealed<Header>,
/// The output root of the L2 safe head.
pub l2_safe_head_output_root: B256,
pub struct PipelineCursor {
/// The block cache capacity before evicting old entries
/// (to avoid unbounded memory growth)
capacity: usize,
/// The channel timeout used to create the cursor.
channel_timeout: u64,
/// The l1 Origin of the pipeline.
origin: BlockInfo,
/// The L1 origin block numbers for which we have an L2 block in the cache.
/// Used to keep track of the order of insertion and evict the oldest entry.
origins: VecDeque<u64>,
/// The L1 origin block info for which we have an L2 block in the cache.
origin_infos: HashMap<u64, BlockInfo>,
/// A map from the l1 origin block number to its L2 tip.
tips: BTreeMap<u64, TipCursor>,
}

impl SyncCursor {
/// Instantiates a new `SyncCursor`.
pub const fn new(
l2_safe_head: L2BlockInfo,
l2_safe_head_header: Sealed<Header>,
l2_safe_head_output_root: B256,
) -> Self {
Self { l2_safe_head, l2_safe_head_header, l2_safe_head_output_root }
impl PipelineCursor {
/// Create a new cursor with the default cache capacity
pub fn new(channel_timeout: u64, origin: BlockInfo) -> Self {
// NOTE: capacity must be greater than the `channel_timeout` to allow
// for derivation to proceed through a deep reorg.
// Ref: <https://specs.optimism.io/protocol/derivation.html#timeouts>
let capacity = channel_timeout as usize + 5;

let mut origins = VecDeque::with_capacity(capacity);
origins.push_back(origin.number);
let mut origin_infos = HashMap::default();
origin_infos.insert(origin.number, origin);
Self { capacity, channel_timeout, origin, origins, origin_infos, tips: Default::default() }
}

Check warning on line 41 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L30-L41

Added lines #L30 - L41 were not covered by tests

/// Returns the current origin of the pipeline.
pub const fn origin(&self) -> BlockInfo {
self.origin

Check warning on line 45 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L44-L45

Added lines #L44 - L45 were not covered by tests
}

/// Returns the current L2 safe head.
pub const fn l2_safe_head(&self) -> &L2BlockInfo {
&self.l2_safe_head
pub fn l2_safe_head(&self) -> &L2BlockInfo {
&self.tip().l2_safe_head

Check warning on line 50 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L49-L50

Added lines #L49 - L50 were not covered by tests
}

/// Returns the header of the L2 safe head.
pub const fn l2_safe_head_header(&self) -> &Sealed<Header> {
&self.l2_safe_head_header
pub fn l2_safe_head_header(&self) -> &Sealed<Header> {
&self.tip().l2_safe_head_header

Check warning on line 55 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L54-L55

Added lines #L54 - L55 were not covered by tests
}

/// Returns the output root of the L2 safe head.
pub const fn l2_safe_head_output_root(&self) -> &B256 {
&self.l2_safe_head_output_root
pub fn l2_safe_head_output_root(&self) -> &B256 {
&self.tip().l2_safe_head_output_root
}

Check warning on line 61 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L59-L61

Added lines #L59 - L61 were not covered by tests

/// Get the current L2 tip
pub fn tip(&self) -> &TipCursor {
if let Some((_, l2_tip)) = self.tips.last_key_value() {
l2_tip

Check warning on line 66 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L64-L66

Added lines #L64 - L66 were not covered by tests
} else {
unreachable!("cursor must be initialized with one block before advancing")

Check warning on line 68 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L68

Added line #L68 was not covered by tests
}
}

Check warning on line 70 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L70

Added line #L70 was not covered by tests

/// Advance the cursor to the provided L2 block, given the corresponding L1 origin block.
///
/// If the cache is full, the oldest entry is evicted.
pub fn advance(&mut self, origin: BlockInfo, l2_tip_block: TipCursor) {
if self.tips.len() >= self.capacity {
let key = self.origins.pop_front().unwrap();
self.tips.remove(&key);
}

Check warning on line 79 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L75-L79

Added lines #L75 - L79 were not covered by tests

self.origin = origin;
self.origins.push_back(origin.number);
self.origin_infos.insert(origin.number, origin);
self.tips.insert(origin.number, l2_tip_block);
}

Check warning on line 85 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L81-L85

Added lines #L81 - L85 were not covered by tests

/// When the L1 undergoes a reorg, we need to reset the cursor to the fork block minus
/// the channel timeout, because an L2 block might have started to be derived at the
/// beginning of the channel.
///
/// Returns the (L2 block info, L1 origin block info) tuple for the new cursor state.
pub fn reset(&mut self, fork_block: u64) -> (TipCursor, BlockInfo) {
let channel_start = fork_block - self.channel_timeout;

match self.tips.get(&channel_start) {
Some(l2_safe_tip) => {
// The channel start block is in the cache, we can use it to reset the cursor.
(l2_safe_tip.clone(), self.origin_infos[&channel_start])

Check warning on line 98 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L92-L98

Added lines #L92 - L98 were not covered by tests
}
None => {
// If the channel start block is not in the cache, we reset the cursor
// to the closest known L1 block for which we have a corresponding L2 block.
let (last_l1_known_tip, l2_known_tip) = self
.tips
.range(..=channel_start)
.next_back()
.expect("walked back to genesis without finding anchor origin block");

(l2_known_tip.clone(), self.origin_infos[last_l1_known_tip])

Check warning on line 109 in crates/driver/src/cursor.rs

View check run for this annotation

Codecov / codecov/patch

crates/driver/src/cursor.rs#L103-L109

Added lines #L103 - L109 were not covered by tests
}
}
}
}
5 changes: 4 additions & 1 deletion crates/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,7 @@ mod core;
pub use core::Driver;

mod cursor;
pub use cursor::SyncCursor;
pub use cursor::PipelineCursor;

mod tip;
pub use tip::TipCursor;
Loading
Loading