Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move version field to DeltaTableState #649

Merged
merged 3 commits into from
Jun 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ impl RawDeltaTable {
}

pub fn version(&self) -> PyResult<i64> {
Ok(self._table.version)
Ok(self._table.version())
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
Expand Down
2 changes: 1 addition & 1 deletion ruby/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl TableData {
}

fn version(&self) -> i64 {
self.actual.version
self.actual.version()
}

fn files(&self) -> Vec<String> {
Expand Down
8 changes: 4 additions & 4 deletions rust/src/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ impl From<CheckpointError> for ArrowError {
}
}

/// Creates checkpoint at `table.version` for given `table`.
/// Creates checkpoint at current table version
pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), CheckpointError> {
create_checkpoint_for(
table.version,
table.version(),
table.get_state(),
table.storage.as_ref(),
&table.table_uri,
Expand All @@ -105,7 +105,7 @@ pub async fn cleanup_metadata(table: &DeltaTable) -> Result<i32, DeltaTableError
let log_retention_timestamp =
Utc::now().timestamp_millis() - table.get_state().log_retention_millis();
cleanup_expired_logs_for(
table.version + 1,
table.version() + 1,
table.storage.as_ref(),
log_retention_timestamp,
&table.table_uri,
Expand Down Expand Up @@ -133,7 +133,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
let enable_expired_log_cleanup =
cleanup.unwrap_or_else(|| table.get_state().enable_expired_log_cleanup());

if table.version >= 0 && enable_expired_log_cleanup {
if table.version() >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}
Expand Down
49 changes: 25 additions & 24 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::delta_config::DeltaConfigError;
#[derive(Serialize, Deserialize, Debug, Default, Clone, Copy)]
pub struct CheckPoint {
/// Delta table version
version: DeltaDataTypeVersion, // 20 digits decimals
pub(crate) version: DeltaDataTypeVersion, // 20 digits decimals
size: DeltaDataTypeLong,
parts: Option<u32>, // 10 digits decimals
}
Expand Down Expand Up @@ -583,15 +583,12 @@ pub enum PeekCommit {

/// In memory representation of a Delta Table
pub struct DeltaTable {
/// The version of the table as of the most recent loaded Delta log entry.
pub version: DeltaDataTypeVersion,
/// The state of the table as of the most recent loaded Delta log entry.
pub state: DeltaTableState,
/// The URI the DeltaTable was loaded from.
pub table_uri: String,
/// the load options used during load
pub config: DeltaTableConfig,

state: DeltaTableState,

// metadata
// application_transactions
pub(crate) storage: Box<dyn StorageBackend>,
Expand Down Expand Up @@ -795,11 +792,15 @@ impl DeltaTable {
Ok(version)
}

/// Currently loaded evrsion of the table
pub fn version(&self) -> DeltaDataTypeVersion {
self.state.version
}

/// Load DeltaTable with data from latest checkpoint
pub async fn load(&mut self) -> Result<(), DeltaTableError> {
self.last_check_point = None;
self.version = -1;
self.state = DeltaTableState::default();
self.state = DeltaTableState::with_version(-1);
self.update().await
}

Expand Down Expand Up @@ -832,14 +833,16 @@ impl DeltaTable {
new_version: DeltaDataTypeVersion,
actions: Vec<Action>,
) -> Result<(), DeltaTableError> {
if self.version + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(new_version, self.version));
if self.version() + 1 != new_version {
return Err(DeltaTableError::VersionMismatch(
new_version,
self.version(),
));
}

let s = DeltaTableState::from_actions(actions)?;
let s = DeltaTableState::from_actions(actions, new_version)?;
self.state
.merge(s, self.config.require_tombstones, self.config.require_files);
self.version = new_version;

Ok(())
}
Expand All @@ -854,7 +857,7 @@ impl DeltaTable {
} else {
self.last_check_point = Some(last_check_point);
self.restore_checkpoint(last_check_point).await?;
self.version = last_check_point.version;
self.state.version = last_check_point.version;
self.update_incremental().await
}
}
Expand All @@ -866,11 +869,11 @@ impl DeltaTable {
/// Updates the DeltaTable to the latest version by incrementally applying newer versions.
/// It assumes that the table is already updated to the current version `self.version`.
pub async fn update_incremental(&mut self) -> Result<(), DeltaTableError> {
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version).await? {
while let PeekCommit::New(version, actions) = self.peek_next_commit(self.version()).await? {
self.apply_actions(version, actions)?;
}

if self.version == -1 {
if self.version() == -1 {
let err = format!(
"No snapshot or version 0 found, perhaps {} is an empty dir?",
self.table_uri
Expand All @@ -897,7 +900,6 @@ impl DeltaTable {
return Err(DeltaTableError::from(e));
}
}
self.version = version;

let mut next_version;
// 1. find latest checkpoint below version
Expand All @@ -908,13 +910,13 @@ impl DeltaTable {
}
None => {
// no checkpoint found, clear table state and start from the beginning
self.state = DeltaTableState::default();
self.state = DeltaTableState::with_version(0);
next_version = 0;
}
}

// 2. apply all logs starting from checkpoint
while next_version <= self.version {
while next_version <= version {
self.apply_log(next_version).await?;
next_version += 1;
}
Expand Down Expand Up @@ -951,7 +953,7 @@ impl DeltaTable {
limit: Option<usize>,
) -> Result<Vec<Map<String, Value>>, DeltaTableError> {
let mut version = match limit {
Some(l) => max(self.version - l as i64 + 1, 0),
Some(l) => max(self.version() - l as i64 + 1, 0),
None => self.get_earliest_delta_log_version().await?,
};
let mut commit_infos_list = vec![];
Expand Down Expand Up @@ -1309,8 +1311,7 @@ impl DeltaTable {
let table_uri = storage_backend.trim_path(table_uri);
let log_uri_normalized = storage_backend.join_path(&table_uri, "_delta_log");
Ok(Self {
version: -1,
state: DeltaTableState::default(),
state: DeltaTableState::with_version(-1),
storage: storage_backend,
table_uri,
config,
Expand Down Expand Up @@ -1412,7 +1413,7 @@ impl DeltaTable {
impl fmt::Display for DeltaTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "DeltaTable({})", self.table_uri)?;
writeln!(f, "\tversion: {}", self.version)?;
writeln!(f, "\tversion: {}", self.version())?;
match self.state.current_metadata() {
Some(metadata) => {
writeln!(f, "\tmetadata: {}", metadata)?;
Expand Down Expand Up @@ -1599,7 +1600,7 @@ impl<'a> DeltaTransaction<'a> {
loop {
self.delta_table.update().await?;

let version = self.delta_table.version + 1;
let version = self.delta_table.version() + 1;

match self
.delta_table
Expand Down Expand Up @@ -1793,7 +1794,7 @@ mod tests {

// Validation
// assert DeltaTable version is now 0 and no data files have been added
assert_eq!(dt.version, 0);
assert_eq!(dt.version(), 0);
assert_eq!(dt.state.files().len(), 0);

// assert new _delta_log file created in tempDir
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ mod tests {
assert!(log_path.exists());

let mut table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

// Check we can create an existing table with ignore
let ts1 = table.get_version_timestamp(0).await.unwrap();
let mut child = Command::new("sleep").arg("1").spawn().unwrap();
let _result = child.wait().unwrap();
let _ = collect(transaction, task_ctx.clone()).await.unwrap();
let mut table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);
let ts2 = table.get_version_timestamp(0).await.unwrap();
assert_eq!(ts1, ts2);

Expand Down
10 changes: 5 additions & 5 deletions rust/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl DeltaCommands {
) -> DeltaCommandResult<()> {
let transaction = Arc::new(DeltaTransactionPlan::new(
self.table.table_uri.clone(),
self.table.version,
self.table.version(),
plan,
operation,
None,
Expand Down Expand Up @@ -259,7 +259,7 @@ mod tests {
.unwrap();

let table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let res = commands.create(metadata, SaveMode::ErrorIfExists).await;
assert!(res.is_err())
Expand All @@ -270,7 +270,7 @@ mod tests {
let batch = get_record_batch(None, false);
let partition_cols = vec!["modified".to_string()];
let mut table = create_initialized_table(&partition_cols).await;
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let mut commands = DeltaCommands::try_from_uri(table.table_uri.to_string())
.await
Expand All @@ -286,7 +286,7 @@ mod tests {
.unwrap();

table.update().await.unwrap();
assert_eq!(table.version, 1);
assert_eq!(table.version(), 1);

let files = table.get_file_uris();
assert_eq!(files.collect::<Vec<_>>().len(), 2)
Expand All @@ -311,7 +311,7 @@ mod tests {
.unwrap();

let table = open_table(&table_uri).await.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let files = table.get_file_uris();
assert_eq!(files.collect::<Vec<_>>().len(), 2)
Expand Down
14 changes: 7 additions & 7 deletions rust/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ mod tests {
async fn test_append_data() {
let partition_cols = vec!["modified".to_string()];
let mut table = create_initialized_table(&partition_cols).await;
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let transaction = get_transaction(table.table_uri.clone(), 0, SaveMode::Append);
let session_ctx = SessionContext::new();
Expand All @@ -434,20 +434,20 @@ mod tests {
.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 1);
assert_eq!(table.version(), 1);

let transaction = get_transaction(table.table_uri.clone(), 1, SaveMode::Append);
let _ = collect(transaction.clone(), task_ctx).await.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 4);
assert_eq!(table.version, 2);
assert_eq!(table.version(), 2);
}

#[tokio::test]
async fn test_overwrite_data() {
let partition_cols = vec!["modified".to_string()];
let mut table = create_initialized_table(&partition_cols).await;
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);

let transaction = get_transaction(table.table_uri.clone(), 0, SaveMode::Overwrite);
let session_ctx = SessionContext::new();
Expand All @@ -458,13 +458,13 @@ mod tests {
.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 1);
assert_eq!(table.version(), 1);

let transaction = get_transaction(table.table_uri.clone(), 1, SaveMode::Overwrite);
let _ = collect(transaction.clone(), task_ctx).await.unwrap();
table.update().await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 2);
assert_eq!(table.version(), 2);
}

#[tokio::test]
Expand All @@ -487,7 +487,7 @@ mod tests {
// THe table should be created on write and thus have version 0
let table = open_table(table_path.to_str().unwrap()).await.unwrap();
assert_eq!(table.get_file_uris().collect::<Vec<_>>().len(), 2);
assert_eq!(table.version, 0);
assert_eq!(table.version(), 0);
}

fn get_transaction(
Expand Down
2 changes: 1 addition & 1 deletion rust/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ impl MergePlan {
// optimized partition was updated then abort the commit. Requires (#593).
if !actions.is_empty() {
let mut metadata = Map::new();
metadata.insert("readVersion".to_owned(), table.version.into());
metadata.insert("readVersion".to_owned(), table.version().into());
let maybe_map_metrics = serde_json::to_value(metrics.clone());
if let Ok(map) = maybe_map_metrics {
metadata.insert("operationMetrics".to_owned(), map);
Expand Down
Loading