Skip to content

Commit

Permalink
perf: batch json decode checkpoint actions when writing to parquet
Browse files Browse the repository at this point in the history
(cherry picked from commit 12abf00)
Signed-off-by: Alex Wilcoxson <alex.wilcoxson@relativity.com>
  • Loading branch information
alexwilcoxson-rel authored and rtyler committed Nov 13, 2024
1 parent 8c7b019 commit 95395cb
Showing 1 changed file with 10 additions and 6 deletions.
16 changes: 10 additions & 6 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,18 +353,22 @@ fn parquet_bytes_from_state(
     // Count of actions
     let mut total_actions = 0;

-    for j in jsons {
-        let buf = serde_json::to_string(&j?).unwrap();
-        let _ = decoder.decode(buf.as_bytes())?;
-
-        total_actions += 1;
+    let span = tracing::debug_span!("serialize_checkpoint").entered();
+    for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) {
+        let mut buf = Vec::new();
+        for j in chunk {
+            serde_json::to_writer(&mut buf, &j?)?;
+            total_actions += 1;
+        }
+        let _ = decoder.decode(&buf)?;
         while let Some(batch) = decoder.flush()? {
             writer.write(&batch)?;
         }
     }
+    drop(span);

     let _ = writer.close()?;
-    debug!("Finished writing checkpoint parquet buffer.");
+    debug!(total_actions, "Finished writing checkpoint parquet buffer.");

     let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
         .with_size_in_bytes(bytes.len() as i64)
Expand Down

0 comments on commit 95395cb

Please sign in to comment.