Skip to content

Commit

Permalink
feat(migrate-from-gcs): trace incomplete bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
alextes committed Oct 16, 2023
1 parent 6c5e502 commit 5a50c86
Showing 1 changed file with 9 additions and 5 deletions.
14 changes: 9 additions & 5 deletions src/bin/migrate-from-gcs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use object_store::{
use serde::Serialize;
use tokio::{spawn, task::spawn_blocking, time::interval};
use tokio_util::io::{StreamReader, SyncIoBridge};
use tracing::{debug, error, info, warn};
use tracing::{debug, error, info, trace, warn};

const PROGRESS_FILE_PATH: &str = "progress.json";

Expand Down Expand Up @@ -232,6 +232,8 @@ async fn main() -> anyhow::Result<()> {
const SLOT_BUNDLE_BUFFER_SIZE: usize = 8;
let (mut slot_bundle_tx, slot_bundle_rx) = channel(SLOT_BUNDLE_BUFFER_SIZE);

const INCOMPLETE_SLOT_BUNDLE_LIMIT: usize = 8;

spawn(async move {
let mut slot_bundles = HashMap::new();

Expand All @@ -240,10 +242,12 @@ async fn main() -> anyhow::Result<()> {
let entry = slot_bundles.entry(payload_slot).or_insert_with(Vec::new);
entry.push(payload);

// Whenever we have more than 4 slots of payloads in the hashmap, we flush the
// oldest slot to the next stage of the pipeline.
if slot_bundles.len() > 4 {
let oldest_slot = *slot_bundles.keys().min().unwrap();
// Whenever we have more than INCOMPLETE_SLOT_BUNDLE_LIMIT slots of payloads in the
// hashmap, we flush the oldest slot to the next stage of the pipeline.
if slot_bundles.len() > INCOMPLETE_SLOT_BUNDLE_LIMIT {
let keys = slot_bundles.keys().cloned().collect::<Vec<_>>();
trace!("incomplete slot bundles hit limit: {:?}", keys);
let oldest_slot = keys.iter().min().unwrap();
let oldest_slot_payloads = slot_bundles.remove(&oldest_slot).unwrap();

Check failure on line 251 in src/bin/migrate-from-gcs.rs

View workflow job for this annotation

GitHub Actions / Lints

this expression creates a reference which is immediately dereferenced by the compiler
debug!(slot = %oldest_slot, payloads_count = oldest_slot_payloads.len(), "flushing slot bundle");
slot_bundle_tx.send(oldest_slot_payloads).await.unwrap();
Expand Down

0 comments on commit 5a50c86

Please sign in to comment.