Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Move version field into DeltaTableState struct #553

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl RawDeltaTable {
}

pub fn version(&self) -> PyResult<i64> {
Ok(self._table.version)
Ok(self._table.state.version)
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
Expand Down
2 changes: 1 addition & 1 deletion ruby/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl TableData {
}

fn version(&self) -> i64 {
self.actual.version
self.actual.state.version
}

fn files(&self) -> Vec<String> {
Expand Down
8 changes: 4 additions & 4 deletions rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ impl From<CheckpointError> for ArrowError {
}
}

/// Creates checkpoint at `table.version` for given `table`.
/// Creates checkpoint at `table.state.version` for given `table`.
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> {
create_checkpoint_for(
table.version,
table.state.version,
table.get_state(),
table.storage.as_ref(),
&table.table_uri,
Expand All @@ -105,7 +105,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
table.version + 1,
table.state.version + 1,
table.storage.as_ref(),
log_retention_timestamp,
&table.table_uri,
Expand Down Expand Up @@ -133,7 +133,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());

if table.version >= 0 && enable_expired_log_cleanup {
if table.state.version >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}
Expand Down
41 changes: 22 additions & 19 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,15 +559,14 @@ pub enum PeekCommit {

/// In memory representation of a Delta Table
pub struct DeltaTable {
/// The version of the table as of the most recent loaded Delta log entry.
pub version: DeltaDataTypeVersion,
/// Current table state
pub state: DeltaTableState,

/// The URI the DeltaTable was loaded from.
pub table_uri: String,
/// the load options used during load
pub config: DeltaTableConfig,

state: DeltaTableState,

// metadata
// application_transactions
pub(crate) storage: Box<dyn StorageBackend>,
Expand Down Expand Up @@ -772,7 +771,7 @@ impl DeltaTable {
/// Load DeltaTable with data from latest checkpoint
pub async fn load(&mut self) -> Result<(), DeltaTableError> {
self.last_check_point = None;
self.version = -1;
self.state.version = -1;
self.state = DeltaTableState::default();
self.update().await
}
Expand Down Expand Up @@ -806,13 +805,16 @@ impl DeltaTable {
new_version: DeltaDataTypeVersion,
actions: Vec<Action>,
) -> Result<(), DeltaTableError> {
if self.version + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(new_version, self.version));
if self.state.version + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(
new_version,
self.state.version,
));
}

let s = DeltaTableState::from_actions(actions)?;
self.state.merge(s, self.config.require_tombstones);
self.version = new_version;
self.state.version = new_version;

Ok(())
}
Expand All @@ -827,7 +829,7 @@ impl DeltaTable {
} else {
self.last_check_point = Some(last_check_point);
self.restore_checkpoint(last_check_point).await?;
self.version = last_check_point.version;
self.state.version = last_check_point.version;
self.update_incremental().await
}
}
Expand All @@ -837,13 +839,15 @@ impl DeltaTable {
}

/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version`.
/// It assumes that the table is already updated to the current version `self.state.version`.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? {
while let PeekCommit::New(version, actions) =
self.peek_next_commit(self.state.version).await?
{
self.apply_actions(version, actions)?;
}

if self.version == -1 {
if self.state.version == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
Expand All @@ -870,7 +874,7 @@ impl DeltaTable {
return Err(DeltaTableError::from(e));
}
}
self.version = version;
self.state.version = version;

let mut next_version;
// 1. find latest checkpoint below version
Expand All @@ -887,7 +891,7 @@ impl DeltaTable {
}

// 2. apply all logs starting from checkpoint
while next_version <= self.version {
while next_version <= self.state.version {
self.apply_log(next_version).await?;
next_version += 1;
}
Expand Down Expand Up @@ -924,7 +928,7 @@ impl DeltaTable {
limit: Option<usize>,
) -> Result<Vec<Map<String, Value>>, DeltaTableError> {
let mut version = match limit {
Some(l) => max(self.version - l as i64 + 1, 0),
Some(l) => max(self.state.version - l as i64 + 1, 0),
None => self.get_earliest_delta_log_version().await?,
};
let mut commit_infos_list = vec![];
Expand Down Expand Up @@ -1283,7 +1287,6 @@ impl DeltaTable {
let table_uri = storage_backend.trim_path(table_uri);
let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log");
Ok(Self {
version: 0,
state: DeltaTableState::default(),
storage: storage_backend,
table_uri,
Expand Down Expand Up @@ -1376,7 +1379,7 @@ impl DeltaTable {
impl fmt::Display for DeltaTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "DeltaTable({})", self.table_uri)?;
writeln!(f, "\tversion: {}", self.version)?;
writeln!(f, "\tversion: {}", self.state.version)?;
match self.state.current_metadata() {
Some(metadata) => {
writeln!(f, "\tmetadata: {}", metadata)?;
Expand Down Expand Up @@ -1609,7 +1612,7 @@ impl<'a> DeltaTransaction<'a> {
loop {
self.delta_table.update().await?;

let version = self.delta_table.version + 1;
let version = self.delta_table.state.version + 1;

match self
.delta_table
Expand Down Expand Up @@ -1817,7 +1820,7 @@ mod tests {

// Validation
// assert DeltaTable version is now 0 and no data files have been added
assert_eq!(dt.version, 0);
assert_eq!(dt.state.version, 0);
assert_eq!(dt.state.files().len(), 0);

// assert new _delta_log file created in tempDir
Expand Down
3 changes: 3 additions & 0 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ pub struct DeltaTableState {
tombstone_retention_millis: DeltaDataTypeLong,
log_retention_millis: DeltaDataTypeLong,
enable_expired_log_cleanup: bool,
/// The version of the table as of the most recent loaded Delta log entry.
pub version: DeltaDataTypeVersion,
}

impl DeltaTableState {
Expand Down Expand Up @@ -272,6 +274,7 @@ mod tests {
tombstone_retention_millis: 0,
log_retention_millis: 0,
enable_expired_log_cleanup: true,
version: 0,
};

let txn_action = action::Action::txn(action::Txn {
Expand Down
6 changes: 3 additions & 3 deletions rust/tests/adls_gen2_table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ mod adls_gen2_table {
.await
.unwrap();

assert_eq!(table.version, 4);
assert_eq!(table.state.version, 4);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand Down Expand Up @@ -93,7 +93,7 @@ mod adls_gen2_table {
.unwrap();

// Assert 1
assert_eq!(0, dt.version);
assert_eq!(0, dt.state.version);
assert_eq!(1, dt.get_min_reader_version());
assert_eq!(2, dt.get_min_writer_version());
assert_eq!(0, dt.get_files().len());
Expand All @@ -106,7 +106,7 @@ mod adls_gen2_table {

// Assert 2
assert_eq!(1, version);
assert_eq!(version, dt.version);
assert_eq!(version, dt.state.version);
assert_eq!(2, dt.get_files().len());

// Cleanup
Expand Down
4 changes: 2 additions & 2 deletions rust/tests/checkpoint_writer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ mod delete_expired_delta_log_in_checkpoint {

checkpoints::create_checkpoint_from_table_uri_and_cleanup(
&table.table_uri,
table.version,
table.state.version,
None,
)
.await
Expand Down Expand Up @@ -178,7 +178,7 @@ mod delete_expired_delta_log_in_checkpoint {

checkpoints::create_checkpoint_from_table_uri_and_cleanup(
&table.table_uri,
table.version,
table.state.version,
None,
)
.await
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/gcs_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ mod gcs {
let table = deltalake::open_table(format!("gs://{}/simple_table", bucket).as_str())
.await
.unwrap();
assert_eq!(table.version, 4);
assert_eq!(table.state.version, 4);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion rust/tests/read_delta_partitions_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,5 @@ async fn read_null_partitions_from_checkpoint() {

// verify that table loads from checkpoint and handles null partitions
let table = deltalake::open_table(&table.table_uri).await.unwrap();
assert_eq!(table.version, 2);
assert_eq!(table.state.version, 2);
}
22 changes: 11 additions & 11 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ async fn read_delta_2_0_table_without_version() {
let table = deltalake::open_table("./tests/data/delta-0.2.0")
.await
.unwrap();
assert_eq!(table.version, 3);
assert_eq!(table.state.version, 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand Down Expand Up @@ -82,7 +82,7 @@ async fn read_delta_2_0_table_with_version() {
let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0)
.await
.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.state.version, 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand All @@ -96,7 +96,7 @@ async fn read_delta_2_0_table_with_version() {
table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 2)
.await
.unwrap();
assert_eq!(table.version, 2);
assert_eq!(table.state.version, 2);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand All @@ -110,7 +110,7 @@ async fn read_delta_2_0_table_with_version() {
table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 3)
.await
.unwrap();
assert_eq!(table.version, 3);
assert_eq!(table.state.version, 3);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand All @@ -128,7 +128,7 @@ async fn read_delta_8_0_table_without_version() {
let table = deltalake::open_table("./tests/data/delta-0.8.0")
.await
.unwrap();
assert_eq!(table.version, 1);
assert_eq!(table.state.version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand Down Expand Up @@ -173,7 +173,7 @@ async fn read_delta_8_0_table_with_load_version() {
let mut table = deltalake::open_table("./tests/data/delta-0.8.0")
.await
.unwrap();
assert_eq!(table.version, 1);
assert_eq!(table.state.version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand All @@ -184,7 +184,7 @@ async fn read_delta_8_0_table_with_load_version() {
]
);
table.load_version(0).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.state.version, 0);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
Expand Down Expand Up @@ -475,10 +475,10 @@ async fn test_table_history() {
async fn test_poll_table_commits() {
let path = "./tests/data/simple_table_with_checkpoint";
let mut table = deltalake::open_table_with_version(path, 9).await.unwrap();
let peek = table.peek_next_commit(table.version).await.unwrap();
let peek = table.peek_next_commit(table.state.version).await.unwrap();

let is_new = if let PeekCommit::New(version, actions) = peek {
assert_eq!(table.version, 9);
assert_eq!(table.state.version, 9);
assert!(!table
.get_files_iter()
.any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"));
Expand All @@ -488,7 +488,7 @@ async fn test_poll_table_commits() {

table.apply_actions(version, actions).unwrap();

assert_eq!(table.version, 10);
assert_eq!(table.state.version, 10);
assert!(table
.get_files_iter()
.any(|f| f == "part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet"));
Expand All @@ -499,7 +499,7 @@ async fn test_poll_table_commits() {
};
assert!(is_new);

let peek = table.peek_next_commit(table.version).await.unwrap();
let peek = table.peek_next_commit(table.state.version).await.unwrap();
let is_up_to_date = match peek {
PeekCommit::UpToDate => true,
_ => false,
Expand Down
Loading