Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Record Batch support for delta log stats_parsed (#435) #454

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 187 additions & 2 deletions rust/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@

#![allow(non_snake_case, non_camel_case_types)]

use super::schema::*;
use arrow::array::ArrayRef;
use arrow::array::Int64Array;
use arrow::datatypes::Field;
use arrow::record_batch::RecordBatch;
use parquet::record::{ListAccessor, MapAccessor, RowAccessor};
use percent_encoding::percent_decode;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
use std::collections::hash_map::Entry;
use std::collections::HashMap;

use super::schema::*;
use std::sync::Arc;

/// Error returned when an invalid Delta log action is encountered.
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -150,6 +155,21 @@ pub struct StatsParsed {
pub null_count: HashMap<String, DeltaDataTypeLong>,
}

/// RecordBatch representation of the stats from parquet
/// (the `stats_parsed` struct of a Delta checkpoint `add` action),
/// a columnar alternative to the map-based [`StatsParsed`].
#[derive(Debug)]
pub struct StatRecordBatch {
    /// Number of records in the file associated with the log action.
    pub num_records: DeltaDataTypeLong,

    // start of per column stats
    /// Contains a value smaller than all values present in the file for all columns.
    // NOTE(review): currently materialized as Int64 columns only — see `Add::get_stat_row_rb`.
    pub min_values: RecordBatch,
    /// Contains a value larger than all values present in the file for all columns.
    pub max_values: RecordBatch,
    /// The number of null values for all columns.
    // NOTE(review): not populated yet — `get_stats_as_record_batch` emits an empty batch (TODO).
    pub null_counts: RecordBatch,
}

/// Delta log action that describes a parquet data file that is part of the table.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -302,6 +322,124 @@ impl Add {
.map_or(Ok(None), |s| serde_json::from_str(s))
}

fn get_stat_row_rb(
&self,
row: &parquet::record::Row,
cols: &[String],
) -> arrow::error::Result<RecordBatch> {
let mut stat_col_map: HashMap<&String, Vec<Option<String>>> = HashMap::new();
for col in cols {
let data: Vec<Option<String>> = vec![];
stat_col_map.insert(col, data);
}
for (_, (name, value)) in row.get_column_iter().enumerate() {
for col in cols {
if name == col {
match stat_col_map.entry(name) {
Entry::Occupied(mut e) => {
let val = value.to_string(); // all values of type String for now
e.get_mut().push(Some(val));
}
Entry::Vacant(_) => {}
}
}
}
}
// now we have column name and corresponding vector inside a hashmap
// we can create a record batch out of it
let mut rb_vec: Vec<(&str, ArrayRef)> = vec![];
for col in cols {
match stat_col_map.entry(col) {
Entry::Occupied(e) => {
// TODO: i64s only for now. should support all possible column types
let my_vec: Vec<i64> = e
.get()
.iter()
.map(|v| v.as_ref().unwrap().parse::<i64>().unwrap())
.collect::<Vec<_>>();
let a: ArrayRef = Arc::new(Int64Array::from(my_vec));
rb_vec.push((col.as_str(), a))
}
Entry::Vacant(_) => {}
}
}
RecordBatch::try_from_iter(rb_vec)
}

/// The stats_parsed can be converted into the record batches and the original mem-heavy data structure
/// can eventually be eliminated entirely.
pub fn get_stats_as_record_batch(
&self,
batch_columns: &[String],
) -> Result<Option<StatRecordBatch>, parquet::errors::ParquetError> {
self.stats_parsed.as_ref().map_or(Ok(None), |record| {
let mut min_val_vec = vec![];
let mut max_val_vec = vec![];
let mut num_records = 0;
for (i, (name, _)) in record.get_column_iter().enumerate() {
match name.as_str() {
"numRecords" => match record.get_long(i) {
Ok(v) => {
num_records = v;
}
_ => {
log::error!("Expect type of stats_parsed field numRecords to be long, got: {}", record);
}
}
"minValues" => match record.get_group(i) {
Ok(row) => {
min_val_vec.push(
self.get_stat_row_rb(row, batch_columns).unwrap()
);
}
_ => {
log::error!("Expect type of stats_parsed field minRecords to be struct, got: {}", record);
}
}
"maxValues" => match record.get_group(i) {
Ok(row) => {
max_val_vec.push(
self.get_stat_row_rb(row, batch_columns).unwrap()
);
}
_ => {
log::error!("Expect type of stats_parsed field maxRecords to be struct, got: {}", record);
}
}
_ => {
log::warn!(
"Unexpected field name `{}` for stats_parsed: {:?}",
name,
record,
);
}
}
}

let schema = Arc::new(self.new_schema(batch_columns));
let stats = StatRecordBatch{
num_records,
min_values: RecordBatch::concat(&schema, &min_val_vec).unwrap(),
max_values: RecordBatch::concat(&schema, &max_val_vec).unwrap(),
null_counts: RecordBatch::new_empty(schema)
};
Ok(Some(stats))
})
}

