Skip to content

Commit

Permalink
Support no tombstone loading & new table builder API (#445)
Browse files Browse the repository at this point in the history
This allows opting out of tombstone-loading, for scenarios / use cases that don't require them; this applies to all applications that are not interested in writing checkpoints, most notably read-only consumers.

The implementation uses a builder pattern DeltaTableBuilder, which changes the public-facing API.

Co-authored-by: Bruno Bigras <bigras.bruno@gmail.com>
Co-authored-by: Mykhailo Osypov <mykhailo.osypov@gmail.com>
Co-authored-by: Florian Valeye <florian.valeye@gmail.com>
  • Loading branch information
4 people authored Oct 7, 2021
1 parent 773864d commit 8398cb2
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 26 deletions.
192 changes: 169 additions & 23 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ impl DeltaTableState {
}

/// merges new state information into our state
pub fn merge(&mut self, mut new_state: DeltaTableState) {
pub fn merge(&mut self, mut new_state: DeltaTableState, require_tombstones: bool) {
self.files.append(&mut new_state.files);

if !new_state.tombstones.is_empty() {
Expand All @@ -442,7 +442,11 @@ impl DeltaTableState {
self.files
.retain(|a| !new_removals.contains(a.path.as_str()));
}
self.tombstones.append(&mut new_state.tombstones);

if require_tombstones {
self.tombstones.append(&mut new_state.tombstones);
}

if new_state.min_reader_version > 0 {
self.min_reader_version = new_state.min_reader_version;
self.min_writer_version = new_state.min_writer_version;
Expand Down Expand Up @@ -486,12 +490,147 @@ fn extract_rel_path<'a, 'b>(
}
}

/// possible version specifications for loading a delta table
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DeltaVersion {
/// load the newest version
Newest,
/// specify the version to laod
Version(DeltaDataTypeVersion),
/// specify the timestamp in UTC
Timestamp(DateTime<Utc>),
}

impl Default for DeltaVersion {
fn default() -> Self {
DeltaVersion::Newest
}
}

/// configuration options for delta table
#[derive(Debug)]
pub struct DeltaTableConfig {
/// indicates whether our use case requires tracking tombstones.
/// read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
/// defaults to true as a safe default.
pub require_tombstones: bool,
}

impl Default for DeltaTableConfig {
fn default() -> Self {
Self {
require_tombstones: true,
}
}
}

/// Load-time delta table configuration options
#[derive(Debug)]
pub struct DeltaTableLoadOptions {
/// table root uri
pub table_uri: String,
/// backend to access storage system
pub storage_backend: Box<dyn StorageBackend>,
/// indicates whether our use case requires tracking tombstones.
/// read-only applications never require tombstones. Tombstones
/// are only required when writing checkpoints, so even many writers
/// may want to skip them.
/// defaults to true as a safe default.
pub require_tombstones: bool,
/// specify the version we are going to load: a time stamp, a version, or just the newest
/// available version
pub version: DeltaVersion,
}

impl DeltaTableLoadOptions {
/// create default table load options for a table uri
pub fn new(table_uri: &str) -> Result<Self, DeltaTableError> {
Ok(Self {
table_uri: table_uri.to_string(),
storage_backend: storage::get_backend_for_uri(table_uri)?,
require_tombstones: true,
version: DeltaVersion::default(),
})
}
}

/// builder for configuring a delta table load.
#[derive(Debug)]
pub struct DeltaTableBuilder {
options: DeltaTableLoadOptions,
}

impl DeltaTableBuilder {
/// TODO
pub fn from_uri(table_uri: &str) -> Result<Self, DeltaTableError> {
Ok(DeltaTableBuilder {
options: DeltaTableLoadOptions::new(table_uri)?,
})
}

/// TODO
pub fn without_tombstones(mut self) -> Self {
self.options.require_tombstones = false;
self
}

/// TODO
pub fn with_version(mut self, version: DeltaDataTypeVersion) -> Self {
self.options.version = DeltaVersion::Version(version);
self
}

/// specify the timestamp given as ISO-8601/RFC-3339 timestamp
pub fn with_datestring(self, date_string: &str) -> Result<Self, DeltaTableError> {
let datetime =
DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(date_string)?);
Ok(self.with_timestamp(datetime))
}

/// specify a timestamp
pub fn with_timestamp(mut self, timestamp: DateTime<Utc>) -> Self {
self.options.version = DeltaVersion::Timestamp(timestamp);
self
}

/// explicitely set a backend (override backend derived from `table_uri`)
pub fn with_storage_backend(mut self, storage: Box<dyn StorageBackend>) -> Self {
self.options.storage_backend = storage;
self
}

/// finally load the table
pub async fn load(self) -> Result<DeltaTable, DeltaTableError> {
let config = DeltaTableConfig {
require_tombstones: self.options.require_tombstones,
};

let mut table = DeltaTable::new(
&self.options.table_uri,
self.options.storage_backend,
config,
)?;

match self.options.version {
DeltaVersion::Newest => table.load().await?,
DeltaVersion::Version(v) => table.load_version(v).await?,
DeltaVersion::Timestamp(ts) => table.load_with_datetime(ts).await?,
}

Ok(table)
}
}

/// In memory representation of a Delta Table
pub struct DeltaTable {
/// The version of the table as of the most recent loaded Delta log entry.
pub version: DeltaDataTypeVersion,
/// The URI the DeltaTable was loaded from.
pub table_uri: String,
/// the load options used during load
pub config: DeltaTableConfig,

state: DeltaTableState,

Expand Down Expand Up @@ -609,10 +748,10 @@ impl DeltaTable {
let mut new_state = DeltaTableState::default();
for line in reader.lines() {
let action: Action = serde_json::from_str(line?.as_str())?;
process_action(&mut new_state, action)?;
process_action(&mut new_state, action, true)?;
}

self.state.merge(new_state);
self.state.merge(new_state, self.config.require_tombstones);

Ok(())
}
Expand Down Expand Up @@ -642,6 +781,7 @@ impl DeltaTable {
process_action(
&mut self.state,
Action::from_parquet_record(schema, &record)?,
self.config.require_tombstones,
)?;
}
}
Expand Down Expand Up @@ -1130,6 +1270,7 @@ impl DeltaTable {
pub fn new(
table_uri: &str,
storage_backend: Box<dyn StorageBackend>,
config: DeltaTableConfig,
) -> Result<Self, DeltaTableError> {
let table_uri = storage_backend.trim_path(table_uri);
let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log");
Expand All @@ -1138,6 +1279,7 @@ impl DeltaTable {
state: DeltaTableState::default(),
storage: storage_backend,
table_uri,
config,
last_check_point: None,
log_uri: log_uri_normalized,
version_timestamp: HashMap::new(),
Expand Down Expand Up @@ -1179,7 +1321,7 @@ impl DeltaTable {
// Mutate the DeltaTable's state using process_action()
// in order to get most up-to-date state based on the commit above
for action in actions {
let _ = process_action(&mut self.state, action)?;
let _ = process_action(&mut self.state, action, self.config.require_tombstones)?;
}

Ok(())
Expand Down Expand Up @@ -1511,14 +1653,20 @@ fn log_entry_from_actions(actions: &[Action]) -> Result<String, serde_json::Erro
Ok(jsons.join("\n"))
}

fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), ApplyLogError> {
fn process_action(
state: &mut DeltaTableState,
action: Action,
handle_tombstones: bool,
) -> Result<(), ApplyLogError> {
match action {
Action::add(v) => {
state.files.push(v.path_decoded()?);
}
Action::remove(v) => {
let v = v.path_decoded()?;
state.tombstones.push(v);
if handle_tombstones {
let v = v.path_decoded()?;
state.tombstones.push(v);
}
}
Action::protocol(v) => {
state.min_reader_version = v.min_reader_version;
Expand Down Expand Up @@ -1548,9 +1696,7 @@ fn process_action(state: &mut DeltaTableState, action: Action) -> Result<(), App
/// Creates and loads a DeltaTable from the given path with current metadata.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table(table_uri: &str) -> Result<DeltaTable, DeltaTableError> {
let storage_backend = storage::get_backend_for_uri(table_uri)?;
let mut table = DeltaTable::new(table_uri, storage_backend)?;
table.load().await?;
let table = DeltaTableBuilder::from_uri(table_uri)?.load().await?;

Ok(table)
}
Expand All @@ -1561,22 +1707,21 @@ pub async fn open_table_with_version(
table_uri: &str,
version: DeltaDataTypeVersion,
) -> Result<DeltaTable, DeltaTableError> {
let storage_backend = storage::get_backend_for_uri(table_uri)?;
let mut table = DeltaTable::new(table_uri, storage_backend)?;
table.load_version(version).await?;

let table = DeltaTableBuilder::from_uri(table_uri)?
.with_version(version)
.load()
.await?;
Ok(table)
}

/// Creates a DeltaTable from the given path.
/// Loads metadata from the version appropriate based on the given ISO-8601/RFC-3339 timestamp.
/// Infers the storage backend to use from the scheme in the given table path.
pub async fn open_table_with_ds(table_uri: &str, ds: &str) -> Result<DeltaTable, DeltaTableError> {
let datetime = DateTime::<Utc>::from(DateTime::<FixedOffset>::parse_from_rfc3339(ds)?);
let storage_backend = storage::get_backend_for_uri(table_uri)?;
let mut table = DeltaTable::new(table_uri, storage_backend)?;
table.load_with_datetime(datetime).await?;

let table = DeltaTableBuilder::from_uri(table_uri)?
.with_datestring(ds)?
.load()
.await?;
Ok(table)
}

Expand Down Expand Up @@ -1614,7 +1759,7 @@ mod tests {
last_updated: Some(0),
});

let _ = process_action(&mut state, txn_action).unwrap();
let _ = process_action(&mut state, txn_action, false).unwrap();

assert_eq!(2, *state.app_transaction_version.get("abc").unwrap());
assert_eq!(1, *state.app_transaction_version.get("xyz").unwrap());
Expand All @@ -1631,7 +1776,7 @@ mod tests {
.iter()
{
let be = storage::get_backend_for_uri(table_uri).unwrap();
let table = DeltaTable::new(table_uri, be).unwrap();
let table = DeltaTable::new(table_uri, be, DeltaTableConfig::default()).unwrap();
assert_eq!(table.table_uri, "s3://tests/data/delta-0.8.0");
}
}
Expand Down Expand Up @@ -1715,7 +1860,8 @@ mod tests {
let backend = Box::new(storage::file::FileStorageBackend::new(
tmp_dir.path().to_str().unwrap(),
));
let mut dt = DeltaTable::new(path, backend).unwrap();
let mut dt = DeltaTable::new(path, backend, DeltaTableConfig::default()).unwrap();
// let mut dt = DeltaTable::new(path, backend, DeltaTableLoadOptions::default()).unwrap();

let mut commit_info = Map::<String, Value>::new();
commit_info.insert(
Expand Down
4 changes: 2 additions & 2 deletions rust/tests/fs_common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use deltalake::action::{Action, Protocol};
use deltalake::{storage, DeltaTable, DeltaTableMetaData, Schema};
use deltalake::{storage, DeltaTable, DeltaTableConfig, DeltaTableMetaData, Schema};
use parquet::file::reader::{FileReader, SerializedFileReader};
use parquet::schema::types::Type;
use std::collections::HashMap;
Expand All @@ -26,7 +26,7 @@ pub async fn create_test_table(
config: HashMap<String, Option<String>>,
) -> DeltaTable {
let backend = storage::get_backend_for_uri(path).unwrap();
let mut table = DeltaTable::new(path, backend).unwrap();
let mut table = DeltaTable::new(path, backend, DeltaTableConfig::default()).unwrap();
let md = DeltaTableMetaData::new(None, None, None, schema, Vec::new(), config);
let protocol = Protocol {
min_reader_version: 1,
Expand Down
23 changes: 23 additions & 0 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
extern crate deltalake;

use deltalake::storage::file::FileStorageBackend;
use deltalake::DeltaTableBuilder;
use deltalake::StorageBackend;
use pretty_assertions::assert_eq;
use std::collections::HashMap;
Expand Down Expand Up @@ -52,6 +53,28 @@ async fn read_delta_table_with_update() {
);
}

#[tokio::test]
async fn read_delta_table_ignoring_tombstones() {
let table = DeltaTableBuilder::from_uri("./tests/data/delta-0.8.0")
.unwrap()
.without_tombstones()
.load()
.await
.unwrap();
assert!(
table.get_state().all_tombstones().is_empty(),
"loading without tombstones should skip tombstones"
);

assert_eq!(
table.get_files(),
vec![
"part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
"part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet"
]
);
}

#[tokio::test]
async fn read_delta_2_0_table_with_version() {
let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0)
Expand Down
4 changes: 3 additions & 1 deletion rust/tests/s3_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ mod s3 {
},
)
.unwrap();
let mut table = deltalake::DeltaTable::new(table_uri, storage).unwrap();
let mut table =
deltalake::DeltaTable::new(table_uri, storage, deltalake::DeltaTableConfig::default())
.unwrap();
table.load().await.unwrap();
println!("{}", table);

Expand Down

0 comments on commit 8398cb2

Please sign in to comment.