diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 773598b4..392d713b 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -32,6 +32,8 @@ jobs: - uses: Swatinem/rust-cache@v2 - name: build and lint with clippy run: cargo clippy --tests -- -D warnings + - name: lint without default features + run: cargo clippy --no-default--features -- -D warnings test: runs-on: ubuntu-latest steps: diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 760321cf..1ba893a5 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -21,8 +21,6 @@ either = "1.8" fix-hidden-lifetime-bug = "0.2" itertools = "0.11" lazy_static = "1.4" -# used for providing a storage abstraction layer -object_store = "^0.7.0" # need to generalize over arrow, arrow2 and diff parquet etc. (BYOP) regex = "1.8" roaring = "0.10.1" @@ -37,6 +35,7 @@ z85 = "3.0.5" # Used in default client futures = { version = "0.3", optional = true } +object_store = { version = "^0.7.0", optional = true } parquet = { version = "^46.0", optional = true, features=["async", "object_store"]} # optionally used with default client (though not required) @@ -44,7 +43,7 @@ tokio = { version = "1", optional = true, features=["rt-multi-thread"] } [features] default = ["default-client"] -default-client = ["chrono", "parquet", "futures"] +default-client = ["chrono", "futures", "object_store", "parquet"] [dev-dependencies] arrow = { version = "^46.0", features = ["json", "prettyprint"] } diff --git a/kernel/src/actions/mod.rs b/kernel/src/actions/mod.rs index 5cd7fd0e..f22da04a 100644 --- a/kernel/src/actions/mod.rs +++ b/kernel/src/actions/mod.rs @@ -10,6 +10,7 @@ use itertools::izip; use crate::{DeltaResult, Error}; +pub(crate) mod arrow; pub(crate) mod schemas; pub(crate) mod types; diff --git a/kernel/src/client/arrow.rs b/kernel/src/client/arrow.rs deleted file mode 100644 index 5408eb8c..00000000 --- a/kernel/src/client/arrow.rs +++ /dev/null @@ -1,275 +0,0 @@ -use std::sync::Arc; - -use arrow_schema::{ - ArrowError, DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema, - SchemaRef as ArrowSchemaRef, TimeUnit, -}; - -use crate::schema::{ArrayType, DataType, MapType, PrimitiveType, StructField, StructType}; - -impl TryFrom<&StructType> for ArrowSchema { - type Error = ArrowError; - - fn try_from(s: &StructType) -> Result { - let fields = s - .fields() - .iter() - .map(|f| >::try_from(*f)) - .collect::, ArrowError>>()?; - - Ok(ArrowSchema::new(fields)) - } -} - -impl TryFrom<&StructField> for ArrowField { - type Error = ArrowError; - - fn try_from(f: &StructField) -> Result { - let metadata = f - .metadata() - .iter() - .map(|(key, val)| Ok((key.clone(), serde_json::to_string(val)?))) - .collect::>() - .map_err(|err| ArrowError::JsonError(err.to_string()))?; - - let field = ArrowField::new( - f.name(), - ArrowDataType::try_from(f.data_type())?, - f.is_nullable(), - ) - .with_metadata(metadata); - - Ok(field) - } -} - -impl TryFrom<&ArrayType> for ArrowField { - type Error = ArrowError; - - fn try_from(a: &ArrayType) -> Result { - Ok(ArrowField::new( - "item", - ArrowDataType::try_from(a.element_type())?, - a.contains_null(), - )) - } -} - -impl TryFrom<&MapType> for ArrowField { - type Error = ArrowError; - - fn try_from(a: &MapType) -> Result { - Ok(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new("key", ArrowDataType::try_from(a.key_type())?, false), - ArrowField::new( - "value", - ArrowDataType::try_from(a.value_type())?, - a.value_contains_null(), - ), - ] - .into(), - ), - false, // always non-null - )) - } -} - -impl TryFrom<&DataType> for ArrowDataType { - type Error = ArrowError; - - fn try_from(t: &DataType) -> Result { - match t { - DataType::Primitive(p) => { - match p { - PrimitiveType::String => Ok(ArrowDataType::Utf8), - PrimitiveType::Long => Ok(ArrowDataType::Int64), // undocumented type - PrimitiveType::Integer => Ok(ArrowDataType::Int32), - PrimitiveType::Short => Ok(ArrowDataType::Int16), - PrimitiveType::Byte => Ok(ArrowDataType::Int8), - PrimitiveType::Float => Ok(ArrowDataType::Float32), - PrimitiveType::Double => Ok(ArrowDataType::Float64), - PrimitiveType::Boolean => Ok(ArrowDataType::Boolean), - PrimitiveType::Binary => Ok(ArrowDataType::Binary), - PrimitiveType::Decimal(precision, scale) => { - let precision = u8::try_from(*precision).map_err(|_| { - ArrowError::SchemaError(format!( - "Invalid precision for decimal: {}", - precision - )) - })?; - let scale = i8::try_from(*scale).map_err(|_| { - ArrowError::SchemaError(format!("Invalid scale for decimal: {}", scale)) - })?; - - if precision <= 38 { - Ok(ArrowDataType::Decimal128(precision, scale)) - } else if precision <= 76 { - Ok(ArrowDataType::Decimal256(precision, scale)) - } else { - Err(ArrowError::SchemaError(format!( - "Precision too large to be represented in Arrow: {}", - precision - ))) - } - } - PrimitiveType::Date => { - // A calendar date, represented as a year-month-day triple without a - // timezone. Stored as 4 bytes integer representing days since 1970-01-01 - Ok(ArrowDataType::Date32) - } - PrimitiveType::Timestamp => { - // Issue: https://github.com/delta-io/delta/issues/643 - Ok(ArrowDataType::Timestamp(TimeUnit::Microsecond, None)) - } - } - } - DataType::Struct(s) => Ok(ArrowDataType::Struct( - s.fields() - .iter() - .map(|f| >::try_from(*f)) - .collect::, ArrowError>>()? - .into(), - )), - DataType::Array(a) => Ok(ArrowDataType::List(Arc::new(>::try_from(a)?))), - DataType::Map(m) => Ok(ArrowDataType::Map( - Arc::new(ArrowField::new( - "entries", - ArrowDataType::Struct( - vec![ - ArrowField::new( - "key", - >::try_from(m.key_type())?, - false, - ), - ArrowField::new( - "value", - >::try_from(m.value_type())?, - m.value_contains_null(), - ), - ] - .into(), - ), - true, - )), - false, - )), - } - } -} - -impl TryFrom<&ArrowSchema> for StructType { - type Error = ArrowError; - - fn try_from(arrow_schema: &ArrowSchema) -> Result { - let new_fields: Result, _> = arrow_schema - .fields() - .iter() - .map(|field| field.as_ref().try_into()) - .collect(); - Ok(StructType::new(new_fields?)) - } -} - -impl TryFrom for StructType { - type Error = ArrowError; - - fn try_from(arrow_schema: ArrowSchemaRef) -> Result { - arrow_schema.as_ref().try_into() - } -} - -impl TryFrom<&ArrowField> for StructField { - type Error = ArrowError; - - fn try_from(arrow_field: &ArrowField) -> Result { - Ok(StructField::new( - arrow_field.name().clone(), - arrow_field.data_type().try_into()?, - arrow_field.is_nullable(), - ) - .with_metadata(arrow_field.metadata().iter().map(|(k, v)| (k.clone(), v)))) - } -} - -impl TryFrom<&ArrowDataType> for DataType { - type Error = ArrowError; - - fn try_from(arrow_datatype: &ArrowDataType) -> Result { - match arrow_datatype { - ArrowDataType::Utf8 => Ok(DataType::Primitive(PrimitiveType::String)), - ArrowDataType::LargeUtf8 => Ok(DataType::Primitive(PrimitiveType::String)), - ArrowDataType::Int64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type - ArrowDataType::Int32 => Ok(DataType::Primitive(PrimitiveType::Integer)), - ArrowDataType::Int16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::Int8 => Ok(DataType::Primitive(PrimitiveType::Byte)), - ArrowDataType::UInt64 => Ok(DataType::Primitive(PrimitiveType::Long)), // undocumented type - ArrowDataType::UInt32 => Ok(DataType::Primitive(PrimitiveType::Integer)), - ArrowDataType::UInt16 => Ok(DataType::Primitive(PrimitiveType::Short)), - ArrowDataType::UInt8 => Ok(DataType::Primitive(PrimitiveType::Boolean)), - ArrowDataType::Float32 => Ok(DataType::Primitive(PrimitiveType::Float)), - ArrowDataType::Float64 => Ok(DataType::Primitive(PrimitiveType::Double)), - ArrowDataType::Boolean => Ok(DataType::Primitive(PrimitiveType::Boolean)), - ArrowDataType::Binary => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::FixedSizeBinary(_) => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::LargeBinary => Ok(DataType::Primitive(PrimitiveType::Binary)), - ArrowDataType::Decimal128(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( - *p as i32, *s as i32, - ))), - ArrowDataType::Decimal256(p, s) => Ok(DataType::Primitive(PrimitiveType::Decimal( - *p as i32, *s as i32, - ))), - ArrowDataType::Date32 => Ok(DataType::Primitive(PrimitiveType::Date)), - ArrowDataType::Date64 => Ok(DataType::Primitive(PrimitiveType::Date)), - ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => { - Ok(DataType::Primitive(PrimitiveType::Timestamp)) - } - ArrowDataType::Timestamp(TimeUnit::Microsecond, Some(tz)) - if tz.eq_ignore_ascii_case("utc") => - { - Ok(DataType::Primitive(PrimitiveType::Timestamp)) - } - ArrowDataType::Struct(fields) => { - let converted_fields: Result, _> = fields - .iter() - .map(|field| field.as_ref().try_into()) - .collect(); - Ok(DataType::Struct(Box::new(StructType::new( - converted_fields?, - )))) - } - ArrowDataType::List(field) => Ok(DataType::Array(Box::new(ArrayType::new( - (*field).data_type().try_into()?, - (*field).is_nullable(), - )))), - ArrowDataType::LargeList(field) => Ok(DataType::Array(Box::new(ArrayType::new( - (*field).data_type().try_into()?, - (*field).is_nullable(), - )))), - ArrowDataType::FixedSizeList(field, _) => Ok(DataType::Array(Box::new( - ArrayType::new((*field).data_type().try_into()?, (*field).is_nullable()), - ))), - ArrowDataType::Map(field, _) => { - if let ArrowDataType::Struct(struct_fields) = field.data_type() { - let key_type = struct_fields[0].data_type().try_into()?; - let value_type = struct_fields[1].data_type().try_into()?; - let value_type_nullable = struct_fields[1].is_nullable(); - Ok(DataType::Map(Box::new(MapType::new( - key_type, - value_type, - value_type_nullable, - )))) - } else { - panic!("DataType::Map should contain a struct field child"); - } - } - s => Err(ArrowError::SchemaError(format!( - "Invalid data type for Delta Lake: {s}" - ))), - } - } -} diff --git a/kernel/src/client/mod.rs b/kernel/src/client/mod.rs index a88d47f4..a4a5fc70 100644 --- a/kernel/src/client/mod.rs +++ b/kernel/src/client/mod.rs @@ -13,7 +13,6 @@ use crate::{ DeltaResult, ExpressionHandler, FileSystemClient, JsonHandler, ParquetHandler, TableClient, }; -pub mod arrow; pub mod executor; pub mod file_handler; pub mod filesystem; diff --git a/kernel/src/error.rs b/kernel/src/error.rs index 4c172103..89442127 100644 --- a/kernel/src/error.rs +++ b/kernel/src/error.rs @@ -14,9 +14,11 @@ pub enum Error { source: Box, }, + #[cfg(feature = "parquet")] #[error("Arrow error: {0}")] Parquet(#[from] parquet::errors::ParquetError), + #[cfg(feature = "object_store")] #[error("Error interacting with object store: {0}")] ObjectStore(object_store::Error), @@ -48,6 +50,7 @@ pub enum Error { MissingMetadata, } +#[cfg(feature = "object_store")] impl From for Error { fn from(value: object_store::Error) -> Self { match value {