Skip to content

Commit

Permalink
perf: batch json decode checkpoint actions when writing to parquet
Browse files (browse the repository at this point in the history)
  • Loading branch information
alexwilcoxson-rel committed Nov 8, 2024
1 parent bba9d8d commit 12abf00
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions crates/core/src/protocol/checkpoints.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -353,18 +353,22 @@ fn parquet_bytes_from_state(
     // Count of actions
     let mut total_actions = 0;
 
-    for j in jsons {
-        let buf = serde_json::to_string(&j?).unwrap();
-        let _ = decoder.decode(buf.as_bytes())?;
-
-        total_actions += 1;
+    let span = tracing::debug_span!("serialize_checkpoint").entered();
+    for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) {
+        let mut buf = Vec::new();
+        for j in chunk {
+            serde_json::to_writer(&mut buf, &j?)?;
+            total_actions += 1;
         }
+        let _ = decoder.decode(&buf)?;
         while let Some(batch) = decoder.flush()? {
             writer.write(&batch)?;
         }
+    }
+    drop(span);
 
     let _ = writer.close()?;
-    debug!("Finished writing checkpoint parquet buffer.");
+    debug!(total_actions, "Finished writing checkpoint parquet buffer.");
 
     let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
         .with_size_in_bytes(bytes.len() as i64)
Expand Down

0 comments on commit 12abf00

Please sign in to comment.