Skip to content

Commit

Permalink
feat: bundle all before store
Browse files Browse the repository at this point in the history
Because some payloads appear out of slot order, we first process the
full day of payloads into slot bundle files. There are too many to keep
in memory. Once this is done, we push all the files to object storage.
  • Loading branch information
alextes committed Oct 27, 2023
1 parent e543c38 commit f7f765f
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 38 deletions.
4 changes: 2 additions & 2 deletions src/bin/migrate-from-gcs/execution_payloads.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::fmt;

use csv::ByteRecord;
use serde::Serialize;
use serde::{Deserialize, Serialize};

use crate::slots::Slot;

#[derive(Serialize)]
#[derive(Deserialize, Serialize)]
pub struct ExecutionPayload {
block_hash: String,
#[serde(skip_serializing)]
Expand Down
110 changes: 76 additions & 34 deletions src/bin/migrate-from-gcs/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ mod progress;
mod slots;

use std::{
collections::{HashMap, VecDeque},
io::Write,
io::{Read, Write},
sync::{atomic::AtomicUsize, Arc, Mutex},
time::{Duration, SystemTime},
};
Expand All @@ -26,11 +25,11 @@ use tokio::{
time::interval,
};
use tokio_util::io::{StreamReader, SyncIoBridge};
use tracing::{debug, error, info, trace, warn};
use tracing::{debug, error, info, warn};

fn print_migration_rate(payloads_migrated: usize, duration_secs: u64) {
let rate = payloads_migrated as f64 / duration_secs as f64;
info!("migration rate: {:.2} payloads per second", rate);
fn print_rate(count: usize, duration_secs: u64) {
let rate = count as f64 / duration_secs as f64;
info!("rate: {:.2} payloads per second", rate);
}

use crate::execution_payloads::ExecutionPayload;
Expand Down Expand Up @@ -66,43 +65,77 @@ fn run_decode_payloads(
})
}

const INCOMPLETE_SLOT_BUNDLE_LIMIT: usize = 8;

