diff --git a/crates/sui-indexer/src/config.rs b/crates/sui-indexer/src/config.rs index d96c719ea10b8..648f9eba06baf 100644 --- a/crates/sui-indexer/src/config.rs +++ b/crates/sui-indexer/src/config.rs @@ -143,6 +143,27 @@ impl Default for IngestionConfig { } } +#[derive(Args, Debug, Clone)] +pub struct SqlBackFillConfig { + /// Maximum number of concurrent tasks to run. + #[arg( + long, + default_value_t = Self::DEFAULT_MAX_CONCURRENCY, + )] + pub max_concurrency: usize, + /// Number of checkpoints to backfill in a single SQL command. + #[arg( + long, + default_value_t = Self::DEFAULT_CHUNK_SIZE, + )] + pub chunk_size: usize, +} + +impl SqlBackFillConfig { + const DEFAULT_MAX_CONCURRENCY: usize = 10; + const DEFAULT_CHUNK_SIZE: usize = 1000; +} + #[derive(Subcommand, Clone, Debug)] pub enum Command { Indexer { @@ -177,6 +198,8 @@ pub enum Command { checkpoint_column_name: String, first_checkpoint: u64, last_checkpoint: u64, + #[command(flatten)] + backfill_config: SqlBackFillConfig, }, } diff --git a/crates/sui-indexer/src/main.rs b/crates/sui-indexer/src/main.rs index 1c1523efbbb31..038c0f3c8af0a 100644 --- a/crates/sui-indexer/src/main.rs +++ b/crates/sui-indexer/src/main.rs @@ -79,6 +79,7 @@ async fn main() -> anyhow::Result<()> { checkpoint_column_name, first_checkpoint, last_checkpoint, + backfill_config, } => { run_sql_backfill( &sql, @@ -86,6 +87,7 @@ async fn main() -> anyhow::Result<()> { first_checkpoint, last_checkpoint, pool, + backfill_config, ) .await; } diff --git a/crates/sui-indexer/src/sql_backfill.rs b/crates/sui-indexer/src/sql_backfill.rs index a594e19d2be91..ea8103b6a2539 100644 --- a/crates/sui-indexer/src/sql_backfill.rs +++ b/crates/sui-indexer/src/sql_backfill.rs @@ -1,13 +1,14 @@ // Copyright (c) Mysten Labs, Inc. // SPDX-License-Identifier: Apache-2.0 +use crate::config::SqlBackFillConfig; use crate::database::ConnectionPool; use diesel_async::RunQueryDsl; use futures::{stream, StreamExt}; +use std::collections::BTreeSet; +use std::sync::Arc; use std::time::Instant; - -const CHUNK_SIZE: u64 = 10000; -const MAX_CONCURRENCY: usize = 100; +use tokio::sync::Mutex; pub async fn run_sql_backfill( sql: &str, @@ -15,24 +16,40 @@ pub async fn run_sql_backfill( first_checkpoint: u64, last_checkpoint: u64, pool: ConnectionPool, + backfill_config: SqlBackFillConfig, ) { let cur_time = Instant::now(); + // Keeps track of the checkpoint ranges (using starting checkpoint number) + // that are in progress. + let in_progress = Arc::new(Mutex::new(BTreeSet::new())); let chunks: Vec<(u64, u64)> = (first_checkpoint..=last_checkpoint) - .step_by(CHUNK_SIZE as usize) + .step_by(backfill_config.chunk_size) .map(|chunk_start| { - let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE - 1, last_checkpoint); + let chunk_end = std::cmp::min( + chunk_start + backfill_config.chunk_size as u64 - 1, + last_checkpoint, + ); (chunk_start, chunk_end) }) .collect(); stream::iter(chunks) - .for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| { + .for_each_concurrent(backfill_config.max_concurrency, |(start_id, end_id)| { let pool_clone = pool.clone(); // Clone the pool for async operation + let in_progress_clone = in_progress.clone(); async move { + in_progress_clone.lock().await.insert(start_id); // Run the copy in a batch and add a delay backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone) .await; - println!("Finished checkpoint range: {} - {}", start_id, end_id); + println!("Finished checkpoint range: {} - {}.", start_id, end_id); + in_progress_clone.lock().await.remove(&start_id); + let cur_min_in_progress = in_progress_clone.lock().await.iter().next().cloned(); + println!( + "Minimum checkpoint number still in progress: {:?}.\ + If the binary ever fails, you can restart from this checkpoint", + cur_min_in_progress + ); } }) .await; @@ -49,7 +66,7 @@ async fn backfill_data_batch( let mut conn = pool.get().await.unwrap(); let query = format!( - "{} WHERE {} BETWEEN {} AND {}", + "{} WHERE {} BETWEEN {} AND {} ON CONFLICT DO NOTHING", sql, checkpoint_column_name, first_checkpoint, last_checkpoint );