Skip to content

Commit

Permalink
fix(exex): use thresholds in stream backfill (#11478)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Oct 4, 2024
1 parent 0ce1dd6 commit 227e293
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions crates/exex/exex/src/backfill/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub struct StreamBackfillJob<E, P, T> {
tasks: BackfillTasks<T>,
parallelism: usize,
batch_size: usize,
thresholds: ExecutionStageThresholds,
}

impl<E, P, T> StreamBackfillJob<E, P, T> {
Expand Down Expand Up @@ -124,7 +125,7 @@ where
executor: this.executor.clone(),
provider: this.provider.clone(),
prune_modes: this.prune_modes.clone(),
thresholds: ExecutionStageThresholds::default(),
thresholds: this.thresholds.clone(),
range,
stream_parallelism: this.parallelism,
};
Expand All @@ -150,20 +151,26 @@ impl<E, P> From<SingleBlockBackfillJob<E, P>> for StreamBackfillJob<E, P, Single
tasks: FuturesOrdered::new(),
parallelism: job.stream_parallelism,
batch_size: 1,
thresholds: ExecutionStageThresholds { max_blocks: Some(1), ..Default::default() },
}
}
}

impl<E, P> From<BackfillJob<E, P>> for StreamBackfillJob<E, P, BatchBlockStreamItem> {
fn from(job: BackfillJob<E, P>) -> Self {
let batch_size = job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize);
Self {
executor: job.executor,
provider: job.provider,
prune_modes: job.prune_modes,
range: job.range,
tasks: FuturesOrdered::new(),
parallelism: job.stream_parallelism,
batch_size: job.thresholds.max_blocks.map_or(DEFAULT_BATCH_SIZE, |max| max as usize),
batch_size,
thresholds: ExecutionStageThresholds {
max_blocks: Some(batch_size as u64),
..job.thresholds
},
}
}
}
Expand Down

0 comments on commit 227e293

Please sign in to comment.