Skip to content

Commit

Permalink
refactor: move version field to DeltaTableState (#649)
Browse files Browse the repository at this point in the history
* refactor: move version field to table state

* docs: add comments explaining how `merge` treats the version field
  • Loading branch information
roeap authored Jun 19, 2022
1 parent 82c1936 commit 1fa8d6d
Show file tree
Hide file tree
Showing 19 changed files with 134 additions and 111 deletions.
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl RawDeltaTable {
}

pub fn version(&self) -> PyResult<i64> {
Ok(self._table.version)
Ok(self._table.version())
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
Expand Down
2 changes: 1 addition & 1 deletion ruby/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl TableData {
}

fn version(&self) -> i64 {
self.actual.version
self.actual.version()
}

fn files(&self) -> Vec<String> {
Expand Down
8 changes: 4 additions & 4 deletions rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ impl From<CheckpointError> for ArrowError {
}
}

/// Creates checkpoint at `table.version` for given `table`.
/// Creates a checkpoint for the given `table` at its currently loaded version.
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> {
create_checkpoint_for(
table.version,
table.version(),
table.get_state(),
table.storage.as_ref(),
&table.table_uri,
Expand All @@ -105,7 +105,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
table.version + 1,
table.version() + 1,
table.storage.as_ref(),
log_retention_timestamp,
&table.table_uri,
Expand Down Expand Up @@ -133,7 +133,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());

if table.version >= 0 && enable_expired_log_cleanup {
if table.version() >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}
Expand Down
49 changes: 25 additions & 24 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::delta_config::DeltaConfigError;
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
pub struct CheckPoint {
/// Delta table version
version: DeltaDataTypeVersion, // 20 digits decimals
pub(crate) version: DeltaDataTypeVersion, // 20 digits decimals
size: DeltaDataTypeLong,
parts: Option<u32>, // 10 digits decimals
}
Expand Down Expand Up @@ -583,15 +583,12 @@ pub enum PeekCommit {

/// In memory representation of a Delta Table
pub struct DeltaTable {
/// The version of the table as of the most recent loaded Delta log entry.
pub version: DeltaDataTypeVersion,
/// The state of the table as of the most recent loaded Delta log entry.
pub state: DeltaTableState,
/// The URI the DeltaTable was loaded from.
pub table_uri: String,
/// The options used when loading the table.
pub config: DeltaTableConfig,

state: DeltaTableState,

// metadata
// application_transactions
pub(crate) storage: Box<dyn StorageBackend>,
Expand Down Expand Up @@ -795,11 +792,15 @@ impl DeltaTable {
Ok(version)
}

/// Currently loaded version of the table
pub fn version(&self) -> DeltaDataTypeVersion {
self.state.version
}

/// Load DeltaTable with data from latest checkpoint
pub async fn load(&mut self) -> Result<(), DeltaTableError> {
self.last_check_point = None;
self.version = -1;
self.state = DeltaTableState::default();
self.state = DeltaTableState::with_version(-1);
self.update().await
}

Expand Down Expand Up @@ -832,14 +833,16 @@ impl DeltaTable {
new_version: DeltaDataTypeVersion,
actions: Vec<Action>,
) -> Result<(), DeltaTableError> {
if self.version + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(new_version, self.version));
if self.version() + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(
new_version,
self.version(),
));
}

let s = DeltaTableState::from_actions(actions)?;
let s = DeltaTableState::from_actions(actions, new_version)?;
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
self.version = new_version;

Ok(())
}
Expand All @@ -854,7 +857,7 @@ impl DeltaTable {
} else {
self.last_check_point = Some(last_check_point);
self.restore_checkpoint(last_check_point).await?;
self.version = last_check_point.version;
self.state.version = last_check_point.version;
self.update_incremental().await
}
}
Expand All @@ -866,11 +869,11 @@ impl DeltaTable {
/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version()`.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? {
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version()).await? {
self.apply_actions(version, actions)?;
}

if self.version == -1 {
if self.version() == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
Expand All @@ -897,7 +900,6 @@ impl DeltaTable {
return Err(DeltaTableError::from(e));
}
}
self.version = version;

let mut next_version;
// 1. find latest checkpoint below version
Expand All @@ -908,13 +910,13 @@ impl DeltaTable {
}
None => {
// no checkpoint found, clear table state and start from the beginning
self.state = DeltaTableState::default();
self.state = DeltaTableState::with_version(0);
next_version = 0;
}
}

// 2. apply all logs starting from checkpoint
while next_version <= self.version {
while next_version <= version {
self.apply_log(next_version).await?;
next_version += 1;
}
Expand Down Expand Up @@ -951,7 +953,7 @@ impl DeltaTable {
limit: Option<usize>,
) -> Result<Vec<Map<String, Value>>, DeltaTableError> {
let mut version = match limit {
Some(l) => max(self.version - l as i64 + 1, 0),
Some(l) => max(self.version() - l as i64 + 1, 0),
None => self.get_earliest_delta_log_version().await?,
};
let mut commit_infos_list = vec![];
Expand Down Expand Up @@ -1309,8 +1311,7 @@ impl DeltaTable {
let table_uri = storage_backend.trim_path(table_uri);
let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log");
Ok(Self {
version: -1,
state: DeltaTableState::default(),
state: DeltaTableState::with_version(-1),
storage: storage_backend,
table_uri,
config,
Expand Down Expand Up @@ -1412,7 +1413,7 @@ impl DeltaTable {
impl fmt::Display for DeltaTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "DeltaTable({})", self.table_uri)?;
writeln!(f, "\tversion: {}", self.version)?;
writeln!(f, "\tversion: {}", self.version())?;
match self.state.current_metadata() {
Some(metadata) => {
writeln!(f, "\tmetadata: {}", metadata)?;
Expand Down Expand Up @@ -1599,7 +1600,7 @@ impl<'a> DeltaTransaction<'a> {
loop {
self.delta_table.update().await?;

let version = self.delta_table.version + 1;
let version = self.delta_table.version() + 1;

match self
.delta_table
Expand Down Expand Up @@ -1793,7 +1794,7 @@ mod tests {

// Validation
// assert DeltaTable version is now 0 and no data files have been added
assert_eq!(dt.version, 0);
assert_eq!(dt.version(), 0);
assert_eq!(dt.state.files().len(), 0);

// assert new _delta_log file created in tempDir
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ mod tests {
assert!(log_path.exists());

let mut table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

// Check we can create an existing table with ignore
let ts1 = table.get_version_timestamp(0).await.unwrap();
let mut child = Command::new("sleep").arg("1").spawn().unwrap();
let _result = child.wait().unwrap();
let _ = collect(transaction, task_ctx.clone()).await.unwrap();
let mut table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);
let ts2 = table.get_version_timestamp(0).await.unwrap();
assert_eq!(ts1, ts2);

Expand Down
10 changes: 5 additions & 5 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl DeltaCommands {
) -> DeltaCommandResult<()> {
let transaction = Arc::new(DeltaTransactionPlan::new(
self.table.table_uri.clone(),
self.table.version,
self.table.version(),
plan,
operation,
None,
Expand Down Expand Up @@ -259,7 +259,7 @@ mod tests {
.unwrap();

let table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let res = commands.create(metadata, SaveMode::ErrorIfExists).await;
assert!(res.is_err())
Expand All @@ -270,7 +270,7 @@ mod tests {
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string()];
let mut table = create_initialized_table(&partition_cols).await;
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let mut commands = DeltaCommands::try_from_uri(table.table_uri.to_string())
.await
Expand All @@ -286,7 +286,7 @@ mod tests {
.unwrap();

table.update().await.unwrap();
assert_eq!(table.version, 1);
assert_eq!(table.version(), 1);

let files = table.get_file_uris();
assert_eq!(files.collect::<Vec<_>>().len(), 2)
Expand All @@ -311,7 +311,7 @@ mod tests {
.unwrap();

let table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let files = table.get_file_uris();
assert_eq!(files.collect::<Vec<_>>().len(), 2)
Expand Down
14 changes: 7 additions & 7 deletions rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ mod tests {
async fn test_append_data() {
let partition_cols = vec!["modified".to_string()];
let mut table = create_initialized_table(&partition_cols).await;
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let transaction = get_transaction(table.table_uri.clone(), 0, SaveMode::Append);
let session_ctx = SessionContext::new();
Expand All @@ -434,20 +434,20 @@ mod tests {
.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 1);
assert_eq!(table.version(), 1);

let transaction = get_transaction(table.table_uri.clone(), 1, SaveMode::Append);
let _ = collect(transaction.clone(), task_ctx).await.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 4);
assert_eq!(table.version, 2);
assert_eq!(table.version(), 2);
}

#[tokio::test]
async fn test_overwrite_data() {
let partition_cols = vec!["modified".to_string()];
let mut table = create_initialized_table(&partition_cols).await;
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let transaction = get_transaction(table.table_uri.clone(), 0, SaveMode::Overwrite);
let session_ctx = SessionContext::new();
Expand All @@ -458,13 +458,13 @@ mod tests {
.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 1);
assert_eq!(table.version(), 1);

let transaction = get_transaction(table.table_uri.clone(), 1, SaveMode::Overwrite);
let _ = collect(transaction.clone(), task_ctx).await.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 2);
assert_eq!(table.version(), 2);
}

#[tokio::test]
Expand All @@ -487,7 +487,7 @@ mod tests {
// The table should be created on write and thus have version 0
let table = open_table(table_path.to_str().unwrap()).await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);
}

fn get_transaction(
Expand Down
2 changes: 1 addition & 1 deletion rust/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl MergePlan {
// optimized partition was updated then abort the commit. Requires (#593).
if !actions.is_empty() {
let mut metadata = Map::new();
metadata.insert("readVersion".to_owned(), table.version.into());
metadata.insert("readVersion".to_owned(), table.version().into());
let maybe_map_metrics = serde_json::to_value(metrics.clone());
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
Expand Down
Loading

0 comments on commit 1fa8d6d

Please sign in to comment.