-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds support to TimestampNTZ type in Delta
Previously this type was not supported in Spark; Spark 3.3 added support for it. To prevent (gate) older writers/readers from writing to or reading this column we need a protocol (feature) bump that does the gating. * This PR creates a new TableFeature, the TimestampNTZ feature, which is a ReaderWriter feature. * This is how the feature is automatically enabled: Scenario | Previously | With this change -- | -- | -- User creates a new table with a TIMESTAMP_NTZ column | AnalysisException saying the type is not supported | Protocol upgraded to a table-features protocol, the TimestampNTZ feature automatically enabled, and the DDL succeeds User adds a new column of type TIMESTAMP_NTZ on a legacy protocol version | AnalysisException saying the type is not supported | User DDL completes successfully (protocol also upgraded automatically) User adds a new column of type TIMESTAMP_NTZ on a table with the TimestampNTZ feature enabled | AnalysisException saying the type is not supported | User DDL completes successfully. Closes #1626 GitOrigin-RevId: d92b62895cf1cdc3dfaed9e97d2ef6e9378f98a3
- Loading branch information
1 parent
380425b
commit 6532535
Showing
7 changed files
with
199 additions
and
27 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
129 changes: 129 additions & 0 deletions
129
core/src/test/scala/org/apache/spark/sql/delta/DeltaTimestampNTZSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,129 @@ | ||
/* | ||
* Copyright (2021) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.delta | ||
|
||
import java.sql.Timestamp | ||
import java.time.LocalDateTime | ||
|
||
import org.apache.spark.sql.delta.actions.Protocol | ||
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest | ||
|
||
import org.apache.spark.SparkThrowable | ||
import org.apache.spark.sql.{QueryTest, Row} | ||
import org.apache.spark.sql.catalyst.TableIdentifier | ||
import org.apache.spark.sql.test.SharedSparkSession | ||
import org.apache.spark.sql.types.StructType | ||
|
||
/**
 * Tests for TIMESTAMP_NTZ support in Delta: introducing a TIMESTAMP_NTZ column must
 * auto-upgrade the protocol and enable [[TimestampNTZTableFeature]], tables on a
 * legacy protocol must reject the type until the feature is enabled, and the type
 * must be excluded from min/max stats collection.
 */
class DeltaTimestampNTZSuite extends QueryTest
  with SharedSparkSession with DeltaSQLCommandTest {

  /** Returns the current protocol of `table`, read from its latest (volatile) snapshot. */
  private def getProtocolForTable(table: String): Protocol = {
    val deltaLog = DeltaLog.forTable(spark, TableIdentifier(table))
    deltaLog.unsafeVolatileSnapshot.protocol
  }

  // Expected result of reading back '2022-01-02 03:04:05.123456' inserted into
  // (STRING, TIMESTAMP, TIMESTAMP_NTZ) columns. NOTE: java.sql.Timestamp's
  // deprecated constructor takes (year - 1900, zero-based month, ...).
  private val expectedRow = Row(
    "foo",
    new Timestamp(2022 - 1900, 0, 2, 3, 4, 5, 123456000),
    LocalDateTime.of(2022, 1, 2, 3, 4, 5, 123456000))

  test("create a new table with TIMESTAMP_NTZ, higher protocol and feature should be picked.") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING DELTA")
      sql(
        """INSERT INTO tbl VALUES
          |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
      assert(spark.table("tbl").head == expectedRow)
      // Creating the table with a TIMESTAMP_NTZ column must have auto-upgraded the
      // protocol and enabled the table feature.
      assert(getProtocolForTable("tbl") ==
        TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature)
      )
    }
  }

  test("creating a table without TIMESTAMP_NTZ should use the usual minimum protocol") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP) USING DELTA")
      // Reuse the helper instead of re-deriving the protocol from a fresh DeltaLog.
      val protocol = getProtocolForTable("tbl")
      assert(protocol == Protocol(1, 2))
      assert(
        !protocol.isFeatureSupported(TimestampNTZTableFeature),
        "Table tbl contains TimestampNTZFeature descriptor when it's not supposed to"
      )
    }
  }

  test("add a new column using TIMESTAMP_NTZ should upgrade to the correct protocol versions") {
    withTable("tbl") {
      sql("CREATE TABLE tbl(c1 STRING, c2 TIMESTAMP) USING delta")
      assert(getProtocolForTable("tbl") == Protocol(1, 2))

      // On the legacy protocol the DDL must be rejected with a Spark error.
      intercept[SparkThrowable] {
        sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")
      }

      // Explicitly enable the table feature; the same DDL must now succeed.
      sql("ALTER TABLE tbl " +
        "SET TBLPROPERTIES('delta.feature.timestampNtz' = 'supported')")

      sql("ALTER TABLE tbl ADD COLUMN c3 TIMESTAMP_NTZ")

      sql(
        """INSERT INTO tbl VALUES
          |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
      assert(spark.table("tbl").head == expectedRow)

      // The property-driven upgrade moves the table to a table-features protocol,
      // which also lists the legacy (1, 2) capabilities as explicit features.
      assert(getProtocolForTable("tbl") ==
        TimestampNTZTableFeature.minProtocolVersion
          .withFeature(TimestampNTZTableFeature)
          .withFeature(InvariantsTableFeature)
          .withFeature(AppendOnlyTableFeature)
      )
    }
  }

  test("use TIMESTAMP_NTZ in a partition column") {
    withTable("delta_test") {
      sql(
        """CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ)
          |USING delta
          |PARTITIONED BY (c3)""".stripMargin)
      sql(
        """INSERT INTO delta_test VALUES
          |('foo','2022-01-02 03:04:05.123456','2022-01-02 03:04:05.123456')""".stripMargin)
      assert(spark.table("delta_test").head == expectedRow)
      assert(getProtocolForTable("delta_test") ==
        TimestampNTZTableFeature.minProtocolVersion.withFeature(TimestampNTZTableFeature)
      )
    }
  }

  test("min/max stats collection should not apply on TIMESTAMP_NTZ") {
    withTable("delta_test") {
      sql("CREATE TABLE delta_test(c1 STRING, c2 TIMESTAMP, c3 TIMESTAMP_NTZ) USING delta")
      // TIMESTAMP_NTZ columns must be excluded from the min/max stats schema.
      // Use unsafeVolatileSnapshot for consistency with the rest of this suite.
      val statsSchema = DeltaLog.forTable(spark, TableIdentifier("delta_test"))
        .unsafeVolatileSnapshot.statsSchema
      assert(statsSchema("minValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP"))
      assert(statsSchema("maxValues").dataType == StructType.fromDDL("c1 STRING, c2 TIMESTAMP"))
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters