From 4add802421b5f44e678a60c41807e0263aec4052 Mon Sep 17 00:00:00 2001 From: Alex Wilcoxson Date: Thu, 7 Nov 2024 20:24:30 -0600 Subject: [PATCH] perf: batch json decode checkpoint actions when writing to parquet (cherry picked from commit 12abf0067413d10e59acadeba6c32e67e54eb7d0) Signed-off-by: Alex Wilcoxson --- crates/core/src/protocol/checkpoints.rs | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 4d39c90275..606642a3e5 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -353,18 +353,22 @@ fn parquet_bytes_from_state( // Count of actions let mut total_actions = 0; - for j in jsons { - let buf = serde_json::to_string(&j?).unwrap(); - let _ = decoder.decode(buf.as_bytes())?; - - total_actions += 1; + let span = tracing::debug_span!("serialize_checkpoint").entered(); + for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) { + let mut buf = Vec::new(); + for j in chunk { + serde_json::to_writer(&mut buf, &j?)?; + total_actions += 1; + } + let _ = decoder.decode(&buf)?; while let Some(batch) = decoder.flush()? { writer.write(&batch)?; } } + drop(span); let _ = writer.close()?; - debug!("Finished writing checkpoint parquet buffer."); + debug!(total_actions, "Finished writing checkpoint parquet buffer."); let checkpoint = CheckPointBuilder::new(state.version(), total_actions) .with_size_in_bytes(bytes.len() as i64)