/// Creates the schema for the Record Batch for the columns provided.
/// Every column is modeled as a non-nullable Int64 field for now.
fn new_schema(&self, batch_columns: &[String]) -> arrow::datatypes::Schema {
    let fields: Vec<Field> = batch_columns
        .iter()
        .map(|col_name| Field::new(col_name, arrow::datatypes::DataType::Int64, false))
        .collect();
    arrow::datatypes::Schema::new(fields)
}

/// Returns the composite HashMap representation of stats contained in the action if present.
/// Since stats are defined as optional in the protocol, this may be None.
pub fn get_stats_parsed(&self) -> Result<Option<StatsParsed>, parquet::errors::ParquetError> {
Expand Down Expand Up @@ -892,6 +1030,53 @@ mod tests {
assert_eq!(add_action.stats, None);
}

#[test]
fn test_load_table_stats_as_record_batch() {
    // Load the 4th row of the checkpoint and extract its `add` action.
    let path = "./tests/data/simple_table_with_stats_parsed_optimized/_delta_log/00000000000000000003.checkpoint.parquet";
    let reader = SerializedFileReader::new(File::open(path).unwrap()).unwrap();
    let record = reader.get_row_iter(None).unwrap().nth(3).unwrap();
    let action = Add::from_parquet_record(&record.get_group(1).unwrap()).unwrap();

    // Verify normal stats
    assert_eq!(action.get_stats().unwrap().unwrap().num_records, 10);

    // Verify stats rb: a single non-nullable Int64 `id` column is requested.
    let cols = vec!["id".to_string()];
    let schema = Arc::new(arrow::datatypes::Schema::new(vec![Field::new(
        "id",
        arrow::datatypes::DataType::Int64,
        false,
    )]));

    let stats_rb = action.get_stats_as_record_batch(&cols).unwrap().unwrap();
    assert_eq!(stats_rb.num_records, 10);

    // min/max each come back as a one-row, one-column batch.
    let expected_min_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![1]))]).unwrap();
    assert_eq!(stats_rb.min_values.num_columns(), 1);
    assert_eq!(stats_rb.min_values.num_rows(), 1);
    assert_eq!(stats_rb.min_values, expected_min_batch);

    let expected_max_batch =
        RecordBatch::try_new(schema.clone(), vec![Arc::new(Int64Array::from(vec![10]))]).unwrap();
    assert_eq!(stats_rb.max_values, expected_max_batch);

    // TODO: null counts are not populated yet, so an empty batch is expected.
    let null_count_batch = stats_rb.null_counts;
    assert_eq!(null_count_batch.num_columns(), 1);
    assert_eq!(null_count_batch.num_rows(), 0);
    assert_eq!(null_count_batch, RecordBatch::new_empty(schema.clone()));
}

