Skip to content

Commit

Permalink
Merge pull request #197 from chmp/feature/arrow-abstraction
Browse files Browse the repository at this point in the history
Add generic Array abstraction
  • Loading branch information
chmp committed Jul 21, 2024
2 parents 1f658a6 + 73ecc2d commit fd513cc
Show file tree
Hide file tree
Showing 39 changed files with 1,039 additions and 428 deletions.
4 changes: 2 additions & 2 deletions serde_arrow/src/arrow2_impl/deserialization.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::internal::{
arrow::TimeUnit,
deserialization::{
array_deserializer::ArrayDeserializer,
bool_deserializer::BoolDeserializer,
Expand Down Expand Up @@ -32,7 +33,6 @@ use crate::_impl::arrow2::{
datatypes::{DataType, Field, UnionMode},
types::{f16, NativeType, Offset as ArrowOffset},
};
use crate::internal::schema::GenericTimeUnit;

impl<'de> Deserializer<'de> {
/// Build a deserializer from `arrow2` arrays (*requires one of the
Expand Down Expand Up @@ -215,7 +215,7 @@ pub fn build_date64_deserializer<'a>(
Ok(Date64Deserializer::new(
as_primitive_values(array)?,
get_validity(array),
GenericTimeUnit::Millisecond,
TimeUnit::Millisecond,
field.is_utc()?,
)
.into())
Expand Down
55 changes: 29 additions & 26 deletions serde_arrow/src/arrow2_impl/schema.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use std::collections::HashMap;

use crate::{
_impl::arrow2::datatypes::{DataType, Field, IntegerType, TimeUnit, UnionMode},
_impl::arrow2::datatypes::{
DataType, Field, IntegerType, TimeUnit as ArrowTimeUnit, UnionMode,
},
internal::{
arrow::TimeUnit,
error::{error, fail, Error, Result},
schema::{
merge_strategy_with_metadata, split_strategy_from_metadata, GenericDataType,
GenericField, GenericTimeUnit, SchemaLike, Sealed, SerdeArrowSchema,
GenericField, SchemaLike, Sealed, SerdeArrowSchema,
},
},
};
Expand Down Expand Up @@ -67,7 +70,7 @@ impl TryFrom<&Field> for GenericField {
type Error = Error;

fn try_from(field: &Field) -> Result<Self> {
use {GenericDataType as T, GenericTimeUnit as U};
use {GenericDataType as T, TimeUnit as U};

let metadata = field
.metadata
Expand Down Expand Up @@ -104,26 +107,26 @@ impl TryFrom<&Field> for GenericField {
}
T::Decimal128(*precision as u8, *scale as i8)
}
DataType::Time32(TimeUnit::Second) => T::Time32(U::Second),
DataType::Time32(TimeUnit::Millisecond) => T::Time32(U::Millisecond),
DataType::Time32(ArrowTimeUnit::Second) => T::Time32(U::Second),
DataType::Time32(ArrowTimeUnit::Millisecond) => T::Time32(U::Millisecond),
DataType::Time32(unit) => fail!("Invalid time unit {unit:?} for Time32"),
DataType::Time64(TimeUnit::Microsecond) => T::Time64(U::Microsecond),
DataType::Time64(TimeUnit::Nanosecond) => T::Time64(U::Nanosecond),
DataType::Time64(ArrowTimeUnit::Microsecond) => T::Time64(U::Microsecond),
DataType::Time64(ArrowTimeUnit::Nanosecond) => T::Time64(U::Nanosecond),
DataType::Time64(unit) => fail!("Invalid time unit {unit:?} for Time64"),
DataType::Timestamp(TimeUnit::Second, tz) => T::Timestamp(U::Second, tz.clone()),
DataType::Timestamp(TimeUnit::Millisecond, tz) => {
DataType::Timestamp(ArrowTimeUnit::Second, tz) => T::Timestamp(U::Second, tz.clone()),
DataType::Timestamp(ArrowTimeUnit::Millisecond, tz) => {
T::Timestamp(U::Millisecond, tz.clone())
}
DataType::Timestamp(TimeUnit::Microsecond, tz) => {
DataType::Timestamp(ArrowTimeUnit::Microsecond, tz) => {
T::Timestamp(U::Microsecond, tz.clone())
}
DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
DataType::Timestamp(ArrowTimeUnit::Nanosecond, tz) => {
T::Timestamp(U::Nanosecond, tz.clone())
}
DataType::Duration(TimeUnit::Second) => T::Duration(U::Second),
DataType::Duration(TimeUnit::Millisecond) => T::Duration(U::Millisecond),
DataType::Duration(TimeUnit::Microsecond) => T::Duration(U::Microsecond),
DataType::Duration(TimeUnit::Nanosecond) => T::Duration(U::Nanosecond),
DataType::Duration(ArrowTimeUnit::Second) => T::Duration(U::Second),
DataType::Duration(ArrowTimeUnit::Millisecond) => T::Duration(U::Millisecond),
DataType::Duration(ArrowTimeUnit::Microsecond) => T::Duration(U::Microsecond),
DataType::Duration(ArrowTimeUnit::Nanosecond) => T::Duration(U::Nanosecond),
DataType::List(field) => {
children.push(GenericField::try_from(field.as_ref())?);
T::List
Expand Down Expand Up @@ -194,7 +197,7 @@ impl TryFrom<&GenericField> for Field {
type Error = Error;

fn try_from(value: &GenericField) -> Result<Self> {
use {GenericDataType as T, GenericTimeUnit as U};
use {GenericDataType as T, TimeUnit as U};

let data_type = match &value.data_type {
T::Null => DataType::Null,
Expand All @@ -212,11 +215,11 @@ impl TryFrom<&GenericField> for Field {
T::F64 => DataType::Float64,
T::Date32 => DataType::Date32,
T::Date64 => DataType::Date64,
T::Time32(U::Second) => DataType::Time32(TimeUnit::Second),
T::Time32(U::Millisecond) => DataType::Time32(TimeUnit::Millisecond),
T::Time32(U::Second) => DataType::Time32(ArrowTimeUnit::Second),
T::Time32(U::Millisecond) => DataType::Time32(ArrowTimeUnit::Millisecond),
T::Time32(unit) => fail!("Invalid time unit {unit} for Time32"),
T::Time64(U::Microsecond) => DataType::Time64(TimeUnit::Microsecond),
T::Time64(U::Nanosecond) => DataType::Time64(TimeUnit::Nanosecond),
T::Time64(U::Microsecond) => DataType::Time64(ArrowTimeUnit::Microsecond),
T::Time64(U::Nanosecond) => DataType::Time64(ArrowTimeUnit::Nanosecond),
T::Time64(unit) => fail!("Invalid time unit {unit} for Time64"),
T::Timestamp(unit, tz) => DataType::Timestamp((*unit).into(), tz.clone()),
T::Duration(unit) => DataType::Duration((*unit).into()),
Expand Down Expand Up @@ -306,13 +309,13 @@ impl TryFrom<&GenericField> for Field {
}
}

// Diff hunk: removed and added lines are interleaved here (the +/- markers were lost).
// Each version maps the four time-unit variants (Second, Millisecond, Microsecond,
// Nanosecond) one-to-one onto the same-named variant of the target enum.
// `ArrowTimeUnit` is the import alias for `_impl::arrow2::datatypes::TimeUnit`.
// NOTE(review): the `GenericTimeUnit` lines look like the pre-change text and the
// `TimeUnit` / `ArrowTimeUnit` lines their replacement — confirm against the raw diff.
impl From<GenericTimeUnit> for TimeUnit {
fn from(value: GenericTimeUnit) -> Self {
impl From<TimeUnit> for ArrowTimeUnit {
fn from(value: TimeUnit) -> Self {
match value {
GenericTimeUnit::Second => Self::Second,
GenericTimeUnit::Millisecond => Self::Millisecond,
GenericTimeUnit::Microsecond => Self::Microsecond,
GenericTimeUnit::Nanosecond => Self::Nanosecond,
TimeUnit::Second => Self::Second,
TimeUnit::Millisecond => Self::Millisecond,
TimeUnit::Microsecond => Self::Microsecond,
TimeUnit::Nanosecond => Self::Nanosecond,
}
}
}
7 changes: 4 additions & 3 deletions serde_arrow/src/arrow_impl/deserialization.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::internal::{
arrow::TimeUnit,
deserialization::{
array_deserializer::ArrayDeserializer,
binary_deserializer::BinaryDeserializer,
Expand All @@ -22,7 +23,7 @@ use crate::internal::{
},
deserializer::Deserializer,
error::{fail, Result},
schema::{GenericDataType, GenericField, GenericTimeUnit},
schema::{GenericDataType, GenericField},
utils::Offset,
};

Expand Down Expand Up @@ -124,7 +125,7 @@ pub fn build_array_deserializer<'a>(
field: &GenericField,
array: &'a dyn Array,
) -> Result<ArrayDeserializer<'a>> {
use {GenericDataType as T, GenericTimeUnit as U};
use {GenericDataType as T, TimeUnit as U};
match &field.data_type {
T::Null => Ok(NullDeserializer.into()),
T::Bool => build_bool_deserializer(field, array),
Expand Down Expand Up @@ -284,7 +285,7 @@ pub fn build_date64_deserializer<'a>(
Ok(Date64Deserializer::new(
as_primitive_values::<Date64Type>(array)?,
get_validity(array),
GenericTimeUnit::Millisecond,
TimeUnit::Millisecond,
field.is_utc()?,
)
.into())
Expand Down
79 changes: 53 additions & 26 deletions serde_arrow/src/arrow_impl/schema.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use std::sync::Arc;

use crate::{
_impl::arrow::datatypes::{DataType, Field, FieldRef, TimeUnit, UnionMode},
_impl::arrow::datatypes::{DataType, Field, FieldRef, TimeUnit as ArrowTimeUnit, UnionMode},
internal::{
arrow::TimeUnit,
error::{error, fail, Error, Result},
schema::{
merge_strategy_with_metadata, split_strategy_from_metadata, GenericDataType,
GenericField, GenericTimeUnit, SchemaLike, Sealed, SerdeArrowSchema,
GenericField, SchemaLike, Sealed, SerdeArrowSchema,
},
},
};
Expand Down Expand Up @@ -123,7 +124,7 @@ impl TryFrom<&DataType> for GenericDataType {
type Error = Error;

fn try_from(value: &DataType) -> Result<GenericDataType> {
use {GenericDataType as T, GenericTimeUnit as U};
use {GenericDataType as T, TimeUnit as U};
match value {
DataType::Boolean => Ok(T::Bool),
DataType::Null => Ok(T::Null),
Expand All @@ -143,31 +144,31 @@ impl TryFrom<&DataType> for GenericDataType {
DataType::Date32 => Ok(T::Date32),
DataType::Date64 => Ok(T::Date64),
DataType::Decimal128(precision, scale) => Ok(T::Decimal128(*precision, *scale)),
DataType::Time32(TimeUnit::Second) => Ok(T::Time32(U::Second)),
DataType::Time32(TimeUnit::Millisecond) => Ok(T::Time32(U::Millisecond)),
DataType::Time32(ArrowTimeUnit::Second) => Ok(T::Time32(U::Second)),
DataType::Time32(ArrowTimeUnit::Millisecond) => Ok(T::Time32(U::Millisecond)),
DataType::Time32(unit) => fail!("Invalid time unit {unit:?} for Time32"),
DataType::Time64(TimeUnit::Microsecond) => Ok(T::Time64(U::Microsecond)),
DataType::Time64(TimeUnit::Nanosecond) => Ok(T::Time64(U::Nanosecond)),
DataType::Time64(ArrowTimeUnit::Microsecond) => Ok(T::Time64(U::Microsecond)),
DataType::Time64(ArrowTimeUnit::Nanosecond) => Ok(T::Time64(U::Nanosecond)),
DataType::Time64(unit) => fail!("Invalid time unit {unit:?} for Time64"),
DataType::Timestamp(TimeUnit::Second, tz) => {
DataType::Timestamp(ArrowTimeUnit::Second, tz) => {
Ok(T::Timestamp(U::Second, tz.as_ref().map(|s| s.to_string())))
}
DataType::Timestamp(TimeUnit::Millisecond, tz) => Ok(T::Timestamp(
DataType::Timestamp(ArrowTimeUnit::Millisecond, tz) => Ok(T::Timestamp(
U::Millisecond,
tz.as_ref().map(|s| s.to_string()),
)),
DataType::Timestamp(TimeUnit::Microsecond, tz) => Ok(T::Timestamp(
DataType::Timestamp(ArrowTimeUnit::Microsecond, tz) => Ok(T::Timestamp(
U::Microsecond,
tz.as_ref().map(|s| s.to_string()),
)),
DataType::Timestamp(TimeUnit::Nanosecond, tz) => Ok(T::Timestamp(
DataType::Timestamp(ArrowTimeUnit::Nanosecond, tz) => Ok(T::Timestamp(
U::Nanosecond,
tz.as_ref().map(|s| s.to_string()),
)),
DataType::Duration(TimeUnit::Second) => Ok(T::Duration(U::Second)),
DataType::Duration(TimeUnit::Millisecond) => Ok(T::Duration(U::Millisecond)),
DataType::Duration(TimeUnit::Microsecond) => Ok(T::Duration(U::Microsecond)),
DataType::Duration(TimeUnit::Nanosecond) => Ok(T::Duration(U::Nanosecond)),
DataType::Duration(ArrowTimeUnit::Second) => Ok(T::Duration(U::Second)),
DataType::Duration(ArrowTimeUnit::Millisecond) => Ok(T::Duration(U::Millisecond)),
DataType::Duration(ArrowTimeUnit::Microsecond) => Ok(T::Duration(U::Microsecond)),
DataType::Duration(ArrowTimeUnit::Nanosecond) => Ok(T::Duration(U::Nanosecond)),
DataType::Binary => Ok(T::Binary),
DataType::LargeBinary => Ok(T::LargeBinary),
DataType::FixedSizeBinary(n) => Ok(T::FixedSizeBinary(*n)),
Expand Down Expand Up @@ -253,7 +254,7 @@ impl TryFrom<&GenericField> for Field {
type Error = Error;

fn try_from(value: &GenericField) -> Result<Self> {
use {GenericDataType as T, GenericTimeUnit as U};
use {GenericDataType as T, TimeUnit as U};

let data_type = match &value.data_type {
T::Null => DataType::Null,
Expand Down Expand Up @@ -354,11 +355,11 @@ impl TryFrom<&GenericField> for Field {

DataType::Dictionary(Box::new(key_type), Box::new(val_field.data_type().clone()))
}
T::Time32(U::Second) => DataType::Time32(TimeUnit::Second),
T::Time32(U::Millisecond) => DataType::Time32(TimeUnit::Millisecond),
T::Time32(U::Second) => DataType::Time32(ArrowTimeUnit::Second),
T::Time32(U::Millisecond) => DataType::Time32(ArrowTimeUnit::Millisecond),
T::Time32(unit) => fail!("invalid time unit {unit} for Time32"),
T::Time64(U::Microsecond) => DataType::Time64(TimeUnit::Microsecond),
T::Time64(U::Nanosecond) => DataType::Time64(TimeUnit::Nanosecond),
T::Time64(U::Microsecond) => DataType::Time64(ArrowTimeUnit::Microsecond),
T::Time64(U::Nanosecond) => DataType::Time64(ArrowTimeUnit::Nanosecond),
T::Time64(unit) => fail!("invalid time unit {unit} for Time64"),
T::Timestamp(unit, tz) => {
DataType::Timestamp((*unit).into(), tz.clone().map(|s| s.into()))
Expand All @@ -376,13 +377,39 @@ impl TryFrom<&GenericField> for Field {
}
}

// Diff hunk: removed and added lines are interleaved here (the +/- markers were lost).
// Each version maps the four time-unit variants (Second, Millisecond, Microsecond,
// Nanosecond) one-to-one onto the same-named variant of the target enum.
// `ArrowTimeUnit` is the import alias for `_impl::arrow::datatypes::TimeUnit`.
// NOTE(review): the `GenericTimeUnit` lines look like the pre-change text and the
// `TimeUnit` / `ArrowTimeUnit` lines their replacement — confirm against the raw diff.
impl From<GenericTimeUnit> for TimeUnit {
fn from(value: GenericTimeUnit) -> Self {
impl From<TimeUnit> for ArrowTimeUnit {
fn from(value: TimeUnit) -> Self {
match value {
GenericTimeUnit::Second => Self::Second,
GenericTimeUnit::Millisecond => Self::Millisecond,
GenericTimeUnit::Microsecond => Self::Microsecond,
GenericTimeUnit::Nanosecond => Self::Nanosecond,
TimeUnit::Second => Self::Second,
TimeUnit::Millisecond => Self::Millisecond,
TimeUnit::Microsecond => Self::Microsecond,
TimeUnit::Nanosecond => Self::Nanosecond,
}
}
}

/// Fallible conversion from serde_arrow's internal `DataType` into the `arrow`
/// crate's `DataType`.
///
/// Only the primitive integer and floating-point variants are translated; any
/// other variant produces an error of the form `"<type> not supported"`.
impl TryFrom<crate::internal::arrow::DataType> for DataType {
    type Error = Error;

    fn try_from(value: crate::internal::arrow::DataType) -> Result<Self> {
        use crate::internal::arrow::{BaseDataTypeDisplay, DataType as Internal};

        // `fail!` returns early from the function, so the match itself can
        // evaluate to the bare arrow data type and be wrapped in `Ok` once.
        let converted = match value {
            // signed integers
            Internal::Int8 => Self::Int8,
            Internal::Int16 => Self::Int16,
            Internal::Int32 => Self::Int32,
            Internal::Int64 => Self::Int64,
            // unsigned integers
            Internal::UInt8 => Self::UInt8,
            Internal::UInt16 => Self::UInt16,
            Internal::UInt32 => Self::UInt32,
            Internal::UInt64 => Self::UInt64,
            // floats
            Internal::Float16 => Self::Float16,
            Internal::Float32 => Self::Float32,
            Internal::Float64 => Self::Float64,
            // everything else has no mapping here
            unsupported => fail!("{} not supported", BaseDataTypeDisplay(&unsupported)),
        };

        Ok(converted)
    }
}
Loading

0 comments on commit fd513cc

Please sign in to comment.