Skip to content

Commit

Permalink
chore: rename pipeline references to backfill sync (#9223)
Browse files Browse the repository at this point in the history
  • Loading branch information
fgimenez authored Jul 1, 2024
1 parent 9d4722e commit 158377f
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 45 deletions.
2 changes: 1 addition & 1 deletion crates/engine/tree/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ mod tests {
.build(),
);

// force the pipeline to be "done" after 5 blocks
// force the pipeline to be "done" after `pipeline_done_after` blocks
let pipeline = TestPipelineBuilder::new()
.with_pipeline_exec_outputs(VecDeque::from([Ok(ExecOutput {
checkpoint: StageCheckpoint::new(BlockNumber::from(pipeline_done_after)),
Expand Down
74 changes: 37 additions & 37 deletions crates/engine/tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ use std::{
///
/// ## Control flow
///
/// The [`ChainOrchestrator`] is responsible for controlling the pipeline sync and additional hooks.
/// The [`ChainOrchestrator`] is responsible for controlling the backfill sync and additional hooks.
/// It polls the given `handler`, which is responsible for advancing the chain, how is up to the
/// handler. However, due to database restrictions (e.g. exclusive write access), following
/// invariants apply:
/// - If the handler requests a backfill run (e.g. [`BackfillAction::Start`]), the handler must
/// ensure that while the pipeline is running, no other write access is granted.
/// ensure that while the backfill sync is running, no other write access is granted.
/// - At any time the [`ChainOrchestrator`] can request exclusive write access to the database
/// (e.g. if pruning is required), but will not do so until the handler has acknowledged the
/// request for write access.
Expand All @@ -35,18 +35,18 @@ where
{
/// The handler for advancing the chain.
handler: T,
/// Controls pipeline sync.
pipeline: P,
/// Controls backfill sync.
backfill_sync: P,
}

impl<T, P> ChainOrchestrator<T, P>
where
T: ChainHandler + Unpin,
P: BackfillSync + Unpin,
{
/// Creates a new [`ChainOrchestrator`] with the given handler and pipeline.
pub const fn new(handler: T, pipeline: P) -> Self {
Self { handler, pipeline }
/// Creates a new [`ChainOrchestrator`] with the given handler and backfill sync.
pub const fn new(handler: T, backfill_sync: P) -> Self {
Self { handler, backfill_sync }
}

/// Returns the handler
Expand All @@ -68,34 +68,34 @@ where

// This loop polls the components
//
// 1. Polls the pipeline to completion, if active.
// 1. Polls the backfill sync to completion, if active.
// 2. Advances the chain by polling the handler.
'outer: loop {
// try to poll the pipeline to completion, if active
match this.pipeline.poll(cx) {
Poll::Ready(pipeline_event) => match pipeline_event {
// try to poll the backfill sync to completion, if active
match this.backfill_sync.poll(cx) {
Poll::Ready(backfill_sync_event) => match backfill_sync_event {
BackfillEvent::Idle => {}
BackfillEvent::Started(_) => {
// notify handler that pipeline started
this.handler.on_event(FromOrchestrator::PipelineStarted);
return Poll::Ready(ChainEvent::PipelineStarted);
// notify handler that backfill sync started
this.handler.on_event(FromOrchestrator::BackfillSyncStarted);
return Poll::Ready(ChainEvent::BackfillSyncStarted);
}
BackfillEvent::Finished(res) => {
return match res {
Ok(event) => {
tracing::debug!(?event, "pipeline finished");
// notify handler that pipeline finished
this.handler.on_event(FromOrchestrator::PipelineFinished);
Poll::Ready(ChainEvent::PipelineFinished)
tracing::debug!(?event, "backfill sync finished");
// notify handler that backfill sync finished
this.handler.on_event(FromOrchestrator::BackfillSyncFinished);
Poll::Ready(ChainEvent::BackfillSyncFinished)
}
Err(err) => {
tracing::error!( %err, "pipeline failed");
tracing::error!( %err, "backfill sync failed");
Poll::Ready(ChainEvent::FatalError)
}
}
}
BackfillEvent::TaskDropped(err) => {
tracing::error!( %err, "pipeline task dropped");
tracing::error!( %err, "backfill sync task dropped");
return Poll::Ready(ChainEvent::FatalError);
}
},
Expand All @@ -106,9 +106,9 @@ where
match this.handler.poll(cx) {
Poll::Ready(handler_event) => {
match handler_event {
HandlerEvent::Pipeline(target) => {
// trigger pipeline and start polling it
this.pipeline.on_action(BackfillAction::Start(target));
HandlerEvent::BackfillSync(target) => {
// trigger backfill sync and start polling it
this.backfill_sync.on_action(BackfillAction::Start(target));
continue 'outer
}
HandlerEvent::Event(ev) => {
Expand Down Expand Up @@ -153,10 +153,10 @@ enum SyncMode {
/// These are meant to be used for observability and debugging purposes.
#[derive(Debug)]
pub enum ChainEvent<T> {
/// Pipeline sync started
PipelineStarted,
/// Pipeline sync finished
PipelineFinished,
/// Backfill sync started
BackfillSyncStarted,
/// Backfill sync finished
BackfillSyncFinished,
/// Fatal error
FatalError,
/// Event emitted by the handler
Expand All @@ -180,35 +180,35 @@ pub trait ChainHandler: Send + Sync {
/// Events/Requests that the [`ChainHandler`] can emit to the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum HandlerEvent<T> {
/// Request to start a pipeline sync
Pipeline(PipelineTarget),
/// Request to start a backfill sync
BackfillSync(PipelineTarget),
/// Other event emitted by the handler
Event(T),
}

/// Internal events issued by the [`ChainOrchestrator`].
#[derive(Clone, Debug)]
pub enum FromOrchestrator {
/// Invoked when pipeline sync finished
PipelineFinished,
/// Invoked when pipeline started
PipelineStarted,
/// Invoked when backfill sync finished
BackfillSyncFinished,
/// Invoked when backfill sync started
BackfillSyncStarted,
}

/// Represents the state of the chain.
#[derive(Clone, Copy, PartialEq, Eq, Default, Debug)]
pub enum OrchestratorState {
/// Orchestrator has exclusive write access to the database.
PipelineActive,
BackfillSyncActive,
/// Node is actively processing the chain.
#[default]
Idle,
}

impl OrchestratorState {
/// Returns `true` if the state is [`OrchestratorState::PipelineActive`].
pub const fn is_pipeline_active(&self) -> bool {
matches!(self, Self::PipelineActive)
/// Returns `true` if the state is [`OrchestratorState::BackfillSyncActive`].
pub const fn is_backfill_sync_active(&self) -> bool {
matches!(self, Self::BackfillSyncActive)
}

/// Returns `true` if the state is [`OrchestratorState::Idle`].
Expand Down
8 changes: 4 additions & 4 deletions crates/engine/tree/src/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tracing::trace;
/// A trait that can download blocks on demand.
pub trait BlockDownloader: Send + Sync {
/// Handle an action.
fn on_action(&mut self, event: DownloadAction);
fn on_action(&mut self, action: DownloadAction);

/// Advance in progress requests if any
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<DownloadOutcome>;
Expand Down Expand Up @@ -65,7 +65,7 @@ where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Create a new instance
pub(crate) fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
pub fn new(client: Client, consensus: Arc<dyn Consensus>) -> Self {
Self {
full_block_client: FullBlockClient::new(client, consensus),
inflight_full_block_requests: Vec::new(),
Expand Down Expand Up @@ -154,8 +154,8 @@ where
Client: HeadersClient + BodiesClient + Clone + Unpin + 'static,
{
/// Handles incoming download actions.
fn on_action(&mut self, event: DownloadAction) {
match event {
fn on_action(&mut self, action: DownloadAction) {
match action {
DownloadAction::Clear => self.clear(),
DownloadAction::Download(request) => self.download(request),
}
Expand Down
6 changes: 3 additions & 3 deletions crates/engine/tree/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ where
RequestHandlerEvent::Idle => break,
RequestHandlerEvent::HandlerEvent(ev) => {
return match ev {
HandlerEvent::Pipeline(target) => {
// bubble up pipeline request
HandlerEvent::BackfillSync(target) => {
// bubble up backfill sync request request
self.downloader.on_action(DownloadAction::Clear);
Poll::Ready(HandlerEvent::Pipeline(target))
Poll::Ready(HandlerEvent::BackfillSync(target))
}
HandlerEvent::Event(ev) => {
// bubble up the event
Expand Down

0 comments on commit 158377f

Please sign in to comment.