Skip to content

Commit

Permalink
Introduce require_files for tracking the add files in table state (#594)
Browse files Browse the repository at this point in the history
  • Loading branch information
mosyp authored May 4, 2022
1 parent 8cd0037 commit 812d827
Show file tree
Hide file tree
Showing 4 changed files with 119 additions and 38 deletions.
69 changes: 50 additions & 19 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,21 +429,30 @@ impl Default for DeltaVersion {
}
}

/// Configuration options for delta table
#[derive(Debug)]
pub struct DeltaTableConfig {
/// Indicates whether our use case requires tracking tombstones.
/// This defaults to `true` as a safe default.
///
/// Read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
///
/// Note: tombstones are only retained when `require_files` is also `true`.
pub require_tombstones: bool,

/// Indicates whether DeltaTable should track files.
/// This defaults to `true`
///
/// Some append-only applications might have no need of tracking any files.
/// Hence, DeltaTable will be loaded with significant memory reduction.
pub require_files: bool,
}

impl Default for DeltaTableConfig {
fn default() -> Self {
Self {
require_tombstones: true,
require_files: true,
}
}
}
Expand All @@ -455,15 +464,22 @@ pub struct DeltaTableLoadOptions {
pub table_uri: String,
/// backend to access storage system
pub storage_backend: Box<dyn StorageBackend>,
/// indicates whether our use case requires tracking tombstones.
/// read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
/// defaults to true as a safe default.
pub require_tombstones: bool,
/// specify the version we are going to load: a time stamp, a version, or just the newest
/// available version
pub version: DeltaVersion,
/// Indicates whether our use case requires tracking tombstones.
/// This defaults to `true`
///
/// Read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
pub require_tombstones: bool,
/// Indicates whether DeltaTable should track files.
/// This defaults to `true`
///
/// Some append-only applications might have no need of tracking any files.
/// Hence, DeltaTable will be loaded with significant memory reduction.
pub require_files: bool,
}

impl DeltaTableLoadOptions {
Expand All @@ -473,6 +489,7 @@ impl DeltaTableLoadOptions {
table_uri: table_uri.to_string(),
storage_backend: storage::get_backend_for_uri(table_uri)?,
require_tombstones: true,
require_files: true,
version: DeltaVersion::default(),
})
}
Expand All @@ -485,20 +502,26 @@ pub struct DeltaTableBuilder {
}

