Skip to content

Commit

Permalink
feat(migrate-from-gcs): drop progress id
Browse files Browse the repository at this point in the history
Keep it simple. Avoids a bug where subsequent bundles skip all
payloads.
  • Loading branch information
alextes committed Oct 16, 2023
1 parent 88c9065 commit 05baedd
Showing 1 changed file with 13 additions and 49 deletions.
62 changes: 13 additions & 49 deletions src/bin/migrate-from-gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,42 +18,32 @@ use object_store::{
use serde::Serialize;
use tokio::{task::spawn_blocking, time::interval};
use tokio_util::io::{StreamReader, SyncIoBridge};
use tracing::{debug, info, trace};
use tracing::{debug, info};

const PROGRESS_FILE_PATH: &str = "progress.json";

fn read_progress() -> anyhow::Result<Option<(String, String)>> {
fn read_progress() -> anyhow::Result<Option<String>> {
let progress_file_path = Path::new(PROGRESS_FILE_PATH);
if !progress_file_path.exists() {
// No progress file found, returning empty string
info!("no progress file found");
debug!("no progress file found");
return Ok(None);
}
let mut file = File::open(progress_file_path)?;
let mut last_file = String::new();
file.read_to_string(&mut last_file)?;
let mut iter = last_file.split(':');
let progress = (
iter.next().unwrap().to_string(),
iter.next().unwrap().to_string(),
);
info!(last_file = %progress.0, progress_id = %progress.1, "found progress file");
Ok(Some(progress))
debug!(last_file, "found progress file");
Ok(Some(last_file))
}

fn write_progress(last_file: &ObjectMeta, payload_id: &str) -> anyhow::Result<()> {
info!(last_file = %last_file.location, payload_id, "writing progress");
fn write_progress(last_file: &ObjectMeta) -> anyhow::Result<()> {
info!(last_file = %last_file.location, "writing progress");
let mut file = File::create(Path::new(PROGRESS_FILE_PATH))?;
let progress = format!("{}:{}", last_file.location.to_string(), payload_id);
let progress = format!("{}", last_file.location.to_string());

Check failure on line 42 in src/bin/migrate-from-gcs.rs

View workflow job for this annotation

GitHub Actions / Lints

useless use of `format!`

Check failure on line 42 in src/bin/migrate-from-gcs.rs

View workflow job for this annotation

GitHub Actions / Lints

`to_string` applied to a type that implements `Display` in `format!` args
file.write_all(progress.as_bytes())?;
Ok(())
}

fn cleanup_last_file() -> anyhow::Result<()> {
std::fs::remove_file(PROGRESS_FILE_PATH)?;
Ok(())
}

fn print_migration_rate(payloads_migrated: u64, duration_secs: u64) {
let rate = payloads_migrated as f64 / duration_secs as f64;
info!("migration rate: {:.2} payloads per second", rate);
Expand Down Expand Up @@ -169,8 +159,8 @@ async fn main() -> anyhow::Result<()> {

let progress = read_progress()?;

if let Some((last_file, last_id)) = progress.as_ref() {
info!(last_file = %last_file, last_id, "resuming migration");
if let Some(last_file) = progress.as_ref() {
info!(last_file = %last_file, "resuming migration");
day_bundle_metas.retain(|file| file.location.to_string() >= *last_file);
} else {
info!("starting migration from scratch");
Expand Down Expand Up @@ -228,27 +218,7 @@ async fn main() -> anyhow::Result<()> {

const CONCURRENT_PUT_LIMIT: usize = 32;

// Skip payloads that have already been processed.
decoded_rx
.skip_while(|payload| {
match progress.as_ref() {
// If there was previous progress
Some((_last_file, last_id)) => {
// And the current payload matches our last progress, process remaining payloads in
// the stream.
if payload.id == *last_id {
debug!(payload_id = %payload.id, "found last processed payload");
futures::future::ready(false)
} else {
// Otherwise, skip this one.
trace!(payload_id = %payload.id, "skipping payload");
futures::future::ready(true)
}
}
// If there was no previous progress (first run), process all payloads in the stream.
None => futures::future::ready(false),
}
})
.map(Ok)
.try_for_each_concurrent(CONCURRENT_PUT_LIMIT, |payload| async move {
let block_hash = payload.block_hash.clone();
Expand Down Expand Up @@ -292,9 +262,6 @@ async fn main() -> anyhow::Result<()> {

print_migration_rate(payloads_migrated_count, elapsed);

// As we process concurrently on a sudden shut down, we may lose payloads we
// processed before this one by skipping over them when we resume.
write_progress(&object_meta, &payload_id)?;
}


Expand All @@ -305,13 +272,10 @@ async fn main() -> anyhow::Result<()> {
.await?;

handle.await?;
}

// Migration complete, clean up the progress file
if let Some((last_file, _row)) = progress {
if last_file == day_bundle_metas.last().unwrap().location.to_string() {
cleanup_last_file()?;
}
// As we process concurrently on a sudden shut down, we may lose payloads we
// processed before this one by skipping over them when we resume.
write_progress(&object_meta)?;

Check failure on line 278 in src/bin/migrate-from-gcs.rs

View workflow job for this annotation

GitHub Actions / Lints

this expression creates a reference which is immediately dereferenced by the compiler
}

Ok(())
Expand Down

0 comments on commit 05baedd

Please sign in to comment.