Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Checkpoint improvement #29

Merged
merged 4 commits into from
Nov 11, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 81 additions & 9 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ fn parquet_bytes_from_state(
remove.extended_file_metadata = Some(false);
}
}
let files = state.file_actions().unwrap();
let files = state.file_actions_iter().unwrap();
// protocol
let jsons = std::iter::once(Action::Protocol(Protocol {
min_reader_version: state.protocol().min_reader_version,
Expand Down Expand Up @@ -323,8 +323,8 @@ fn parquet_bytes_from_state(
}))
.map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
// adds
.chain(files.iter().map(|f| {
checkpoint_add_from_state(f, partition_col_data_types.as_slice(), &stats_conversions)
.chain(files.map(|f| {
checkpoint_add_from_state(&f, partition_col_data_types.as_slice(), &stats_conversions)
}));

// Create the arrow schema that represents the Checkpoint parquet file.
Expand All @@ -349,17 +349,28 @@ fn parquet_bytes_from_state(
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons = jsons.collect::<Result<Vec<serde_json::Value>, _>>()?;
decoder.serialize(&jsons)?;

while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
// Count of actions
let mut total_actions = 0;

let span = tracing::debug_span!("serialize_checkpoint").entered();
for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) {
let mut buf = Vec::new();
for j in chunk {
serde_json::to_writer(&mut buf, &j?)?;
total_actions += 1;
}
let _ = decoder.decode(&buf)?;
while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}
}
drop(span);

let _ = writer.close()?;
debug!("Finished writing checkpoint parquet buffer.");
debug!(total_actions, "Finished writing checkpoint parquet buffer.");

let checkpoint = CheckPointBuilder::new(state.version(), jsons.len() as i64)
let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
.with_size_in_bytes(bytes.len() as i64)
.build();
Ok((checkpoint, bytes::Bytes::from(bytes)))
Expand Down Expand Up @@ -1088,4 +1099,65 @@ mod tests {
}
});
}

#[ignore = "This test is only useful if the batch size has been made small"]
#[tokio::test]
async fn test_checkpoint_large_table() -> crate::DeltaResult<()> {
    use crate::writer::test_utils::get_arrow_schema;

    // Build a fresh table in a temp dir so the test is hermetic.
    let table_schema = get_delta_schema();
    let temp_dir = tempfile::tempdir()?;
    let table_path = temp_dir.path().to_str().unwrap();
    let mut table = DeltaOps::try_from_uri(table_path)
        .await?
        .create()
        .with_columns(table_schema.fields().cloned())
        .await
        .unwrap();
    assert_eq!(table.version(), 0);

    // Number of write transactions to commit; with a small
    // CHECKPOINT_RECORD_BATCH_SIZE this forces the checkpoint writer to
    // flush multiple record batches, exercising the chunked path.
    let count = 20;

    for _ in 0..count {
        table.load().await?;
        // One small batch per commit: four rows over the (value, modified)
        // partition layout used throughout these tests.
        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
                Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-02",
                    "2021-02-03",
                    "2021-02-02",
                    "2021-02-04",
                ])),
            ],
        )
        .unwrap();
        let _ = DeltaOps(table.clone()).write(vec![batch]).await?;
    }

    table.load().await?;
    assert_eq!(table.version(), count, "Expected {count} transactions");
    // Snapshot the file actions before checkpointing so we can verify the
    // checkpoint neither drops nor duplicates any add actions.
    let pre_checkpoint_actions = table.snapshot()?.file_actions()?;

    let before = table.version();
    let res = create_checkpoint(&table).await;
    assert!(res.is_ok(), "Failed to create the checkpoint! {res:#?}");

    // Re-open the table from disk to make sure the checkpoint is what a
    // fresh reader actually sees, not just in-memory state.
    let table = crate::open_table(table_path).await?;
    assert_eq!(
        before,
        table.version(),
        "Why on earth did a checkpoint create a version?"
    );

    let post_checkpoint_actions = table.snapshot()?.file_actions()?;

    assert_eq!(
        pre_checkpoint_actions.len(),
        post_checkpoint_actions.len(),
        "The number of actions read from the table after checkpointing is wrong!"
    );
    Ok(())
}
}
Loading