Skip to content

Commit

Permalink
feat: raise if files not loaded
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored and rtyler committed Sep 12, 2024
1 parent 86b0ba9 commit ad35eda
Show file tree
Hide file tree
Showing 14 changed files with 68 additions and 2 deletions.
3 changes: 3 additions & 0 deletions crates/core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,9 @@ pub enum DeltaTableError {
#[error("Table has not yet been initialized")]
NotInitialized,

#[error("Table has not yet been initialized with files, therefore {0} is not supported")]
NotInitializedWithFiles(String),

#[error("Change Data not enabled for version: {version}, Start: {start}, End: {end}")]
ChangeDataNotRecorded { version: i64, start: i64, end: i64 },

Expand Down
10 changes: 10 additions & 0 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ impl Snapshot {
&self.protocol
}

/// Get the [`DeltaTableConfig`] stored on this snapshot.
///
/// Returns a borrow of the snapshot's `config` field; nothing is cloned.
pub fn load_config(&self) -> &DeltaTableConfig {
&self.config
}

/// Get the table root of the snapshot
pub fn table_root(&self) -> Path {
Path::from(self.table_url.clone())
Expand Down Expand Up @@ -535,6 +540,11 @@ impl EagerSnapshot {
self.snapshot.table_root()
}

/// Get the [`DeltaTableConfig`] of the wrapped snapshot.
///
/// Delegates to `load_config` on the inner `snapshot` and returns its
/// result unchanged.
pub fn load_config(&self) -> &DeltaTableConfig {
    // The inner call already yields a reference; borrowing it again would
    // only create a `&&DeltaTableConfig` that the compiler has to auto-deref.
    self.snapshot.load_config()
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
self.snapshot.table_config()
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ impl std::future::IntoFuture for ConstraintBuilder {
let this = self;

Box::pin(async move {
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles(
"ADD CONSTRAINTS".into(),
));
}

let name = match this.name {
Some(v) => v,
None => return Err(DeltaTableError::Generic("No name provided".to_string())),
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,10 @@ async fn execute(
writer_properties: Option<WriterProperties>,
mut commit_properties: CommitProperties,
) -> DeltaResult<(DeltaTableState, DeleteMetrics)> {
if !&snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("DELETE".into()));
}

let exec_start = Instant::now();
let mut metrics = DeleteMetrics::default();

Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ impl std::future::IntoFuture for LoadBuilder {

Box::pin(async move {
PROTOCOL.can_read_from(&this.snapshot.snapshot)?;
if !this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("reading".into()));
}

let table = DeltaTable::new_with_state(this.log_store, this.snapshot);
let schema = table.snapshot()?.arrow_schema()?;
Expand Down
4 changes: 4 additions & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -690,6 +690,10 @@ async fn execute(
not_match_target_operations: Vec<MergeOperationConfig>,
not_match_source_operations: Vec<MergeOperationConfig>,
) -> DeltaResult<(DeltaTableState, MergeMetrics)> {
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("MERGE".into()));
}

let mut metrics = MergeMetrics::default();
let exec_start = Instant::now();
// Determining whether we should write change data once so that computation of change data can
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,9 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {

Box::pin(async move {
PROTOCOL.can_write_to(&this.snapshot.snapshot)?;
if !&this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("OPTIMIZE".into()));
}

let writer_properties = this.writer_properties.unwrap_or_else(|| {
WriterProperties::builder()
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,9 @@ async fn execute(
// For files that were identified, scan for records that match the predicate,
// perform update operations, and then commit add and remove actions to
// the log.
if !&snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into()));
}

let update_planner = DeltaPlanner::<UpdateMetricExtensionPlanner> {
extension_planner: UpdateMetricExtensionPlanner {},
Expand Down
5 changes: 4 additions & 1 deletion crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,11 @@ impl std::future::IntoFuture for VacuumBuilder {

fn into_future(self) -> Self::IntoFuture {
let this = self;

Box::pin(async move {
if !&this.snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("VACUUM".into()));
}

let plan = this.create_vacuum_plan().await?;
if this.dry_run {
return Ok((
Expand Down
3 changes: 3 additions & 0 deletions crates/core/src/operations/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -789,6 +789,9 @@ impl std::future::IntoFuture for WriteBuilder {
if this.mode == SaveMode::Overwrite {
if let Some(snapshot) = &this.snapshot {
PROTOCOL.check_append_only(&snapshot.snapshot)?;
if !snapshot.load_config().require_files {
return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
}
}
}
if this.schema_mode == Some(SchemaMode::Overwrite) && this.mode != SaveMode::Overwrite {
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ impl DeltaTableState {
self.snapshot.schema()
}

/// Get the [`DeltaTableConfig`] of the snapshot backing this table state.
///
/// Delegates to `load_config` on the inner `snapshot` and returns its
/// result unchanged.
pub fn load_config(&self) -> &DeltaTableConfig {
    // The inner call already yields a reference; borrowing it again would
    // only create a needless double reference.
    self.snapshot.load_config()
}

/// Well known table configuration
pub fn table_config(&self) -> TableConfig<'_> {
self.snapshot.table_config()
Expand Down
1 change: 1 addition & 0 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class RawDeltaTable:
) -> bool: ...
def table_uri(self) -> str: ...
def version(self) -> int: ...
def has_files(self) -> bool: ...
def get_add_file_sizes(self) -> Dict[str, int]: ...
def get_latest_version(self) -> int: ...
def get_num_index_cols(self) -> int: ...
Expand Down
5 changes: 4 additions & 1 deletion python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import os

from deltalake._internal import (
DeltaError,
PyMergeBuilder,
RawDeltaTable,
)
Expand Down Expand Up @@ -1138,6 +1139,9 @@ def to_pyarrow_dataset(
Returns:
the PyArrow dataset in PyArrow
"""
if not self._table.has_files():
raise DeltaError("Table is instantiated without files.")

table_protocol = self.protocol()
if (
table_protocol.min_reader_version > MAX_SUPPORTED_READER_VERSION
Expand All @@ -1158,7 +1162,6 @@ def to_pyarrow_dataset(
raise DeltaProtocolError(
f"The table has set these reader features: {missing_features} but these are not yet supported by the deltalake reader."
)

if not filesystem:
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler.from_table(
Expand Down
15 changes: 15 additions & 0 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use deltalake::protocol::{DeltaOperation, SaveMode};
use deltalake::storage::IORuntime;
use deltalake::DeltaTableBuilder;
use deltalake::{DeltaOps, DeltaResult};
use error::DeltaError;
use futures::future::join_all;

use pyo3::exceptions::{PyRuntimeError, PyValueError};
Expand Down Expand Up @@ -166,6 +167,10 @@ impl RawDeltaTable {
Ok(self._table.version())
}

/// Report the `require_files` flag of the underlying table's config.
///
/// This only reads the configuration value and wraps it in `Ok`; it does
/// not inspect the table itself.
// NOTE(review): presumably `require_files == false` means the table was
// loaded without file-level information — confirm against `DeltaTableConfig`.
pub fn has_files(&self) -> PyResult<bool> {
Ok(self._table.config.require_files)
}

pub fn metadata(&self) -> PyResult<RawDeltaTableMetaData> {
let metadata = self._table.metadata().map_err(PythonError::from)?;
Ok(RawDeltaTableMetaData {
Expand Down Expand Up @@ -273,6 +278,9 @@ impl RawDeltaTable {
py: Python,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if !self.has_files()? {
return Err(DeltaError::new_err("Table is instantiated without files."));
}
py.allow_threads(|| {
if let Some(filters) = partition_filters {
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
Expand All @@ -298,6 +306,10 @@ impl RawDeltaTable {
&self,
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
) -> PyResult<Vec<String>> {
if !self._table.config.require_files {
return Err(DeltaError::new_err("Table is initiated without files."));
}

if let Some(filters) = partition_filters {
let filters = convert_partition_filters(filters).map_err(PythonError::from)?;
Ok(self
Expand Down Expand Up @@ -1073,6 +1085,9 @@ impl RawDeltaTable {
}

pub fn get_add_actions(&self, flatten: bool) -> PyResult<PyArrowType<RecordBatch>> {
if !self.has_files()? {
return Err(DeltaError::new_err("Table is instantiated without files."));
}
Ok(PyArrowType(
self._table
.snapshot()
Expand Down

0 comments on commit ad35eda

Please sign in to comment.