From 05baedd1b10fc58062c9b43f66584b91904f7ae3 Mon Sep 17 00:00:00 2001
From: Alexander Tesfamichael
Date: Mon, 16 Oct 2023 11:42:19 +0200
Subject: [PATCH] feat(migrate-from-gcs): drop progress id

Keep it simple. Avoids a bug where subsequent bundles skip all payloads.
---
 src/bin/migrate-from-gcs.rs | 62 ++++++++-----------------------------
 1 file changed, 13 insertions(+), 49 deletions(-)

diff --git a/src/bin/migrate-from-gcs.rs b/src/bin/migrate-from-gcs.rs
index 27267a7..cf831eb 100644
--- a/src/bin/migrate-from-gcs.rs
+++ b/src/bin/migrate-from-gcs.rs
@@ -18,42 +18,32 @@ use object_store::{
 use serde::Serialize;
 use tokio::{task::spawn_blocking, time::interval};
 use tokio_util::io::{StreamReader, SyncIoBridge};
-use tracing::{debug, info, trace};
+use tracing::{debug, info};
 
 const PROGRESS_FILE_PATH: &str = "progress.json";
 
-fn read_progress() -> anyhow::Result<Option<(String, String)>> {
+fn read_progress() -> anyhow::Result<Option<String>> {
     let progress_file_path = Path::new(PROGRESS_FILE_PATH);
     if !progress_file_path.exists() {
         // No progress file found, returning empty string
-        info!("no progress file found");
+        debug!("no progress file found");
         return Ok(None);
     }
     let mut file = File::open(progress_file_path)?;
     let mut last_file = String::new();
     file.read_to_string(&mut last_file)?;
-    let mut iter = last_file.split(':');
-    let progress = (
-        iter.next().unwrap().to_string(),
-        iter.next().unwrap().to_string(),
-    );
-    info!(last_file = %progress.0, progress_id = %progress.1, "found progress file");
-    Ok(Some(progress))
+    debug!(last_file, "found progress file");
+    Ok(Some(last_file))
 }
 
-fn write_progress(last_file: &ObjectMeta, payload_id: &str) -> anyhow::Result<()> {
-    info!(last_file = %last_file.location, payload_id, "writing progress");
+fn write_progress(last_file: &ObjectMeta) -> anyhow::Result<()> {
+    info!(last_file = %last_file.location, "writing progress");
     let mut file = File::create(Path::new(PROGRESS_FILE_PATH))?;
-    let progress = format!("{}:{}", last_file.location.to_string(), payload_id);
+    let progress = format!("{}", last_file.location.to_string());
     file.write_all(progress.as_bytes())?;
     Ok(())
 }
 
-fn cleanup_last_file() -> anyhow::Result<()> {
-    std::fs::remove_file(PROGRESS_FILE_PATH)?;
-    Ok(())
-}
-
 fn print_migration_rate(payloads_migrated: u64, duration_secs: u64) {
     let rate = payloads_migrated as f64 / duration_secs as f64;
     info!("migration rate: {:.2} payloads per second", rate);
@@ -169,8 +159,8 @@ async fn main() -> anyhow::Result<()> {
     let progress = read_progress()?;
 
-    if let Some((last_file, last_id)) = progress.as_ref() {
-        info!(last_file = %last_file, last_id, "resuming migration");
+    if let Some(last_file) = progress.as_ref() {
+        info!(last_file = %last_file, "resuming migration");
         day_bundle_metas.retain(|file| file.location.to_string() >= *last_file);
     } else {
         info!("starting migration from scratch");
     }
@@ -228,27 +218,7 @@ async fn main() -> anyhow::Result<()> {
 
         const CONCURRENT_PUT_LIMIT: usize = 32;
 
-        // Skip payloads that have already been processed.
         decoded_rx
-            .skip_while(|payload| {
-                match progress.as_ref() {
-                    // If there was previous progress
-                    Some((_last_file, last_id)) => {
-                        // And the current payload matches our last progress, process remaining payloads in
-                        // the stream.
-                        if payload.id == *last_id {
-                            debug!(payload_id = %payload.id, "found last processed payload");
-                            futures::future::ready(false)
-                        } else {
-                            // Otherwise, skip this one.
-                            trace!(payload_id = %payload.id, "skipping payload");
-                            futures::future::ready(true)
-                        }
-                    }
-                    // If there was no previous progress (first run), process all payloads in the stream.
-                    None => futures::future::ready(false),
-                }
-            })
             .map(Ok)
             .try_for_each_concurrent(CONCURRENT_PUT_LIMIT, |payload| async move {
                 let block_hash = payload.block_hash.clone();
@@ -292,9 +262,6 @@ async fn main() -> anyhow::Result<()> {
 
             print_migration_rate(payloads_migrated_count, elapsed);
 
-            // As we process concurrently on a sudden shut down, we may lose payloads we
-            // processed before this one by skipping over them when we resume.
-            write_progress(&object_meta, &payload_id)?;
         }
@@ -305,13 +272,10 @@ async fn main() -> anyhow::Result<()> {
         .await?;
 
         handle.await?;
-    }
 
-    // Migration complete, clean up the progress file
-    if let Some((last_file, _row)) = progress {
-        if last_file == day_bundle_metas.last().unwrap().location.to_string() {
-            cleanup_last_file()?;
-        }
+        // As we process concurrently on a sudden shut down, we may lose payloads we
+        // processed before this one by skipping over them when we resume.
+        write_progress(&object_meta)?;
     }
 
     Ok(())
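Reviewer note, not part of the patch: a minimal standalone sketch of the bug the commit message refers to, using hypothetical bundle contents and payload ids rather than anything from the migration code. The point it illustrates is that the payload stream (decoded_rx above) is rebuilt for every day bundle, while skip_while compares against a single stored payload id, so every bundle after the resumed one yields nothing.

use futures::{executor::block_on, stream, StreamExt};

fn main() {
    block_on(async {
        // Hypothetical bundles; progress was recorded at payload "b" in the first one.
        let bundles = vec![vec!["a", "b", "c"], vec!["d", "e", "f"]];
        let last_id = "b";

        for (i, bundle) in bundles.into_iter().enumerate() {
            // A fresh stream (and fresh skip_while state) is built per bundle,
            // mirroring the per-bundle decoded_rx stream in the migration code.
            let processed: Vec<&str> = stream::iter(bundle)
                .skip_while(|id| futures::future::ready(*id != last_id))
                .collect()
                .await;
            // Prints: bundle 0: ["b", "c"], bundle 1: [] -- every payload skipped.
            println!("bundle {i}: processed {processed:?}");
        }
    });
}

Tracking progress at file granularity only, as this patch does, re-processes the last recorded bundle in full on resume instead of trying to find a payload id that later bundles never contain.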