Skip to content

Commit

Permalink
feat: rewrite operations (#852)
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap authored Nov 16, 2022
1 parent 15c4bd5 commit e72cdfe
Show file tree
Hide file tree
Showing 25 changed files with 2,016 additions and 1,329 deletions.
197 changes: 114 additions & 83 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,3 @@ members = [
"dynamodb_lock",
]
exclude = ["proofs", "delta-inspect"]

[profile.dev]
split-debuginfo = "unpacked"
1 change: 1 addition & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ crate-type = ["cdylib"]
name = "deltalake._internal"

[dependencies]
arrow-schema = { version = "24", features = ["serde"] }
chrono = "0"
env_logger = "0"
futures = "0.3"
Expand Down
51 changes: 30 additions & 21 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod filesystem;
mod schema;
mod utils;

use arrow::pyarrow::PyArrowType;
use chrono::{DateTime, FixedOffset, Utc};
use deltalake::action::{
self, Action, ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats,
Expand Down Expand Up @@ -275,8 +276,7 @@ impl RawDeltaTable {
.map_err(PyDeltaTableError::from_raw)?;
serde_json::to_string(
&<ArrowSchema as TryFrom<&deltalake::Schema>>::try_from(schema)
.map_err(PyDeltaTableError::from_arrow)?
.to_json(),
.map_err(PyDeltaTableError::from_arrow)?,
)
.map_err(|_| PyDeltaTableError::new_err("Got invalid table schema"))
}
Expand All @@ -291,7 +291,7 @@ impl RawDeltaTable {
&mut self,
py: Python<'py>,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
schema: ArrowSchema,
schema: PyArrowType<ArrowSchema>,
) -> PyResult<Vec<(String, Option<&'py PyAny>)>> {
let path_set = match partition_filters {
Some(filters) => Some(HashSet::<_>::from_iter(
Expand Down Expand Up @@ -322,10 +322,12 @@ impl RawDeltaTable {
add_actions: Vec<PyAddAction>,
mode: &str,
partition_by: Vec<String>,
schema: ArrowSchema,
schema: PyArrowType<ArrowSchema>,
) -> PyResult<()> {
let mode = save_mode_from_str(mode)?;
let schema: Schema = (&schema).try_into()?;
let schema: Schema = (&schema.0)
.try_into()
.map_err(PyDeltaTableError::from_arrow)?;

let existing_schema = self
._table
Expand Down Expand Up @@ -423,7 +425,7 @@ fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject {
/// skipped during a scan.
fn filestats_to_expression<'py>(
py: Python<'py>,
schema: &ArrowSchema,
schema: &PyArrowType<ArrowSchema>,
partitions_values: &HashMap<String, Option<String>>,
stats: Option<Stats>,
) -> PyResult<Option<&'py PyAny>> {
Expand All @@ -433,22 +435,27 @@ fn filestats_to_expression<'py>(
let mut expressions: Vec<PyResult<&PyAny>> = Vec::new();

let cast_to_type = |column_name: &String, value: PyObject, schema: &ArrowSchema| {
let column_type = schema
.field_with_name(column_name)
.map_err(|_| {
PyDeltaTableError::new_err(format!("Column not found in schema: {}", column_name))
})?
.data_type()
.clone()
.into_py(py);
let column_type = PyArrowType(
schema
.field_with_name(column_name)
.map_err(|_| {
PyDeltaTableError::new_err(format!(
"Column not found in schema: {}",
column_name
))
})?
.data_type()
.clone(),
)
.into_py(py);
pa.call_method1("scalar", (value,))?
.call_method1("cast", (column_type,))
};

for (column, value) in partitions_values.iter() {
if let Some(value) = value {
// value is a string, but needs to be parsed into appropriate type
let converted_value = cast_to_type(column, value.into_py(py), schema)?;
let converted_value = cast_to_type(column, value.into_py(py), &schema.0)?;
expressions.push(
field
.call1((column,))?
Expand All @@ -464,7 +471,7 @@ fn filestats_to_expression<'py>(
// Blocked on https://issues.apache.org/jira/browse/ARROW-11259
_ => None,
}) {
let maybe_minimum = cast_to_type(&col_name, minimum, schema);
let maybe_minimum = cast_to_type(&col_name, minimum, &schema.0);
if let Ok(minimum) = maybe_minimum {
expressions.push(field.call1((col_name,))?.call_method1("__ge__", (minimum,)));
}
Expand All @@ -474,7 +481,7 @@ fn filestats_to_expression<'py>(
ColumnValueStat::Value(val) => Some((k.clone(), json_value_to_py(val, py))),
_ => None,
}) {
let maybe_maximum = cast_to_type(&col_name, maximum, schema);
let maybe_maximum = cast_to_type(&col_name, maximum, &schema.0);
if let Ok(maximum) = maybe_maximum {
expressions.push(field.call1((col_name,))?.call_method1("__le__", (maximum,)));
}
Expand Down Expand Up @@ -557,7 +564,7 @@ impl From<&PyAddAction> for action::Add {
#[allow(clippy::too_many_arguments)]
fn write_new_deltalake(
table_uri: String,
schema: ArrowSchema,
schema: PyArrowType<ArrowSchema>,
add_actions: Vec<PyAddAction>,
_mode: &str,
partition_by: Vec<String>,
Expand All @@ -575,7 +582,9 @@ fn write_new_deltalake(
name,
description,
None, // Format
(&schema).try_into()?,
(&schema.0)
.try_into()
.map_err(PyDeltaTableError::from_arrow)?,
partition_by,
configuration.unwrap_or_default(),
);
Expand Down Expand Up @@ -618,10 +627,10 @@ impl PyDeltaDataChecker {
}
}

fn check_batch(&self, batch: RecordBatch) -> PyResult<()> {
fn check_batch(&self, batch: PyArrowType<RecordBatch>) -> PyResult<()> {
self.rt.block_on(async {
self.inner
.check_batch(&batch)
.check_batch(&batch.0)
.await
.map_err(PyDeltaTableError::from_raw)
})
Expand Down
81 changes: 45 additions & 36 deletions python/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use deltalake::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use deltalake::arrow::error::ArrowError;
use deltalake::arrow::pyarrow::PyArrowType;
use deltalake::schema::{
Schema, SchemaDataType, SchemaField, SchemaTypeArray, SchemaTypeMap, SchemaTypeStruct,
};
Expand Down Expand Up @@ -211,11 +212,11 @@ impl PrimitiveType {
///
/// :rtype: pyarrow.DataType
#[pyo3(text_signature = "($self)")]
fn to_pyarrow(&self) -> PyResult<ArrowDataType> {
fn to_pyarrow(&self) -> PyResult<PyArrowType<ArrowDataType>> {
let inner_type = SchemaDataType::primitive(self.inner_type.clone());
(&inner_type)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))
Ok(PyArrowType((&inner_type).try_into().map_err(
|err: ArrowError| PyException::new_err(err.to_string()),
)?))
}

/// Create a PrimitiveType from a PyArrow type
Expand All @@ -227,8 +228,8 @@ impl PrimitiveType {
/// :rtype: PrimitiveType
#[pyo3(text_signature = "(data_type)")]
#[staticmethod]
fn from_pyarrow(data_type: ArrowDataType) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type)
fn from_pyarrow(data_type: PyArrowType<ArrowDataType>) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type.0)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?;

Expand Down Expand Up @@ -374,10 +375,12 @@ impl ArrayType {
///
/// :rtype: pyarrow.DataType
#[pyo3(text_signature = "($self)")]
fn to_pyarrow(&self) -> PyResult<ArrowDataType> {
(&SchemaDataType::array(self.inner_type.clone()))
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))
fn to_pyarrow(&self) -> PyResult<PyArrowType<ArrowDataType>> {
Ok(PyArrowType(
(&SchemaDataType::array(self.inner_type.clone()))
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?,
))
}

/// Create an ArrayType from a pyarrow.ListType.
Expand All @@ -389,8 +392,8 @@ impl ArrayType {
/// :rtype: ArrayType
#[staticmethod]
#[pyo3(text_signature = "(data_type)")]
fn from_pyarrow(data_type: ArrowDataType) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type)
fn from_pyarrow(data_type: PyArrowType<ArrowDataType>) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type.0)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?;

Expand Down Expand Up @@ -556,10 +559,12 @@ impl MapType {
///
/// :rtype: pyarrow.MapType
#[pyo3(text_signature = "($self)")]
fn to_pyarrow(&self) -> PyResult<ArrowDataType> {
(&SchemaDataType::map(self.inner_type.clone()))
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))
fn to_pyarrow(&self) -> PyResult<PyArrowType<ArrowDataType>> {
Ok(PyArrowType(
(&SchemaDataType::map(self.inner_type.clone()))
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?,
))
}

/// Create a MapType from a PyArrow MapType.
Expand All @@ -571,8 +576,8 @@ impl MapType {
/// :rtype: MapType
#[staticmethod]
#[pyo3(text_signature = "(data_type)")]
fn from_pyarrow(data_type: ArrowDataType) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type)
fn from_pyarrow(data_type: PyArrowType<ArrowDataType>) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type.0)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?;

Expand Down Expand Up @@ -744,10 +749,10 @@ impl Field {
///
/// :rtype: pyarrow.Field
#[pyo3(text_signature = "($self)")]
fn to_pyarrow(&self) -> PyResult<ArrowField> {
(&self.inner)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))
fn to_pyarrow(&self) -> PyResult<PyArrowType<ArrowField>> {
Ok(PyArrowType((&self.inner).try_into().map_err(
|err: ArrowError| PyException::new_err(err.to_string()),
)?))
}

/// Create a Field from a PyArrow field
Expand All @@ -759,9 +764,9 @@ impl Field {
/// :rtype: Field
#[staticmethod]
#[pyo3(text_signature = "(field)")]
fn from_pyarrow(field: ArrowField) -> PyResult<Self> {
fn from_pyarrow(field: PyArrowType<ArrowField>) -> PyResult<Self> {
Ok(Self {
inner: SchemaField::try_from(&field)
inner: SchemaField::try_from(&field.0)
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?,
})
}
Expand Down Expand Up @@ -892,10 +897,12 @@ impl StructType {
///
/// :rtype: pyarrow.StructType
#[pyo3(text_signature = "($self)")]
fn to_pyarrow(&self) -> PyResult<ArrowDataType> {
(&SchemaDataType::r#struct(self.inner_type.clone()))
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))
fn to_pyarrow(&self) -> PyResult<PyArrowType<ArrowDataType>> {
Ok(PyArrowType(
(&SchemaDataType::r#struct(self.inner_type.clone()))
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?,
))
}

/// Create a new StructType from a PyArrow struct type.
Expand All @@ -907,8 +914,8 @@ impl StructType {
/// :rtype: StructType
#[staticmethod]
#[pyo3(text_signature = "(data_type)")]
fn from_pyarrow(data_type: ArrowDataType) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type)
fn from_pyarrow(data_type: PyArrowType<ArrowDataType>) -> PyResult<Self> {
let inner_type: SchemaDataType = (&data_type.0)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?;

Expand Down Expand Up @@ -1003,11 +1010,13 @@ impl PySchema {
///
/// :rtype: pyarrow.Schema
#[pyo3(text_signature = "($self)")]
fn to_pyarrow(self_: PyRef<'_, Self>) -> PyResult<ArrowSchema> {
fn to_pyarrow(self_: PyRef<'_, Self>) -> PyResult<PyArrowType<ArrowSchema>> {
let super_ = self_.as_ref();
(&super_.inner_type.clone())
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))
Ok(PyArrowType(
(&super_.inner_type.clone())
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?,
))
}

/// Create from a PyArrow schema
Expand All @@ -1017,8 +1026,8 @@ impl PySchema {
/// :rtype: Schema
#[staticmethod]
#[pyo3(text_signature = "(data_type)")]
fn from_pyarrow(data_type: ArrowSchema, py: Python) -> PyResult<PyObject> {
let inner_type: SchemaTypeStruct = (&data_type)
fn from_pyarrow(data_type: PyArrowType<ArrowSchema>, py: Python) -> PyResult<PyObject> {
let inner_type: SchemaTypeStruct = (&data_type.0)
.try_into()
.map_err(|err: ArrowError| PyException::new_err(err.to_string()))?;

Expand Down
15 changes: 8 additions & 7 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[package]
name = "deltalake"
version = "0.4.1"
rust-version = "1.64"
authors = ["Qingping Hou <dave2008713@gmail.com>"]
homepage = "https://github.com/delta-io/delta.rs"
license = "Apache-2.0"
Expand All @@ -9,7 +10,7 @@ description = "Native Delta Lake implementation in Rust"
edition = "2021"

[dependencies]
arrow = { version = "22", optional = true }
arrow = { version = "24", optional = true }
async-trait = "0.1"
bytes = "1"
chrono = "0.4.22"
Expand All @@ -23,14 +24,14 @@ num-bigint = "0.4"
num-traits = "0.2.15"
object_store = "0.5.1"
once_cell = "1.16.0"
parquet = { version = "22", features = ["async"], optional = true }
parking_lot = "0.12"
parquet = { version = "24", features = ["async"], optional = true }
parquet2 = { version = "0.16", optional = true }
parquet-format = { version = "~4.0.0" }
percent-encoding = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["macros", "rt"] }
tokio = { version = "1", features = ["macros", "rt", "parking_lot"] }
regex = "1"
uuid = { version = "1", features = ["serde", "v4"] }
url = "2.3"
Expand All @@ -45,9 +46,9 @@ rusoto_dynamodb = { version = "0.48", default-features = false, optional = true
rusoto_glue = { version = "0.48", default-features = false, optional = true }

# Datafusion
datafusion = { version = "12", optional = true }
datafusion-expr = { version = "12", optional = true }
datafusion-common = { version = "12", optional = true }
datafusion = { version = "13", optional = true }
datafusion-expr = { version = "13", optional = true }
datafusion-common = { version = "13", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.2.0", optional = true }
Expand Down
Loading

0 comments on commit e72cdfe

Please sign in to comment.