Skip to content

Commit

Permalink
feat(migrate-from-gcs): exp backoff for storing
Browse files Browse the repository at this point in the history
  • Loading branch information
alextes committed Oct 16, 2023
1 parent d6f919e commit 859b60e
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 2 deletions.
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ anyhow = { version = "1.0.75", features = [
"backtrace",
"std",
], default-features = false }
backoff = { version = "0.4.0", default-features = false, features = ["tokio"] }
bytes = "1.5.0"
chrono = { version = "0.4.31", default-features = false, features = ["std"] }
csv = { version = "1.3.0", default-features = false }
flate2 = { version = "1.0.28", default-features = false, features = [
Expand Down
17 changes: 15 additions & 2 deletions src/bin/migrate-from-gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::{
time::{Duration, SystemTime},
};

use backoff::{backoff::Backoff, ExponentialBackoff};
use bytes::Bytes;
use chrono::{DateTime, Datelike, Timelike, Utc};
use flate2::{read::GzDecoder, write::GzEncoder, Compression};
use futures::{channel::mpsc::channel, FutureExt, SinkExt, StreamExt, TryStreamExt};
Expand Down Expand Up @@ -248,7 +250,7 @@ async fn main() -> anyhow::Result<()> {
}
});

const CONCURRENT_PUT_LIMIT: usize = 16;
const CONCURRENT_PUT_LIMIT: usize = 8;

slot_bundle_rx
.map(Ok)
Expand Down Expand Up @@ -282,7 +284,18 @@ async fn main() -> anyhow::Result<()> {
.await
.unwrap();

ovh.put(&path, bytes_gz.into()).await.unwrap();

let mut backoff = ExponentialBackoff::default();
let bytes_gz_shared = Bytes::from(bytes_gz);
while let Err(err) = ovh.put(&path, bytes_gz_shared.clone()).await {
if let Some(wait) = backoff.next_backoff() {
tokio::time::sleep(wait).await;
continue;
}
eprintln!("failed to execute OVH put operation: {}", err);
break;
}


let payloads_migrated_count = payloads_migrated_counter.fetch_add(payloads_count, std::sync::atomic::Ordering::Relaxed);

Expand Down

0 comments on commit 859b60e

Please sign in to comment.