Skip to content

Commit

Permalink
Support reading tables with type widening in default engine (#335)
Browse files Browse the repository at this point in the history
Type widening allows changing the type of an existing column in a Delta
table to a wider type.
See
https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md

This change improves the default engine to enable upcasting conversions
on read that are required to read tables that have a type change
applied: data files written before the widening type change will contain
data stored using a narrower type that must be upcast to the wider type
in the current table schema.

Changes:
 - Allow conversions:
   - int8 -> int16 -> int32 -> int64
   - float32 -> float64
   - date -> timestamp without timezone
   - integers -> float64
- integers and decimals -> decimals, as long as the target precision and
scale are sufficient to store all values without losing precision

This doesn't cover casting inside nested struct/array/map, which doesn't
seem to be properly supported yet.

Testing:
- Added tests reading a table with various type changes applied,
generated using delta spark 4.0 preview.
- Added unit tests for `can_upcast_to_decimal`, returning whether a
given type can be upcast to a decimal.
  • Loading branch information
johanl-db authored Sep 19, 2024
1 parent f6b8942 commit 1a66fb9
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 0 deletions.
92 changes: 92 additions & 0 deletions kernel/src/engine/arrow_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,54 @@ fn check_cast_compat(
target_type: ArrowDataType,
source_type: &ArrowDataType,
) -> DeltaResult<DataTypeCompat> {
use ArrowDataType::*;

match (source_type, &target_type) {
(source_type, target_type) if source_type == target_type => Ok(DataTypeCompat::Identical),
(&ArrowDataType::Timestamp(_, _), &ArrowDataType::Timestamp(_, _)) => {
// timestamps are able to be cast between each other
Ok(DataTypeCompat::NeedsCast(target_type))
}
// Allow up-casting to a larger type if it's safe and can't cause overflow or loss of precision.
(Int8, Int16 | Int32 | Int64 | Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(Int16, Int32 | Int64 | Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(Int32, Int64 | Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(Float32, Float64) => Ok(DataTypeCompat::NeedsCast(target_type)),
(_, Decimal128(p, s)) if can_upcast_to_decimal(source_type, *p, *s) => {
Ok(DataTypeCompat::NeedsCast(target_type))
}
(Date32, Timestamp(_, None)) => Ok(DataTypeCompat::NeedsCast(target_type)),
_ => Err(make_arrow_error(format!(
"Incorrect datatype. Expected {}, got {}",
target_type, source_type
))),
}
}

// Returns whether the given source type can be safely cast to a decimal with the given precision and scale without
// loss of information.
fn can_upcast_to_decimal(
source_type: &ArrowDataType,
target_precision: u8,
target_scale: i8,
) -> bool {
use ArrowDataType::*;

let (source_precision, source_scale) = match source_type {
Decimal128(p, s) => (*p, *s),
// Allow converting integers to a decimal that can hold all possible values.
Int8 => (3u8, 0i8),
Int16 => (5u8, 0i8),
Int32 => (10u8, 0i8),
Int64 => (20u8, 0i8),
_ => return false,
};

target_precision >= source_precision
&& target_scale >= source_scale
&& target_precision - source_precision >= (target_scale - source_scale) as u8
}

/// Ensure a kernel data type matches an arrow data type. This only ensures that the actual "type"
/// is the same, but does so recursively into structs, and ensures lists and maps have the correct
/// associated types as well. This returns an `Ok(DataTypeCompat)` if the types are compatible, and
Expand Down Expand Up @@ -1427,4 +1462,61 @@ mod tests {
assert_eq!(mask_indices, expect_mask);
assert_eq!(reorder_indices, expect_reorder);
}

#[test]
fn accepts_safe_decimal_casts() {
use super::can_upcast_to_decimal;
use ArrowDataType::*;

assert!(can_upcast_to_decimal(&Decimal128(1, 0), 2u8, 0i8));
assert!(can_upcast_to_decimal(&Decimal128(1, 0), 2u8, 1i8));
assert!(can_upcast_to_decimal(&Decimal128(5, -2), 6u8, -2i8));
assert!(can_upcast_to_decimal(&Decimal128(5, -2), 6u8, -1i8));
assert!(can_upcast_to_decimal(&Decimal128(5, 1), 6u8, 1i8));
assert!(can_upcast_to_decimal(&Decimal128(5, 1), 6u8, 2i8));
assert!(can_upcast_to_decimal(
&Decimal128(10, 5),
arrow_schema::DECIMAL128_MAX_PRECISION,
arrow_schema::DECIMAL128_MAX_SCALE - 5
));

assert!(can_upcast_to_decimal(&Int8, 3u8, 0i8));
assert!(can_upcast_to_decimal(&Int8, 4u8, 0i8));
assert!(can_upcast_to_decimal(&Int8, 4u8, 1i8));
assert!(can_upcast_to_decimal(&Int8, 7u8, 2i8));

assert!(can_upcast_to_decimal(&Int16, 5u8, 0i8));
assert!(can_upcast_to_decimal(&Int16, 6u8, 0i8));
assert!(can_upcast_to_decimal(&Int16, 6u8, 1i8));
assert!(can_upcast_to_decimal(&Int16, 9u8, 2i8));

assert!(can_upcast_to_decimal(&Int32, 10u8, 0i8));
assert!(can_upcast_to_decimal(&Int32, 11u8, 0i8));
assert!(can_upcast_to_decimal(&Int32, 11u8, 1i8));
assert!(can_upcast_to_decimal(&Int32, 14u8, 2i8));

assert!(can_upcast_to_decimal(&Int64, 20u8, 0i8));
assert!(can_upcast_to_decimal(&Int64, 21u8, 0i8));
assert!(can_upcast_to_decimal(&Int64, 21u8, 1i8));
assert!(can_upcast_to_decimal(&Int64, 24u8, 2i8));
}

#[test]
fn rejects_unsafe_decimal_casts() {
use super::can_upcast_to_decimal;
use ArrowDataType::*;

assert!(!can_upcast_to_decimal(&Decimal128(2, 0), 2u8, 1i8));
assert!(!can_upcast_to_decimal(&Decimal128(2, 0), 2u8, -1i8));
assert!(!can_upcast_to_decimal(&Decimal128(5, 2), 6u8, 4i8));

assert!(!can_upcast_to_decimal(&Int8, 2u8, 0i8));
assert!(!can_upcast_to_decimal(&Int8, 3u8, 1i8));
assert!(!can_upcast_to_decimal(&Int16, 4u8, 0i8));
assert!(!can_upcast_to_decimal(&Int16, 5u8, 1i8));
assert!(!can_upcast_to_decimal(&Int32, 9u8, 0i8));
assert!(!can_upcast_to_decimal(&Int32, 10u8, 1i8));
assert!(!can_upcast_to_decimal(&Int64, 19u8, 0i8));
assert!(!can_upcast_to_decimal(&Int64, 20u8, 1i8));
}
}
14 changes: 14 additions & 0 deletions kernel/src/features/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ pub enum ReaderFeatures {
#[strum(serialize = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
// Allow columns to change type
TypeWidening,
#[strum(serialize = "typeWidening-preview")]
#[serde(rename = "typeWidening-preview")]
TypeWideningPreview,
/// version 2 of checkpointing
V2Checkpoint,
}
Expand Down Expand Up @@ -74,6 +79,11 @@ pub enum WriterFeatures {
#[strum(serialize = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
// Allow columns to change type
TypeWidening,
#[strum(serialize = "typeWidening-preview")]
#[serde(rename = "typeWidening-preview")]
TypeWideningPreview,
/// domain specific metadata
DomainMetadata,
/// version 2 of checkpointing
Expand All @@ -94,6 +104,8 @@ mod tests {
(ReaderFeatures::ColumnMapping, "columnMapping"),
(ReaderFeatures::DeletionVectors, "deletionVectors"),
(ReaderFeatures::TimestampWithoutTimezone, "timestampNtz"),
(ReaderFeatures::TypeWidening, "typeWidening"),
(ReaderFeatures::TypeWideningPreview, "typeWidening-preview"),
(ReaderFeatures::V2Checkpoint, "v2Checkpoint"),
];

Expand Down Expand Up @@ -126,6 +138,8 @@ mod tests {
(WriterFeatures::DeletionVectors, "deletionVectors"),
(WriterFeatures::RowTracking, "rowTracking"),
(WriterFeatures::TimestampWithoutTimezone, "timestampNtz"),
(WriterFeatures::TypeWidening, "typeWidening"),
(WriterFeatures::TypeWideningPreview, "typeWidening-preview"),
(WriterFeatures::DomainMetadata, "domainMetadata"),
(WriterFeatures::V2Checkpoint, "v2Checkpoint"),
(WriterFeatures::IcebergCompatV1, "icebergCompatV1"),
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1726233749922,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"3694"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.0-SNAPSHOT","txnId":"329e27de-485a-4c93-af9c-bb0ea3452823"}}
{"metaData":{"id":"e80ace91-dbef-4206-a821-e3f6c786f81c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"byte_long\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_long\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"float_double\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_double\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_double\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_double\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_same_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_greater_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_decimal\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_decimal\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_decimal\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"long_decimal\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date_timestamp_ntz\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1726233743254}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"add":{"path":"part-00000-f6dbc649-d5bc-42b1-984c-4376799a50d9-c000.snappy.parquet","partitionValues":{},"size":3694,"modificationTime":1726233749678,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"byte_long\":1,\"int_long\":2,\"float_double\":3.4,\"byte_double\":5,\"short_double\":6,\"int_double\":7,\"decimal_decimal_same_scale\":123.45,\"decimal_decimal_greater_scale\":67.89,\"byte_decimal\":1,\"short_decimal\":2,\"int_decimal\":3,\"long_decimal\":4,\"date_timestamp_ntz\":\"2024-09-09\"},\"maxValues\":{\"byte_long\":1,\"int_long\":2,\"float_double\":3.4,\"byte_double\":5,\"short_double\":6,\"int_double\":7,\"decimal_decimal_same_scale\":123.45,\"decimal_decimal_greater_scale\":67.89,\"byte_decimal\":1,\"short_decimal\":2,\"int_decimal\":3,\"long_decimal\":4,\"date_timestamp_ntz\":\"2024-09-09\"},\"nullCount\":{\"byte_long\":0,\"int_long\":0,\"float_double\":0,\"byte_double\":0,\"short_double\":0,\"int_double\":0,\"decimal_decimal_same_scale\":0,\"decimal_decimal_greater_scale\":0,\"byte_decimal\":0,\"short_decimal\":0,\"int_decimal\":0,\"long_decimal\":0,\"date_timestamp_ntz\":0}}"}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1726233754010,"operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"delta.enableTypeWidening\":\"true\",\"delta.feature.timestampntz\":\"supported\"}"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.0-SNAPSHOT","txnId":"d49bfb13-3563-4840-9641-4cb6d5685201"}}
{"metaData":{"id":"e80ace91-dbef-4206-a821-e3f6c786f81c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"byte_long\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_long\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"float_double\",\"type\":\"float\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_double\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_double\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_double\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_same_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"decimal_decimal_greater_scale\",\"type\":\"decimal(10,2)\",\"nullable\":true,\"metadata\":{}},{\"name\":\"byte_decimal\",\"type\":\"byte\",\"nullable\":true,\"metadata\":{}},{\"name\":\"short_decimal\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"int_decimal\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"long_decimal\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date_timestamp_ntz\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1726233743254}}
{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["timestampNtz","typeWidening-preview"],"writerFeatures":["timestampNtz","typeWidening-preview","appendOnly","invariants"]}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{"commitInfo":{"timestamp":1726233756593,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"4001"},"engineInfo":"Apache-Spark/3.5.2 Delta-Lake/3.3.0-SNAPSHOT","txnId":"18e0f057-e98c-4cd1-bd2a-56f106701dff"}}
{"metaData":{"id":"e80ace91-dbef-4206-a821-e3f6c786f81c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"byte_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"long\",\"fromType\":\"byte\",\"tableVersion\":2}]}},{\"name\":\"int_long\",\"type\":\"long\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"long\",\"fromType\":\"integer\",\"tableVersion\":2}]}},{\"name\":\"float_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"float\",\"tableVersion\":2}]}},{\"name\":\"byte_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"byte\",\"tableVersion\":2}]}},{\"name\":\"short_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"short\",\"tableVersion\":2}]}},{\"name\":\"int_double\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"double\",\"fromType\":\"integer\",\"tableVersion\":2}]}},{\"name\":\"decimal_decimal_same_scale\",\"type\":\"decimal(20,2)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(20,2)\",\"fromType\":\"decimal(10,2)\",\"tableVersion\":2}]}},{\"name\":\"decimal_decimal_greater_scale\",\"type\":\"decimal(20,5)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(20,5)\",\"fromType\":\"decimal(10,2)\",\"tableVersion\":2}]}},{\"name\":\"byte_decimal\",\"type\":\"decimal(4,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(4,1)\",\"fromType\":\"byte\",\"tableVersion\":2}]}},{\"name\":\"short_decimal\",\"type\":\"decimal(6,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(6,1)\",\"fromType\":\"short\",\"tableVersion\":2}]}},{\"name\":\"int_decimal\",\"type\":\"decimal(11,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(11,1)\",\"fromType\":\"integer\",\"tableVersion\":2}]}},{\"name\":\"long_decimal\",\"type\":\"decimal(21,1)\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"decimal(21,1)\",\"fromType\":\"long\",\"tableVersion\":2}]}},{\"name\":\"date_timestamp_ntz\",\"type\":\"timestamp_ntz\",\"nullable\":true,\"metadata\":{\"delta.typeChanges\":[{\"toType\":\"timestamp_ntz\",\"fromType\":\"date\",\"tableVersion\":2}]}}]}","partitionColumns":[],"configuration":{"delta.enableTypeWidening":"true"},"createdTime":1726233743254}}
{"add":{"path":"part-00000-61accb66-b740-416b-9f5b-f0fccaceb415-c000.snappy.parquet","partitionValues":{},"size":4001,"modificationTime":1726233756528,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"byte_long\":9223372036854775807,\"int_long\":9223372036854775807,\"float_double\":1.234567890123,\"byte_double\":1.234567890123,\"short_double\":1.234567890123,\"int_double\":1.234567890123,\"decimal_decimal_same_scale\":12345678901234.56,\"decimal_decimal_greater_scale\":12345678901.23456,\"byte_decimal\":123.4,\"short_decimal\":12345.6,\"int_decimal\":1234567890.1,\"long_decimal\":123456789012345678.9,\"date_timestamp_ntz\":\"2024-09-09T12:34:56.123\"},\"maxValues\":{\"byte_long\":9223372036854775807,\"int_long\":9223372036854775807,\"float_double\":1.234567890123,\"byte_double\":1.234567890123,\"short_double\":1.234567890123,\"int_double\":1.234567890123,\"decimal_decimal_same_scale\":12345678901234.56,\"decimal_decimal_greater_scale\":12345678901.23456,\"byte_decimal\":123.4,\"short_decimal\":12345.6,\"int_decimal\":1234567890.1,\"long_decimal\":123456789012345678.9,\"date_timestamp_ntz\":\"2024-09-09T12:34:56.123\"},\"nullCount\":{\"byte_long\":0,\"int_long\":0,\"float_double\":0,\"byte_double\":0,\"short_double\":0,\"int_double\":0,\"decimal_decimal_same_scale\":0,\"decimal_decimal_greater_scale\":0,\"byte_decimal\":0,\"short_decimal\":0,\"int_decimal\":0,\"long_decimal\":0,\"date_timestamp_ntz\":0}}","defaultRowCommitVersion":2}}
Binary file not shown.
Binary file not shown.
44 changes: 44 additions & 0 deletions kernel/tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1040,3 +1040,47 @@ fn timestamp_ntz() -> Result<(), Box<dyn std::error::Error>> {
)?;
Ok(())
}

#[test]
fn type_widening_basic() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
"| byte_long | int_long | float_double | byte_double | short_double | int_double | date_timestamp_ntz |",
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
"| 1 | 2 | 3.4000000953674316 | 5.0 | 6.0 | 7.0 | 2024-09-09T00:00:00 |",
"| 9223372036854775807 | 9223372036854775807 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 1.234567890123 | 2024-09-09T12:34:56.123456 |",
"+---------------------+---------------------+--------------------+----------------+----------------+----------------+----------------------------+",
];
let select_cols: Option<&[&str]> = Some(&[
"byte_long",
"int_long",
"float_double",
"byte_double",
"short_double",
"int_double",
"date_timestamp_ntz",
]);

read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}

#[test]
fn type_widening_decimal() -> Result<(), Box<dyn std::error::Error>> {
let expected = vec![
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
"| decimal_decimal_same_scale | decimal_decimal_greater_scale | byte_decimal | short_decimal | int_decimal | long_decimal |",
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
"| 123.45 | 67.89000 | 1.0 | 2.0 | 3.0 | 4.0 |",
"| 12345678901234.56 | 12345678901.23456 | 123.4 | 12345.6 | 1234567890.1 | 123456789012345678.9 |",
"+----------------------------+-------------------------------+--------------+---------------+--------------+----------------------+",
];
let select_cols: Option<&[&str]> = Some(&[
"decimal_decimal_same_scale",
"decimal_decimal_greater_scale",
"byte_decimal",
"short_decimal",
"int_decimal",
"long_decimal",
]);
read_table_data_str("./tests/data/type-widening/", select_cols, None, expected)
}

0 comments on commit 1a66fb9

Please sign in to comment.