// Here we take payloads as we decode them and bundle them by slot.
// Our initial approach used a simple, bundle by slot, when a payload has been inserted for
// a ninth bundle, flush the bundle with the lowest slot number. Unfortunately it appears
// at least bundle 6325428 appears when we're processing bundles from ten thousand slots
// later. This means we can't rely on the slot number to determine the order of the
// bundles. Instead we remember the order we inserted them in, and flush the oldest of
// those.
// Unfortunately it appears the payloads are not always ordered by slot, and so bundles cannot be
// considered complete until the full file is processed. We write all slot bundles to file, and
// append continuously.
fn run_bundle_payloads(
day_bundle_id: String,
mut decoded_rx: Receiver<ExecutionPayload>,
mut slot_bundle_tx: Sender<Vec<ExecutionPayload>>,
) -> JoinHandle<()> {
spawn(async move {
let mut slot_bundles = HashMap::new();
let mut insertion_order = VecDeque::new();
// Create a directory for the bundles, wipe it if it already exists.
let path = format!("bundles_{}", day_bundle_id);
let bundle_dir = std::path::Path::new(&path);
if bundle_dir.exists() {
std::fs::remove_dir_all(bundle_dir).unwrap();
}
std::fs::create_dir(bundle_dir).unwrap();

let mut count = 0;
let start = SystemTime::now();

while let Some(payload) = decoded_rx.next().await {
let payload_slot = payload.slot();
let entry = slot_bundles.entry(payload_slot).or_insert_with(Vec::new);

if entry.is_empty() {
trace!("slot bundle entry empty, bundling new slot");
insertion_order.push_back(payload_slot);
}
// Create or open the slot bundle ndjson file.
let slot_bundle_path = bundle_dir.join(format!("{}.ndjson", payload_slot));
let mut slot_bundle_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(slot_bundle_path)
.unwrap();

entry.push(payload);
// Write the payload to the slot bundle file.
let payload = serde_json::to_vec(&payload).unwrap();
slot_bundle_file.write_all(&payload).unwrap();
slot_bundle_file.write_all(b"\n").unwrap();

count += 1;

// Whenever we have more than INCOMPLETE_SLOT_BUNDLE_LIMIT slots of payloads in the
// queue, we flush the oldest slot to the next stage of the pipeline.
if insertion_order.len() > INCOMPLETE_SLOT_BUNDLE_LIMIT {
let oldest_slot = insertion_order.pop_front().unwrap(); // Get the oldest slot
let oldest_slot_payloads = slot_bundles.remove(&oldest_slot).unwrap();
debug!(slot = %oldest_slot, payloads_count = oldest_slot_payloads.len(), "slot bundle complete, sending to storage");
slot_bundle_tx.send(oldest_slot_payloads).await.unwrap();
// Every 1000 payloads report on the rate
if count % 8000 == 0 {
let elapsed = start.elapsed().unwrap().as_secs();
print_rate(count, elapsed);
}
}

info!(day_bundle_id, "full day bundle processed to slot bundles");

// Iterate through all created files and send the slot bundles.
let mut slot_bundle_paths = std::fs::read_dir(bundle_dir)
.unwrap()
.map(|res| res.map(|e| e.path()))
.collect::<Result<Vec<_>, std::io::Error>>()
.unwrap();
slot_bundle_paths.sort();

for slot_bundle_path in slot_bundle_paths {
let mut slot_bundle_file = std::fs::File::open(slot_bundle_path).unwrap();
let mut file_str = String::new();
slot_bundle_file.read_to_string(&mut file_str).unwrap();
let slot_bundle = file_str
.lines()
.map(|line| serde_json::from_str::<ExecutionPayload>(line).unwrap())
.collect::<Vec<_>>();

slot_bundle_tx.feed(slot_bundle).await.unwrap();
}
slot_bundle_tx.flush().await.unwrap();

// Remove the bundle directory.
std::fs::remove_dir_all(bundle_dir).unwrap();
})
}

Expand Down Expand Up @@ -179,7 +212,7 @@ async fn store_bundle(
.unwrap()
.as_secs();

print_migration_rate(payloads_migrated_count, elapsed);
print_rate(payloads_migrated_count, elapsed);
}

Ok::<_, anyhow::Error>(())
Expand Down Expand Up @@ -240,17 +273,26 @@ async fn main() -> anyhow::Result<()> {
for object_meta in &day_bundle_metas {
info!(object = %object_meta.location, size_mib=object_meta.size / 1_000_000, "migrating bundle");

let payload_stream = gcs.get(&object_meta.location).await?.into_stream();
// 2023-04-22.csv.gz -> 2023-04-22
let bundle_id = object_meta
.location
.to_string()
.split('.')
.next()
.unwrap()
.to_string();

let payload_day_stream = gcs.get(&object_meta.location).await?.into_stream();

const DECODED_BUFFER_SIZE: usize = 256;
let (decoded_tx, decoded_rx) = channel(DECODED_BUFFER_SIZE);
let (payload_tx, payload_rx) = channel(DECODED_BUFFER_SIZE);

const SLOT_BUNDLE_BUFFER_SIZE: usize = 8;
let (slot_bundle_tx, slot_bundle_rx) = channel(SLOT_BUNDLE_BUFFER_SIZE);

try_join!(
run_decode_payloads(payload_stream, decoded_tx),
run_bundle_payloads(decoded_rx, slot_bundle_tx),
run_decode_payloads(payload_day_stream, payload_tx),
run_bundle_payloads(bundle_id, payload_rx, slot_bundle_tx),
run_store_bundles(slot_bundle_rx),
)?;

Expand Down
4 changes: 2 additions & 2 deletions src/bin/migrate-from-gcs/slots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use std::fmt;

use chrono::{DateTime, Utc};
use lazy_static::lazy_static;
use serde::Serialize;
use serde::{Deserialize, Serialize};

lazy_static! {
static ref GENESIS_TIMESTAMP: DateTime<Utc> = "2020-12-01T12:00:23Z".parse().unwrap();
}

#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, PartialEq, Serialize)]
pub struct Slot(pub i32);

impl fmt::Display for Slot {
Expand Down

0 comments on commit f7f765f

Please sign in to comment.