Skip to content

Commit

Permalink
Support reading tables with type widening in default parquet reader
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Sep 13, 2024
1 parent ed1919a commit ec8600e
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 0 deletions.
106 changes: 106 additions & 0 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,55 @@ fn check_cast_compat(
target_type: ArrowDataType,
source_type: &ArrowDataType,
) -> DeltaResult<DataTypeCompat> {
use ArrowDataType::*;

match (source_type, &target_type) {
(source_type, target_type) if source_type == target_type => Ok(DataTypeCompat::Identical),
(&ArrowDataType::Timestamp(_, _), &ArrowDataType::Timestamp(_, _)) => {
// timestamps are able to be cast between each other
Ok(DataTypeCompat::NeedsCast(target_type))
}
// Allow up-casting to a larger type if it's safe and can't cause overflow or loss of precision.
(Int8, Int16 | Int32 | Int64 | Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(Int16, Int32 | Int64 | Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(Int32, Int64 | Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(Float32, Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(_, Decimal128(p, s) | Decimal256(p, s)) if can_upcast_to_decimal(source_type, *p, *s) => {
Ok(DataTypeCompat::NeedsCast(target_type))
}
(Date32, Timestamp(_, None)) => Ok(DataTypeCompat::NeedsCast(target_type)),
_ => Err(make_arrow_error(format!(
"Incorrect datatype. Expected {}, got {}",
target_type, source_type
))),
}
}

// Returns whether the given source type can be safely cast to a decimal with the given precision and scale without
// loss of information.
pub(crate) fn can_upcast_to_decimal(
source_type: &ArrowDataType,
target_precision: u8,
target_scale: i8,
) -> bool {
use ArrowDataType::*;

let (source_precision, source_scale) = match source_type {
Decimal128(p, s) => (*p, *s),
Decimal256(p, s) => (*p, *s),
// Allow converting integers to a decimal that can hold all possible values.
Int8 => (3u8, 0i8),
Int16 => (5u8, 0i8),
Int32 => (10u8, 0i8),
Int64 => (20u8, 0i8),
_ => return false,
};

target_precision >= source_precision
&& target_scale >= source_scale
&& target_precision - source_precision >= (target_scale - source_scale) as u8
}

/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type"
/// is the same, but does so recursively into structs, and ensures lists and maps have the correct
/// associated types as well. This returns an `Ok(DataTypeCompat)` if the types are compatible, and
Expand Down Expand Up @@ -1427,4 +1463,74 @@ mod tests {
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn accepts_safe_decimal_casts() {
use super::can_upcast_to_decimal;
use ArrowDataType::*;

assert!(can_upcast_to_decimal(&Decimal128(1, 0), 2u8, 0i8));
assert!(can_upcast_to_decimal(&Decimal128(1, 0), 2u8, 1i8));
assert!(can_upcast_to_decimal(&Decimal128(5, -2), 6u8, -2i8));
assert!(can_upcast_to_decimal(&Decimal128(5, -2), 6u8, -1i8));
assert!(can_upcast_to_decimal(&Decimal128(5, 1), 6u8, 1i8));
assert!(can_upcast_to_decimal(&Decimal128(5, 1), 6u8, 2i8));
assert!(can_upcast_to_decimal(
&Decimal128(10, 5),
arrow_schema::DECIMAL128_MAX_PRECISION,
arrow_schema::DECIMAL128_MAX_SCALE - 5
));
assert!(can_upcast_to_decimal(&Decimal256(17, 5), 30u8, 5i8));
assert!(can_upcast_to_decimal(&Decimal256(17, 5), 30u8, 7i8));
assert!(can_upcast_to_decimal(&Decimal256(17, 5), 30u8, 7i8));
assert!(can_upcast_to_decimal(&Decimal256(17, -5), 30u8, -5i8));
assert!(can_upcast_to_decimal(&Decimal256(17, -5), 30u8, -3i8));
assert!(can_upcast_to_decimal(
&Decimal256(10, 5),
arrow_schema::DECIMAL256_MAX_PRECISION,
arrow_schema::DECIMAL256_MAX_SCALE - 5
));

assert!(can_upcast_to_decimal(&Int8, 3u8, 0i8));
assert!(can_upcast_to_decimal(&Int8, 4u8, 0i8));
assert!(can_upcast_to_decimal(&Int8, 4u8, 1i8));
assert!(can_upcast_to_decimal(&Int8, 7u8, 2i8));

assert!(can_upcast_to_decimal(&Int16, 5u8, 0i8));
assert!(can_upcast_to_decimal(&Int16, 6u8, 0i8));
assert!(can_upcast_to_decimal(&Int16, 6u8, 1i8));
assert!(can_upcast_to_decimal(&Int16, 9u8, 2i8));

assert!(can_upcast_to_decimal(&Int32, 10u8, 0i8));
assert!(can_upcast_to_decimal(&Int32, 11u8, 0i8));
assert!(can_upcast_to_decimal(&Int32, 11u8, 1i8));
assert!(can_upcast_to_decimal(&Int32, 14u8, 2i8));

assert!(can_upcast_to_decimal(&Int64, 20u8, 0i8));
assert!(can_upcast_to_decimal(&Int64, 21u8, 0i8));
assert!(can_upcast_to_decimal(&Int64, 21u8, 1i8));
assert!(can_upcast_to_decimal(&Int64, 24u8, 2i8));
}

#[test]
fn rejects_unsafe_decimal_casts() {
use super::can_upcast_to_decimal;
use ArrowDataType::*;

assert!(!can_upcast_to_decimal(&Decimal128(2, 0), 2u8, 1i8));
assert!(!can_upcast_to_decimal(&Decimal128(2, 0), 2u8, -1i8));
assert!(!can_upcast_to_decimal(&Decimal128(5, 2), 6u8, 4i8));
assert!(!can_upcast_to_decimal(&Decimal256(2, 0), 2u8, 1i8));
assert!(!can_upcast_to_decimal(&Decimal256(2, 0), 2u8, -1i8));
assert!(!can_upcast_to_decimal(&Decimal256(5, 2), 6u8, 4i8));

assert!(!can_upcast_to_decimal(&Int8, 2u8, 0i8));
assert!(!can_upcast_to_decimal(&Int8, 3u8, 1i8));
assert!(!can_upcast_to_decimal(&Int16, 4u8, 0i8));
assert!(!can_upcast_to_decimal(&Int16, 5u8, 1i8));
assert!(!can_upcast_to_decimal(&Int32, 9u8, 0i8));
assert!(!can_upcast_to_decimal(&Int32, 10u8, 1i8));
assert!(!can_upcast_to_decimal(&Int64, 19u8, 0i8));
assert!(!can_upcast_to_decimal(&Int64, 20u8, 1i8));
}
}
14 changes: 14 additions & 0 deletions kernel/src/features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ pub enum ReaderFeatures {
#[strum(serialize = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
// Allow columns to change type
TypeWidening,
#[strum(serialize = "typeWidening-preview")]
#[serde(rename = "typeWidening-preview")]
TypeWideningPreview,
/// version 2 of checkpointing
V2Checkpoint,
}
Expand Down Expand Up @@ -74,6 +79,11 @@ pub enum WriterFeatures {
#[strum(serialize = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
// Allow columns to change type
TypeWidening,
#[strum(serialize = "typeWidening-preview")]
#[serde(rename = "typeWidening-preview")]
TypeWideningPreview,
/// domain specific metadata
DomainMetadata,
/// version 2 of checkpointing
Expand All @@ -94,6 +104,8 @@ mod tests {
(ReaderFeatures::ColumnMapping, "columnMapping"),
(ReaderFeatures::DeletionVectors, "deletionVectors"),
(ReaderFeatures::TimestampWithoutTimezone, "timestampNtz"),
(ReaderFeatures::TypeWidening, "typeWidening"),
(ReaderFeatures::TypeWideningPreview, "typeWidening-preview"),
(ReaderFeatures::V2Checkpoint, "v2Checkpoint"),
];

Expand Down Expand Up @@ -126,6 +138,8 @@ mod tests {
(WriterFeatures::DeletionVectors, "deletionVectors"),
(WriterFeatures::RowTracking, "rowTracking"),
(WriterFeatures::TimestampWithoutTimezone, "timestampNtz"),
(WriterFeatures::TypeWidening, "typeWidening"),
(WriterFeatures::TypeWideningPreview, "typeWidening-preview"),
(WriterFeatures::DomainMetadata, "domainMetadata"),
(WriterFeatures::V2Checkpoint, "v2Checkpoint"),
(WriterFeatures::IcebergCompatV1, "icebergCompatV1"),
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1726233749922,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"3694"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.0-SNAPSHOT","txnId":"329e27de-485a-4c93-af9c-bb0ea3452823"}}
{"metaData":{"id":"e80ace91-dbef-4206-a821-e3f6c786f81c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"byte_long\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_long\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"float_double\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_double\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_double\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_double\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_same_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_greater_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_decimal\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_decimal\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_decimal\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"long_decimal\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date_timestamp_ntz\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1726233743254}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-f6dbc649-d5bc-42b1-984c-4376799a50d9-c000.snappy.parquet","partitionValues":{},"size":3694,"modificationTime":1726233749678,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"byte_long\":1,\"int_long\":2,\"float_double\":3.4,\"byte_double\":5,\"short_double\":6,\"int_double\":7,\"decimal_decimal_same_scale\":123.45,\"decimal_decimal_greater_scale\":67.89,\"byte_decimal\":1,\"short_decimal\":2,\"int_decimal\":3,\"long_decimal\":4,\"date_timestamp_ntz\":\"2024-09-09\"},\"maxValues\":{\"byte_long\":1,\"int_long\":2,\"float_double\":3.4,\"byte_double\":5,\"short_double\":6,\"int_double\":7,\"decimal_decimal_same_scale\":123.45,\"decimal_decimal_greater_scale\":67.89,\"byte_decimal\":1,\"short_decimal\":2,\"int_decimal\":3,\"long_decimal\":4,\"date_timestamp_ntz\":\"2024-09-09\"},\"nullCount\":{\"byte_long\":0,\"int_long\":0,\"float_double\":0,\"byte_double\":0,\"short_double\":0,\"int_double\":0,\"decimal_decimal_same_scale\":0,\"decimal_decimal_greater_scale\":0,\"byte_decimal\":0,\"short_decimal\":0,\"int_decimal\":0,\"long_decimal\":0,\"date_timestamp_ntz\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1726233754010,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.enableTypeWidening\":\"true\",\"delta.feature.timestampntz\":\"supported\"}"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.0-SNAPSHOT","txnId":"d49bfb13-3563-4840-9641-4cb6d5685201"}}
{"metaData":{"id":"e80ace91-dbef-4206-a821-e3f6c786f81c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"byte_long\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_long\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"float_double\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_double\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_double\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_double\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_same_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_greater_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_decimal\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_decimal\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_decimal\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"long_decimal\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date_timestamp_ntz\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1726233743254}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz","typeWidening-preview"],"writerFeatures":["timestampNtz","typeWidening-preview","appendOnly","invariants"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1726233756593,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"4001"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.0-SNAPSHOT","txnId":"18e0f057-e98c-4cd1-bd2a-56f106701dff"}}
{"metaData":{"id":"e80ace91-dbef-4206-a821-e3f6c786f81c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"byte_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"long\",\"fromType\":\"byte\",\"tableVersion\":2}]}},{\"name\":\"int_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"long\",\"fromType\":\"integer\",\"tableVersion\":2}]}},{\"name\":\"float_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"float\",\"tableVersion\":2}]}},{\"name\":\"byte_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"byte\",\"tableVersion\":2}]}},{\"name\":\"short_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"short\",\"tableVersion\":2}]}},{\"name\":\"int_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"integer\",\"tableVersion\":2}]}},{\"name\":\"decimal_decimal_same_scale\",\"type\":\"decimal(20,2)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(20,2)\",\"fromType\":\"decimal(10,2)\",\"tableVersion\":2}]}},{\"name\":\"decimal_decimal_greater_scale\",\"type\":\"decimal(20,5)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(20,5)\",\"fromType\":\"decimal(10,2)\",\"tableVersion\":2}]}},{\"name\":\"byte_decimal\",\"type\":\"decimal(4,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(4,1)\",\"fromType\":\"byte\",\"tableVersion\":2}]}},{\"name\":\"short_decimal\",\"type\":\"decimal(6,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(6,1)\",\"fromType\":\"short\",\"tableVersion\":2}]}},{\"name\":\"int_decimal\",\"type\":\"decimal(11,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(11,1)\",\"fromType\":\"integer\",\"tableVersion\":2}]}},{\"name\":\"long_decimal\",\"type\":\"decimal(21,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(21,1)\",\"fromType\":\"long\",\"tableVersion\":2}]}},{\"name\":\"date_timestamp_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"timestamp_ntz\",\"fromType\":\"date\",\"tableVersion\":2}]}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1726233743254}}
{"add":{"path":"part-00000-61accb66-b740-416b-9f5b-f0fccaceb415-c000.snappy.parquet","partitionValues":{},"size":4001,"modificationTime":1726233756528,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"byte_long\":9223372036854775807,\"int_long\":9223372036854775807,\"float_double\":1.234567890123,\"byte_double\":1.234567890123,\"short_double\":1.234567890123,\"int_double\":1.234567890123,\"decimal_decimal_same_scale\":12345678901234.56,\"decimal_decimal_greater_scale\":12345678901.23456,\"byte_decimal\":123.4,\"short_decimal\":12345.6,\"int_decimal\":1234567890.1,\"long_decimal\":123456789012345678.9,\"date_timestamp_ntz\":\"2024-09-09T12:34:56.123\"},\"maxValues\":{\"byte_long\":9223372036854775807,\"int_long\":9223372036854775807,\"float_double\":1.234567890123,\"byte_double\":1.234567890123,\"short_double\":1.234567890123,\"int_double\":1.234567890123,\"decimal_decimal_same_scale\":12345678901234.56,\"decimal_decimal_greater_scale\":12345678901.23456,\"byte_decimal\":123.4,\"short_decimal\":12345.6,\"int_decimal\":1234567890.1,\"long_decimal\":123456789012345678.9,\"date_timestamp_ntz\":\"2024-09-09T12:34:56.123\"},\"nullCount\":{\"byte_long\":0,\"int_long\":0,\"float_double\":0,\"byte_double\":0,\"short_double\":0,\"int_double\":0,\"decimal_decimal_same_scale\":0,\"decimal_decimal_greater_scale\":0,\"byte_decimal\":0,\"short_decimal\":0,\"int_decimal\":0,\"long_decimal\":0,\"date_timestamp_ntz\":0}}","defaultRowCommitVersion":2}}
Binary file not shown.
Binary file not shown.
44 changes: 44 additions & 0 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,3 +1040,47 @@ fn timestamp_ntz() -> Result<(), Box<dyn std::error::Error>> {
)?;
Ok(())
}

#[test]
fn type_widening_basic() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
"| byte_long | int_long | float_double | byte_double | short_double | int_double | date_timestamp_ntz |",
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
"| 1 | 2 | 3.4000000953674316 | 5.0 | 6.0 | 7.0 | 2024-09-09T00:00:00 |",
"| 9223372036854775807 | 9223372036854775807 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 2024-09-09T12:34:56.123456 |",
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
];
let select_cols: Option<&[&str]> = Some(&[
"byte_long",
"int_long",
"float_double",
"byte_double",
"short_double",
"int_double",
"date_timestamp_ntz",
]);

read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}

#[test]
fn type_widening_decimal() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
"| decimal_decimal_same_scale | decimal_decimal_greater_scale | byte_decimal | short_decimal | int_decimal | long_decimal |",
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
"| 123.45 | 67.89000 | 1.0 | 2.0 | 3.0 | 4.0 |",
"| 12345678901234.56 | 12345678901.23456 | 123.4 | 12345.6 | 1234567890.1 | 123456789012345678.9 |",
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
];
let select_cols: Option<&[&str]> = Some(&[
"decimal_decimal_same_scale",
"decimal_decimal_greater_scale",
"byte_decimal",
"short_decimal",
"int_decimal",
"long_decimal",
]);
read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}

0 comments on commit ec8600e

Please sign in to comment.