Skip to content

Commit

Permalink
refactor: prefer usage of metadata and protocol fields (#1935)
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap authored Dec 4, 2023
1 parent d441940 commit 0827fea
Show file tree
Hide file tree
Showing 26 changed files with 146 additions and 97 deletions.
5 changes: 3 additions & 2 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ fn get_prune_stats(table: &DeltaTable, column: &Column, get_max: bool) -> Option
}

let data_type = field.data_type().try_into().ok()?;
let partition_columns = &table.get_metadata().ok()?.partition_columns;
let partition_columns = &table.metadata().ok()?.partition_columns;

let values = table.get_state().files().iter().map(|add| {
if partition_columns.contains(&column.name) {
Expand Down Expand Up @@ -310,7 +310,7 @@ impl PruningStatistics for DeltaTable {
///
/// Note: the returned array must contain `num_containers()` rows.
fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
let partition_columns = &self.get_metadata().ok()?.partition_columns;
let partition_columns = &self.metadata().ok()?.partition_columns;

let values = self.get_state().files().iter().map(|add| {
if let Ok(Some(statistics)) = add.get_stats() {
Expand Down Expand Up @@ -1602,6 +1602,7 @@ mod tests {
tags: None,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};
let schema = ArrowSchema::new(vec![
Field::new("year", ArrowDataType::Int64, true),
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/kernel/actions/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ lazy_static! {
deletion_vector_field(),
StructField::new("baseRowId", DataType::long(), true),
StructField::new("defaultRowCommitVersion", DataType::long(), true),
StructField::new("clusteringProvider", DataType::string(), true),
]))),
true,
);
Expand Down
3 changes: 3 additions & 0 deletions crates/deltalake-core/src/kernel/actions/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,9 @@ pub struct Add {
/// First commit version in which an add action with the same path was committed to the table.
pub default_row_commit_version: Option<i64>,

/// The name of the clustering implementation.
pub clustering_provider: Option<String>,

// TODO remove migration fields added to not do too many business logic changes in one PR
/// Partition values stored in raw parquet struct format. In this struct, the column names
/// correspond to the partition columns and the values are stored in their corresponding data
Expand Down
52 changes: 27 additions & 25 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ pub fn crate_version() -> &'static str {

#[cfg(test)]
mod tests {
use itertools::Itertools;

use super::*;
use crate::table::PeekCommit;
use std::collections::HashMap;
Expand All @@ -204,10 +206,10 @@ mod tests {
async fn read_delta_2_0_table_without_version() {
let table = crate::open_table("./tests/data/delta-0.2.0").await.unwrap();
assert_eq!(table.version(), 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
Expand Down Expand Up @@ -241,8 +243,8 @@ mod tests {
table_to_update.update().await.unwrap();

assert_eq!(
table_newest_version.get_files(),
table_to_update.get_files()
table_newest_version.get_files_iter().collect_vec(),
table_to_update.get_files_iter().collect_vec()
);
}
#[tokio::test]
Expand All @@ -251,10 +253,10 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-b44fcdb0-8b06-4f3a-8606-f8311a96f6dc-c000.snappy.parquet"),
Path::from("part-00001-185eca06-e017-4dea-ae49-fc48b973e37e-c000.snappy.parquet"),
Expand All @@ -265,10 +267,10 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 2);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
Path::from("part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet"),
Expand All @@ -279,10 +281,10 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet"),
Path::from("part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet"),
Expand All @@ -295,10 +297,10 @@ mod tests {
async fn read_delta_8_0_table_without_version() {
let table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet")
Expand Down Expand Up @@ -341,21 +343,21 @@ mod tests {
async fn read_delta_8_0_table_with_load_version() {
let mut table = crate::open_table("./tests/data/delta-0.8.0").await.unwrap();
assert_eq!(table.version(), 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"),
]
);
table.load_version(0).await.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(table.protocol().min_writer_version, 2);
assert_eq!(table.protocol().min_reader_version, 1);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::from("part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet"),
Path::from("part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet"),
Expand Down Expand Up @@ -483,7 +485,7 @@ mod tests {
.unwrap();

assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![
Path::parse(
"x=A%2FA/part-00007-b350e235-2832-45df-9918-6cab4f7578f7.c000.snappy.parquet"
Expand Down Expand Up @@ -683,7 +685,7 @@ mod tests {
.unwrap();
assert_eq!(table.version(), 2);
assert_eq!(
table.get_files(),
table.get_files_iter().collect_vec(),
vec![Path::from(
"part-00000-7444aec4-710a-4a4c-8abe-3323499043e9.c000.snappy.parquet"
),]
Expand Down
3 changes: 2 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ mod tests {
storage::config::StorageOptions,
Path,
};
use itertools::Itertools;
use pretty_assertions::assert_eq;
use std::fs;
use tempfile::tempdir;
Expand Down Expand Up @@ -501,7 +502,7 @@ mod tests {
"Testing location: {test_data_from:?}"
);

let mut files = table.get_files();
let mut files = table.get_files_iter().collect_vec();
files.sort();
assert_eq!(
files, expected_paths,
Expand Down
22 changes: 11 additions & 11 deletions crates/deltalake-core/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema)
assert_eq!(table.get_schema().unwrap(), &table_schema)
}

#[tokio::test]
Expand All @@ -362,7 +362,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema)
assert_eq!(table.get_schema().unwrap(), &table_schema)
}

#[tokio::test]
Expand Down Expand Up @@ -391,14 +391,14 @@ mod tests {
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(
table.get_min_reader_version(),
table.protocol().min_reader_version,
PROTOCOL.default_reader_version()
);
assert_eq!(
table.get_min_writer_version(),
table.protocol().min_writer_version,
PROTOCOL.default_writer_version()
);
assert_eq!(table.schema().unwrap(), &schema);
assert_eq!(table.get_schema().unwrap(), &schema);

// check we can overwrite default settings via adding actions
let protocol = Protocol {
Expand All @@ -413,8 +413,8 @@ mod tests {
.with_actions(vec![Action::Protocol(protocol)])
.await
.unwrap();
assert_eq!(table.get_min_reader_version(), 0);
assert_eq!(table.get_min_writer_version(), 0);
assert_eq!(table.protocol().min_reader_version, 0);
assert_eq!(table.protocol().min_writer_version, 0);

let table = CreateBuilder::new()
.with_location("memory://")
Expand All @@ -423,7 +423,7 @@ mod tests {
.await
.unwrap();
let append = table
.get_metadata()
.metadata()
.unwrap()
.configuration
.get(DeltaConfigKey::AppendOnly.as_ref())
Expand All @@ -445,7 +445,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
let first_id = table.get_metadata().unwrap().id.clone();
let first_id = table.metadata().unwrap().id.clone();

let log_store = table.log_store;

Expand All @@ -464,7 +464,7 @@ mod tests {
.with_save_mode(SaveMode::Ignore)
.await
.unwrap();
assert_eq!(table.get_metadata().unwrap().id, first_id);
assert_eq!(table.metadata().unwrap().id, first_id);

// Check table is overwritten
let table = CreateBuilder::new()
Expand All @@ -473,6 +473,6 @@ mod tests {
.with_save_mode(SaveMode::Overwrite)
.await
.unwrap();
assert_ne!(table.get_metadata().unwrap().id, first_id)
assert_ne!(table.metadata().unwrap().id, first_id)
}
}
12 changes: 6 additions & 6 deletions crates/deltalake-core/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,27 +207,27 @@ async fn execute(
let mut actions = vec![];
let protocol = if protocol_downgrade_allowed {
Protocol {
min_reader_version: table.get_min_reader_version(),
min_writer_version: table.get_min_writer_version(),
min_reader_version: table.protocol().min_reader_version,
min_writer_version: table.protocol().min_writer_version,
writer_features: if snapshot.protocol().min_writer_version < 7 {
None
} else {
table.get_writer_features().cloned()
table.protocol().writer_features.clone()
},
reader_features: if snapshot.protocol().min_reader_version < 3 {
None
} else {
table.get_reader_features().cloned()
table.protocol().reader_features.clone()
},
}
} else {
Protocol {
min_reader_version: max(
table.get_min_reader_version(),
table.protocol().min_reader_version,
snapshot.protocol().min_reader_version,
),
min_writer_version: max(
table.get_min_writer_version(),
table.protocol().min_writer_version,
snapshot.protocol().min_writer_version,
),
writer_features: snapshot.protocol().writer_features.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub fn create_add_action(
default_row_commit_version: None,
tags: None,
deletion_vector: None,
clustering_provider: None,
})
}

Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -519,7 +519,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
assert_eq!(table.get_schema().unwrap(), &table_schema);
let res = create_checkpoint_for(0, table.get_state(), table.log_store.as_ref()).await;
assert!(res.is_ok());

Expand Down Expand Up @@ -548,7 +548,7 @@ mod tests {
.await
.unwrap();
assert_eq!(table.version(), 0);
assert_eq!(table.get_metadata().unwrap().schema, table_schema);
assert_eq!(table.get_schema().unwrap(), &table_schema);
match create_checkpoint_for(1, table.get_state(), table.log_store.as_ref()).await {
Ok(_) => {
/*
Expand Down
2 changes: 2 additions & 0 deletions crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,7 @@ mod tests {
modification_time: 0,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};

let stats = action.get_stats().unwrap().unwrap();
Expand Down Expand Up @@ -796,6 +797,7 @@ mod tests {
modification_time: 0,
base_row_id: None,
default_row_commit_version: None,
clustering_provider: None,
};

let stats = action.get_stats().unwrap().unwrap();
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/protocol/parquet_read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ impl Add {
base_row_id: None,
default_row_commit_version: None,
tags: None,
clustering_provider: None,
};

for (i, (name, _)) in record.get_column_iter().enumerate() {
Expand Down
1 change: 1 addition & 0 deletions crates/deltalake-core/src/storage/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ mod tests {
deletion_vector: None,
partition_values_parsed: None,
stats_parsed: None,
clustering_provider: None,
};

let meta: ObjectMeta = (&add).try_into().unwrap();
Expand Down
Loading

0 comments on commit 0827fea

Please sign in to comment.