Skip to content

Commit

Permalink
Switch checkpoints over to use the new Decoder API
Browse files: browse the repository at this point in the history
  • Loading branch information
rtyler committed May 12, 2023
1 parent 428be2e commit e94128d
Showing 1 changed file with 4 additions and 17 deletions.
21 changes: 4 additions & 17 deletions rust/src/checkpoints.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -14,7 +14,6 @@ use regex::Regex;
use serde_json::Value;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::io::Write;
use std::iter::Iterator;
use std::ops::Add;

Expand Down Expand Up @@ -393,23 +392,11 @@ fn parquet_bytes_from_state(state: &DeltaTableState) -> Result<bytes::Bytes, Che
let mut decoder = ReaderBuilder::new(arrow_schema)
.with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
.build_decoder()?;
let jsons: Vec<serde_json::Value> = jsons.map(|r| r.unwrap()).collect();
decoder.serialize(&jsons)?;

let mut buf = vec![];
for res in jsons {
let json = res?;
buf.write_all(serde_json::to_string(&json)?.as_bytes())?;
}
let mut consumed = 0;

loop {
let read_bytes = decoder.decode(&buf)?;
consumed += read_bytes;
if let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}
if consumed == buf.len() {
break;
}
while let Some(batch) = decoder.flush()? {
writer.write(&batch)?;
}

let _ = writer.close()?;
Expand Down

0 comments on commit e94128d

Please sign in to comment.