Skip to content

Commit

Permalink
fix: dramatically reduce checkpoint memory consumption (delta-io#2956)
Browse files Browse the repository at this point in the history
Both commits describe the specific fixes, but basically our checkpoint
code was collecting too much into memory when it could iterate!
:roller_coaster:

With a test table:

Before

`Maximum resident set size (kbytes): 19964728`

After

`Maximum resident set size (kbytes): 4017132`

Sponsored-by: [Scribd Inc](https://tech.scribd.com)

---------

Signed-off-by: R. Tyler Croy <rtyler@brokenco.de>
(cherry picked from commit c05931a)
  • Loading branch information
rtyler authored and alexwilcoxson-rel committed Nov 7, 2024
1 parent 32dee66 commit cd4c099
Showing 1 changed file with 12 additions and 6 deletions.
18 changes: 12 additions & 6 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ fn parquet_bytes_from_state(
remove.extended_file_metadata = Some(false);
}
}
let files = state.file_actions().unwrap();
let files = state.file_actions_iter().unwrap();
// protocol
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.protocol().min_reader_version,
Expand Down Expand Up @@ -323,8 +323,8 @@ fn parquet_bytes_from_state(
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
.chain(files.map(|f| {
checkpoint_add_from_state(&f, partition_col_data_types.as_slice(), &stats_conversions)
}));

// Create the arrow schema that represents the Checkpoint parquet file.
Expand All @@ -349,17 +349,23 @@ fn parquet_bytes_from_state(
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons = jsons.collect::<Result<Vec<serde_json::Value>, _>>()?;
decoder.serialize(&jsons)?;

// Count of actions
let mut total_actions = 0;

for j in jsons {
let buf = serde_json::to_string(&j?).unwrap();
let _ = decoder.decode(buf.as_bytes())?;
total_actions += 1;
}
while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}

let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");

let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64)
let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
.with_size_in_bytes(bytes.len() as i64)
.build();
Ok((checkpoint, bytes::Bytes::from(bytes)))
Expand Down

0 comments on commit cd4c099

Please sign in to comment.