Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Indexer] A few improvements to backfill tool #19441

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions crates/sui-indexer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,27 @@ impl Default for IngestionConfig {
}
}

#[derive(Args, Debug, Clone)]
pub struct SqlBackFillConfig {
/// Maximum number of concurrent tasks to run.
#[arg(
long,
default_value_t = Self::DEFAULT_MAX_CONCURRENCY,
)]
pub max_concurrency: usize,
/// Number of checkpoints to backfill in a single SQL command.
#[arg(
long,
default_value_t = Self::DEFAULT_CHUNK_SIZE,
)]
pub chunk_size: usize,
}

impl SqlBackFillConfig {
const DEFAULT_MAX_CONCURRENCY: usize = 10;
const DEFAULT_CHUNK_SIZE: usize = 1000;
}

#[derive(Subcommand, Clone, Debug)]
pub enum Command {
Indexer {
Expand Down Expand Up @@ -177,6 +198,8 @@ pub enum Command {
checkpoint_column_name: String,
first_checkpoint: u64,
last_checkpoint: u64,
#[command(flatten)]
backfill_config: SqlBackFillConfig,
},
}

Expand Down
2 changes: 2 additions & 0 deletions crates/sui-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,15 @@ async fn main() -> anyhow::Result<()> {
checkpoint_column_name,
first_checkpoint,
last_checkpoint,
backfill_config,
} => {
run_sql_backfill(
&sql,
&checkpoint_column_name,
first_checkpoint,
last_checkpoint,
pool,
backfill_config,
)
.await;
}
Expand Down
33 changes: 25 additions & 8 deletions crates/sui-indexer/src/sql_backfill.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,55 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::config::SqlBackFillConfig;
use crate::database::ConnectionPool;
use diesel_async::RunQueryDsl;
use futures::{stream, StreamExt};
use std::collections::BTreeSet;
use std::sync::Arc;
use std::time::Instant;

const CHUNK_SIZE: u64 = 10000;
const MAX_CONCURRENCY: usize = 100;
use tokio::sync::Mutex;

pub async fn run_sql_backfill(
sql: &str,
checkpoint_column_name: &str,
first_checkpoint: u64,
last_checkpoint: u64,
pool: ConnectionPool,
backfill_config: SqlBackFillConfig,
) {
let cur_time = Instant::now();
// Keeps track of the checkpoint ranges (using starting checkpoint number)
// that are in progress.
let in_progress = Arc::new(Mutex::new(BTreeSet::new()));
let chunks: Vec<(u64, u64)> = (first_checkpoint..=last_checkpoint)
.step_by(CHUNK_SIZE as usize)
.step_by(backfill_config.chunk_size)
.map(|chunk_start| {
let chunk_end = std::cmp::min(chunk_start + CHUNK_SIZE - 1, last_checkpoint);
let chunk_end = std::cmp::min(
chunk_start + backfill_config.chunk_size as u64 - 1,
last_checkpoint,
);
(chunk_start, chunk_end)
})
.collect();

stream::iter(chunks)
.for_each_concurrent(MAX_CONCURRENCY, |(start_id, end_id)| {
.for_each_concurrent(backfill_config.max_concurrency, |(start_id, end_id)| {
let pool_clone = pool.clone(); // Clone the pool for async operation
let in_progress_clone = in_progress.clone();
async move {
in_progress_clone.lock().await.insert(start_id);
// Run the copy in a batch and add a delay
backfill_data_batch(sql, checkpoint_column_name, start_id, end_id, pool_clone)
.await;
println!("Finished checkpoint range: {} - {}", start_id, end_id);
println!("Finished checkpoint range: {} - {}.", start_id, end_id);
in_progress_clone.lock().await.remove(&start_id);
let cur_min_in_progress = in_progress_clone.lock().await.iter().next().cloned();
println!(
"Minimum checkpoint number still in progress: {:?}.\
If the binary ever fails, you can restart from this checkpoint",
cur_min_in_progress
);
}
})
.await;
Expand All @@ -49,7 +66,7 @@ async fn backfill_data_batch(
let mut conn = pool.get().await.unwrap();

let query = format!(
"{} WHERE {} BETWEEN {} AND {}",
"{} WHERE {} BETWEEN {} AND {} ON CONFLICT DO NOTHING",
sql, checkpoint_column_name, first_checkpoint, last_checkpoint
);

Expand Down
Loading