diff --git a/crates/core/src/protocol/checkpoints.rs b/crates/core/src/protocol/checkpoints.rs index 4d39c90275..606642a3e5 100644 --- a/crates/core/src/protocol/checkpoints.rs +++ b/crates/core/src/protocol/checkpoints.rs @@ -353,18 +353,22 @@ fn parquet_bytes_from_state( // Count of actions let mut total_actions = 0; - for j in jsons { - let buf = serde_json::to_string(&j?).unwrap(); - let _ = decoder.decode(buf.as_bytes())?; - - total_actions += 1; + let span = tracing::debug_span!("serialize_checkpoint").entered(); + for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) { + let mut buf = Vec::new(); + for j in chunk { + serde_json::to_writer(&mut buf, &j?)?; + total_actions += 1; + } + let _ = decoder.decode(&buf)?; while let Some(batch) = decoder.flush()? { writer.write(&batch)?; } } + drop(span); let _ = writer.close()?; - debug!("Finished writing checkpoint parquet buffer."); + debug!(total_actions, "Finished writing checkpoint parquet buffer."); let checkpoint = CheckPointBuilder::new(state.version(), total_actions) .with_size_in_bytes(bytes.len() as i64)