From 8e6d83603f1a15a19586ca87c7f72cb4a4ed93bb Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 8 Aug 2023 17:48:15 -0700 Subject: [PATCH 1/2] Add deserialization of Bytes -> Decimal --- src/io/parquet/read/deserialize/simple.rs | 32 ++++++++++++++++++++++- src/io/parquet/read/schema/convert.rs | 2 ++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index d19296a4b72..299f339a902 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -8,7 +8,7 @@ use parquet2::{ use crate::types::i256; use crate::{ - array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}, + array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, BinaryArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{Error, Result}, types::{days_ms, NativeType}, @@ -230,6 +230,36 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } + (PhysicalType::ByteArray, Decimal(_, _)) => { + let pages = binary::Iter::::new( + pages, + DataType::Binary, + chunk_size, + num_rows, + ); + + let pages = pages.map(move |maybe_array| { + let array = maybe_array?; + let array_len = array.len(); + let array = array.as_any().downcast_ref::>().unwrap(); + let values = (0..array_len).map(|i| { + let value = array.value(i); + let n = value.len(); + if n > 16 { + return Err(Error::Overflow); + } + Ok(super::super::convert_i128(value, n)) + }) + .collect::>>(); + let validity = array.validity().cloned(); + + PrimitiveArray::::try_new(data_type.clone(), values?.into(), validity) + }); + + let arrays = pages.map(|x| x.map(|x| x.boxed())); + + Box::new(arrays) as _ + } (PhysicalType::Int32, Decimal256(_, _)) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs index 821d5107649..a8ea3dc3928 100644 --- a/src/io/parquet/read/schema/convert.rs +++ b/src/io/parquet/read/schema/convert.rs @@ -142,6 +142,8 @@ fn from_byte_array( (_, Some(PrimitiveConvertedType::Bson)) => DataType::Binary, (_, Some(PrimitiveConvertedType::Enum)) => DataType::Binary, (_, Some(PrimitiveConvertedType::Utf8)) => DataType::Utf8, + (Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => DataType::Decimal(*precision, *scale), + (_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => DataType::Decimal(*precision, *scale), (_, _) => DataType::Binary, } } From ab0485612060444305e2053f0a5d18805514f9e6 Mon Sep 17 00:00:00 2001 From: Jay Chia Date: Tue, 5 Sep 2023 13:54:58 -0700 Subject: [PATCH 2/2] fmt and lints --- src/io/parquet/read/deserialize/simple.rs | 28 ++++++++++------------- src/io/parquet/read/schema/convert.rs | 8 +++++-- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 299f339a902..15cbca10b9b 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -8,7 +8,7 @@ use parquet2::{ use crate::types::i256; use crate::{ - array::{Array, DictionaryKey, MutablePrimitiveArray, PrimitiveArray, BinaryArray}, + array::{Array, BinaryArray, DictionaryKey, MutablePrimitiveArray, PrimitiveArray}, datatypes::{DataType, IntervalUnit, TimeUnit}, error::{Error, Result}, types::{days_ms, NativeType}, @@ -231,26 +231,22 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Box::new(arrays) as _ } (PhysicalType::ByteArray, Decimal(_, _)) => { - let pages = binary::Iter::::new( - pages, - DataType::Binary, - chunk_size, - num_rows, - ); + let pages = binary::Iter::::new(pages, DataType::Binary, chunk_size, num_rows); let pages = pages.map(move |maybe_array| { let array = maybe_array?; let array_len = array.len(); let array = array.as_any().downcast_ref::>().unwrap(); - let values = (0..array_len).map(|i| { - let value = array.value(i); - let n = value.len(); - if n > 16 { - return Err(Error::Overflow); - } - Ok(super::super::convert_i128(value, n)) - }) - .collect::>>(); + let values = (0..array_len) + .map(|i| { + let value = array.value(i); + let n = value.len(); + if n > 16 { + return Err(Error::Overflow); + } + Ok(super::super::convert_i128(value, n)) + }) + .collect::>>(); let validity = array.validity().cloned(); PrimitiveArray::::try_new(data_type.clone(), values?.into(), validity) diff --git a/src/io/parquet/read/schema/convert.rs b/src/io/parquet/read/schema/convert.rs index a8ea3dc3928..ef3786c1e9b 100644 --- a/src/io/parquet/read/schema/convert.rs +++ b/src/io/parquet/read/schema/convert.rs @@ -142,8 +142,12 @@ fn from_byte_array( (_, Some(PrimitiveConvertedType::Bson)) => DataType::Binary, (_, Some(PrimitiveConvertedType::Enum)) => DataType::Binary, (_, Some(PrimitiveConvertedType::Utf8)) => DataType::Utf8, - (Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => DataType::Decimal(*precision, *scale), - (_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => DataType::Decimal(*precision, *scale), + (Some(PrimitiveLogicalType::Decimal(precision, scale)), _) => { + DataType::Decimal(*precision, *scale) + } + (_, Some(PrimitiveConvertedType::Decimal(precision, scale))) => { + DataType::Decimal(*precision, *scale) + } (_, _) => DataType::Binary, } }