impl DeltaTableBuilder {
/// TODO
/// Creates `DeltaTableBuilder` from table uri
pub fn from_uri(table_uri: &str) -> Result<Self, DeltaTableError> {
Ok(DeltaTableBuilder {
options: DeltaTableLoadOptions::new(table_uri)?,
})
}

/// Sets `require_tombstones=false` to the builder
///
/// Takes the builder by value and returns it, so calls can be chained.
pub fn without_tombstones(mut self) -> Self {
self.options.require_tombstones = false;
self
}

/// Sets `require_files=false` to the builder
///
/// Takes the builder by value and returns it, so calls can be chained.
pub fn without_files(mut self) -> Self {
self.options.require_files = false;
self
}

/// Sets `version` to the builder
pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self {
self.options.version = DeltaVersion::Version(version);
self
Expand Down Expand Up @@ -527,6 +550,7 @@ impl DeltaTableBuilder {
pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
let config = DeltaTableConfig {
require_tombstones: self.options.require_tombstones,
require_files: self.options.require_files,
};

let mut table = DeltaTable::new(
Expand Down Expand Up @@ -708,15 +732,17 @@ impl DeltaTable {

/// Loads the commit at `version` and merges the resulting state into `self.state`.
///
/// The merge is driven by `self.config.require_tombstones` and
/// `self.config.require_files`. Errors from reading the commit are propagated.
async fn apply_log(&mut self, version: DeltaDataTypeVersion) -> Result<(), ApplyLogError> {
    let new_state = DeltaTableState::from_commit(self, version).await?;
    // `new_state` is moved into `merge`, so it is merged exactly once, using the
    // three-argument form that also carries `require_files`.
    self.state.merge(
        new_state,
        self.config.require_tombstones,
        self.config.require_files,
    );

    Ok(())
}

/// Replaces `self.state` with the state read from the given checkpoint.
///
/// `from_checkpoint` takes the table itself and reads `require_tombstones` /
/// `require_files` from `table.config`, so no flags are passed here.
async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
    self.state = DeltaTableState::from_checkpoint(self, &check_point).await?;

    Ok(())
}
Expand Down Expand Up @@ -810,7 +836,8 @@ impl DeltaTable {
}

let s = DeltaTableState::from_actions(actions)?;
self.state.merge(s, self.config.require_tombstones);
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
self.version = new_version;

Ok(())
Expand Down Expand Up @@ -1325,7 +1352,11 @@ impl DeltaTable {
let committed_version = self.try_commit_transaction(&prepared_commit, 0).await?;

let new_state = DeltaTableState::from_commit(self, committed_version).await?;
self.state.merge(new_state, self.config.require_tombstones);
self.state.merge(
new_state,
self.config.require_tombstones,
self.config.require_files,
);

Ok(())
}
Expand Down
38 changes: 26 additions & 12 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ impl DeltaTableState {
let mut new_state = DeltaTableState::default();
for line in reader.lines() {
let action: action::Action = serde_json::from_str(line?.as_str())?;
new_state.process_action(action, true)?;
new_state.process_action(
action,
table.config.require_tombstones,
table.config.require_files,
)?;
}

Ok(new_state)
Expand All @@ -58,7 +62,7 @@ impl DeltaTableState {
pub fn from_actions(actions: Vec<Action>) -> Result<Self, ApplyLogError> {
    let mut new_state = DeltaTableState::default();
    for action in actions {
        // Each action is consumed exactly once. Both tombstones and files are
        // kept (`true, true`) because this constructor has no table config.
        new_state.process_action(action, true, true)?;
    }
    Ok(new_state)
}
Expand All @@ -67,7 +71,6 @@ impl DeltaTableState {
pub async fn from_checkpoint(
table: &DeltaTable,
check_point: &CheckPoint,
require_tombstones: bool,
) -> Result<Self, DeltaTableError> {
let checkpoint_data_paths = table.get_checkpoint_data_paths(check_point);
// process actions from checkpoint
Expand All @@ -85,7 +88,8 @@ impl DeltaTableState {
for record in preader.get_row_iter(None)? {
new_state.process_action(
action::Action::from_parquet_record(schema, &record)?,
require_tombstones,
table.config.require_tombstones,
table.config.require_files,
)?;
}
}
Expand Down Expand Up @@ -154,14 +158,19 @@ impl DeltaTableState {
self.current_metadata.as_ref()
}

/// merges new state information into our state
pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) {
/// Merges new state information into our state
pub fn merge(
&mut self,
mut new_state: DeltaTableState,
require_tombstones: bool,
require_files: bool,
) {
if !new_state.tombstones.is_empty() {
self.files
.retain(|a| !new_state.tombstones.contains(a.path.as_str()));
}

if require_tombstones {
if require_tombstones && require_files {
new_state.tombstones.into_iter().for_each(|r| {
self.tombstones.insert(r);
});
Expand All @@ -173,7 +182,9 @@ impl DeltaTableState {
}
}

self.files.append(&mut new_state.files);
if require_files {
self.files.append(&mut new_state.files);
}

if new_state.min_reader_version > 0 {
self.min_reader_version = new_state.min_reader_version;
Expand Down Expand Up @@ -206,14 +217,17 @@ impl DeltaTableState {
fn process_action(
&mut self,
action: action::Action,
handle_tombstones: bool,
require_tombstones: bool,
require_files: bool,
) -> Result<(), ApplyLogError> {
match action {
action::Action::add(v) => {
self.files.push(v.path_decoded()?);
if require_files {
self.files.push(v.path_decoded()?);
}
}
action::Action::remove(v) => {
if handle_tombstones {
if require_tombstones && require_files {
let v = v.path_decoded()?;
self.tombstones.insert(v);
}
Expand Down Expand Up @@ -280,7 +294,7 @@ mod tests {
last_updated: Some(0),
});

let _ = state.process_action(txn_action, false).unwrap();
let _ = state.process_action(txn_action, false, true).unwrap();

assert_eq!(2, *state.app_transaction_version().get("abc").unwrap());
assert_eq!(1, *state.app_transaction_version().get("xyz").unwrap());
Expand Down
8 changes: 1 addition & 7 deletions rust/src/writer/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,13 +542,7 @@ mod tests {
options: HashMap<String, String>,
) -> Result<DeltaTable, DeltaTableError> {
let backend = crate::get_backend_for_uri_with_options(table_uri, options)?;
let mut table = DeltaTable::new(
table_uri,
backend,
crate::DeltaTableConfig {
require_tombstones: true,
},
)?;
let mut table = DeltaTable::new(table_uri, backend, crate::DeltaTableConfig::default())?;
table.load().await?;
Ok(table)
}
Expand Down
42 changes: 42 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,48 @@ async fn read_delta_table_ignoring_tombstones() {
);
}

#[tokio::test]
async fn read_delta_table_ignoring_files() {
    // A table loaded with `without_files()` must expose neither files nor tombstones.
    let builder = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0")
        .unwrap()
        .without_files();
    let table = builder.load().await.unwrap();

    let no_files = table.get_files().is_empty();
    assert!(no_files, "files should be empty");

    let first_tombstone = table.get_tombstones().next();
    assert!(first_tombstone.is_none(), "tombstones should be empty");
}

#[tokio::test]
async fn read_delta_table_with_ignoring_files_on_apply_log() {
    // Start pinned at version 0 with file tracking disabled.
    let builder = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0")
        .unwrap()
        .with_version(0)
        .without_files();
    let mut table = builder.load().await.unwrap();

    assert_eq!(table.version, 0);
    let no_files = table.get_files().is_empty();
    assert!(no_files, "files should be empty");
    let first_tombstone = table.get_tombstones().next();
    assert!(first_tombstone.is_none(), "tombstones should be empty");

    // Applying newer log entries must still leave files and tombstones untracked.
    table.update().await.unwrap();

    assert_eq!(table.version, 1);
    let no_files = table.get_files().is_empty();
    assert!(no_files, "files should be empty");
    let first_tombstone = table.get_tombstones().next();
    assert!(first_tombstone.is_none(), "tombstones should be empty");
}

#[tokio::test]
async fn read_delta_2_0_table_with_version() {
let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0)
Expand Down

0 comments on commit 812d827

Please sign in to comment.