#[test]
fn test_load_table_stats() {
let action = Add {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"numTransactions":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"86813337-feec-4527-886b-384bdd6f7f6a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1633794687045}}
{"commitInfo":{"timestamp":1633794687150,"userId":"906424503339340","userName":"contact@manishgill.com","operation":"CREATE TABLE","operationParameters":{"isManaged":"false","description":null,"partitionBy":"[]","properties":"{}"},"notebook":{"notebookId":"2167422995023007"},"clusterId":"0604-105630-ajar222","isolationLevel":"SnapshotIsolation","isBlindAppend":true,"operationMetrics":{}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":0,"numFiles":0,"numMetadata":1,"numProtocol":1,"numTransactions":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"metaData":{"id":"86813337-feec-4527-886b-384bdd6f7f6a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.checkpoint.writeStatsAsStruct":"true"},"createdTime":1633794687045}}
{"commitInfo":{"timestamp":1633794691673,"userId":"906424503339340","userName":"contact@manishgill.com","operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.checkpoint.writeStatsAsStruct\":\"true\"}"},"notebook":{"notebookId":"2167422995023007"},"clusterId":"0604-105630-ajar222","readVersion":0,"isolationLevel":"SnapshotIsolation","isBlindAppend":true,"operationMetrics":{}}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":4126,"numFiles":8,"numMetadata":1,"numProtocol":1,"numTransactions":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{"add":{"path":"part-00000-5d345c6a-525d-4520-a078-12f3357a77f5-c000.snappy.parquet","partitionValues":{},"size":514,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00001-f9dc9a5d-8503-4f3d-a188-8317bba46a58-c000.snappy.parquet","partitionValues":{},"size":514,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00002-930505f3-0d84-4d8c-8595-727f54980600-c000.snappy.parquet","partitionValues":{},"size":514,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000002","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00003-5bafc8ea-a7a9-4ce8-8aa2-533924d86f26-c000.snappy.parquet","partitionValues":{},"size":521,"modificationTime":1633794714000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":5},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000003","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00004-19ec0ebc-a688-4004-8e65-18c44af19d07-c000.snappy.parquet","partitionValues":{},"size":514,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6},\"maxValues\":{\"id\":6},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000004","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00005-e1389522-dd44-49e5-9c4f-e8e9760cf2c2-c000.snappy.parquet","partitionValues":{},"size":514,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7},\"maxValues\":{\"id\":7},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000005","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00006-a928894d-b601-4739-bf99-7ece54e88b8d-c000.snappy.parquet","partitionValues":{},"size":514,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8},\"maxValues\":{\"id\":8},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000006","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00007-b41961a2-330a-4c20-84ce-ce06c9007223-c000.snappy.parquet","partitionValues":{},"size":521,"modificationTime":1633794713000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"id\":9},\"maxValues\":{\"id\":10},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000007","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1633794714030,"userId":"906424503339340","userName":"contact@manishgill.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"2167422995023007"},"clusterId":"0604-105630-ajar222","readVersion":1,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"8","numOutputBytes":"4126","numOutputRows":"10"}}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"tableSizeBytes":552,"numFiles":1,"numMetadata":1,"numProtocol":1,"numTransactions":0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{"remove":{"path":"part-00000-5d345c6a-525d-4520-a078-12f3357a77f5-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":514,"tags":{"INSERTION_TIME":"1633794713000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00001-f9dc9a5d-8503-4f3d-a188-8317bba46a58-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":514,"tags":{"INSERTION_TIME":"1633794713000001","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00002-930505f3-0d84-4d8c-8595-727f54980600-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":514,"tags":{"INSERTION_TIME":"1633794713000002","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00003-5bafc8ea-a7a9-4ce8-8aa2-533924d86f26-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":521,"tags":{"INSERTION_TIME":"1633794713000003","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00004-19ec0ebc-a688-4004-8e65-18c44af19d07-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":514,"tags":{"INSERTION_TIME":"1633794713000004","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00005-e1389522-dd44-49e5-9c4f-e8e9760cf2c2-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":514,"tags":{"INSERTION_TIME":"1633794713000005","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00006-a928894d-b601-4739-bf99-7ece54e88b8d-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":514,"tags":{"INSERTION_TIME":"1633794713000006","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"remove":{"path":"part-00007-b41961a2-330a-4c20-84ce-ce06c9007223-c000.snappy.parquet","deletionTimestamp":1633796137327,"dataChange":false,"extendedFileMetadata":true,"partitionValues":{},"size":521,"tags":{"INSERTION_TIME":"1633794713000007","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"add":{"path":"part-00000-e5b7d6de-4830-45fd-843d-22dc39b6486a-c000.snappy.parquet","partitionValues":{},"size":552,"modificationTime":1633796137000,"dataChange":false,"stats":"{\"numRecords\":10,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":10},\"nullCount\":{\"id\":0}}","tags":{"INSERTION_TIME":"1633794713000000","OPTIMIZE_TARGET_SIZE":"268435456"}}}
{"commitInfo":{"timestamp":1633796137867,"userId":"906424503339340","userName":"contact@manishgill.com","operation":"OPTIMIZE","operationParameters":{"predicate":"[]","zOrderBy":"[]","batchId":"0","auto":false},"notebook":{"notebookId":"2167422995023007"},"clusterId":"0604-105630-ajar222","readVersion":2,"isolationLevel":"SnapshotIsolation","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"8","numRemovedBytes":"4126","p25FileSize":"552","minFileSize":"552","numAddedFiles":"1","maxFileSize":"552","p75FileSize":"552","p50FileSize":"552","numAddedBytes":"552"}}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":3,"size":11}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion rust/tests/write_exploration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub fn create_add(
partition_values: partition_values.to_owned(),
partition_values_parsed: None,

modification_time: modification_time,
modification_time,
data_change: true,

// TODO: calculate additional stats
Expand Down