Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(derive): Pipeline Builder #127

Merged
merged 15 commits into from
Apr 27, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions crates/derive/src/builder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! Contains a concrete implementation of the [DerivationPipeline].

use crate::{
stages::{
AttributesBuilder, AttributesQueue, BatchQueue, ChannelBank, ChannelReader, FrameQueue,
L1Retrieval, L1Traversal, NextAttributes,
},
traits::{ChainProvider, DataAvailabilityProvider, L2ChainProvider, OriginProvider},
types::{L2AttributesWithParent, L2BlockInfo, RollupConfig, StageResult},
};
use alloc::sync::Arc;
use core::fmt::Debug;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug)]
pub struct DerivationPipeline<N: NextAttributes + Debug> {
/// The attributes queue to retrieve the next attributes.
pub attributes: N,
/// A cursor for the [L2BlockInfo] parent to be used when pulling the next attributes.
pub cursor: L2BlockInfo,
}

impl<N: NextAttributes + Debug + Send> DerivationPipeline<N> {
/// Creates a new instance of the [DerivationPipeline].
pub fn new(attributes: N, cursor: L2BlockInfo) -> Self {
Self { attributes, cursor }
}

/// Set the [L2BlockInfo] cursor to be used when pulling the next attributes.
pub fn set_cursor(&mut self, cursor: L2BlockInfo) {
self.cursor = cursor;
}

/// Get the next attributes from the pipeline.
pub async fn next(&mut self) -> StageResult<L2AttributesWithParent> {
self.attributes.next_attributes(self.cursor).await
}
}

impl<P, DAP, F, B> DerivationPipeline<KonaAttributes<P, DAP, F, B>>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
/// Creates a new instance of the [DerivationPipeline] from the given attributes.
pub fn new_online_pipeline(
attributes: KonaAttributes<P, DAP, F, B>,
cursor: L2BlockInfo,
) -> Self {
Self::new(attributes, cursor)
}
}

/// [KonaDerivationPipeline] is a concrete [DerivationPipeline] type.
pub type KonaDerivationPipeline<P, DAP, F, B> = DerivationPipeline<KonaAttributes<P, DAP, F, B>>;

/// [KonaAttributes] is a concrete [NextAttributes] type.
pub type KonaAttributes<P, DAP, F, B> = AttributesQueue<
BatchQueue<ChannelReader<ChannelBank<FrameQueue<L1Retrieval<DAP, L1Traversal<P>>>>>, F>,
B,
>;

/// Creates a new [KonaAttributes] instance.
pub fn new_online_pipeline<P, DAP, F, B>(
rollup_config: Arc<RollupConfig>,
chain_provider: P,
dap_source: DAP,
fetcher: F,
builder: B,
) -> KonaAttributes<P, DAP, F, B>
where
P: ChainProvider + Clone + Debug + Send,
DAP: DataAvailabilityProvider + OriginProvider + Clone + Debug + Send,
F: L2ChainProvider + Clone + Debug + Send,
B: AttributesBuilder + Clone + Debug + Send,
{
let l1_traversal = L1Traversal::new(chain_provider, rollup_config.clone());
let l1_retrieval = L1Retrieval::new(l1_traversal, dap_source);
let frame_queue = FrameQueue::new(l1_retrieval);
let channel_bank = ChannelBank::new(rollup_config.clone(), frame_queue);
let channel_reader = ChannelReader::new(channel_bank, rollup_config.clone());
let batch_queue = BatchQueue::new(rollup_config.clone(), channel_reader, fetcher);
AttributesQueue::new(*rollup_config, batch_queue, builder)
}
27 changes: 9 additions & 18 deletions crates/derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@

extern crate alloc;

use alloc::sync::Arc;
use core::fmt::Debug;
use traits::ChainProvider;
use types::RollupConfig;
/// Prelude exports common types and traits.
pub mod prelude {
refcell marked this conversation as resolved.
Show resolved Hide resolved
pub use super::{builder::DerivationPipeline, params::*};
// pub use super::traits::prelude::*;
// pub use super::types::prelude::*;
// pub use super::stages::prelude::*;
// pub use super::sources::prelude::*;
}

mod params;
pub use params::{
Expand All @@ -19,6 +23,7 @@ pub use params::{
MAX_SPAN_BATCH_BYTES, SEQUENCER_FEE_VAULT_ADDRESS,
};

pub mod builder;
pub mod sources;
pub mod stages;
pub mod traits;
Expand All @@ -28,17 +33,3 @@ pub mod types;
mod online;
#[cfg(feature = "online")]
pub use online::prelude::*;

/// The derivation pipeline is responsible for deriving L2 inputs from L1 data.
#[derive(Debug, Clone, Copy)]
pub struct DerivationPipeline;

impl DerivationPipeline {
/// Creates a new instance of the [DerivationPipeline].
pub fn new<P>(_rollup_config: Arc<RollupConfig>, _chain_provider: P) -> Self
where
P: ChainProvider + Clone + Debug + Send,
{
unimplemented!("TODO: High-level pipeline composition helper.")
}
}
22 changes: 22 additions & 0 deletions crates/derive/src/stages/attributes_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ pub trait AttributesProvider {
fn is_last_in_span(&self) -> bool;
}

/// [NextAttributes] is a trait abstraction that generalizes the [AttributesQueue] stage.
#[async_trait]
pub trait NextAttributes {
/// Returns the next [L2AttributesWithParent] from the current batch.
async fn next_attributes(&mut self, parent: L2BlockInfo)
-> StageResult<L2AttributesWithParent>;
}

/// [AttributesQueue] accepts batches from the [BatchQueue] stage
/// and transforms them into [L2PayloadAttributes]. The outputted payload
/// attributes cannot be buffered because each batch->attributes transformation
Expand Down Expand Up @@ -139,6 +147,20 @@ where
}
}

#[async_trait]
impl<P, AB> NextAttributes for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug + Send,
AB: AttributesBuilder + Debug + Send,
{
async fn next_attributes(
&mut self,
parent: L2BlockInfo,
) -> StageResult<L2AttributesWithParent> {
self.next_attributes(parent).await
}
}

impl<P, AB> OriginProvider for AttributesQueue<P, AB>
where
P: AttributesProvider + OriginProvider + Debug,
Expand Down
Loading
Loading