From 491fcd5ebed858d3f4e2b4bd63c56709d3833ab3 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Thu, 27 Oct 2022 09:42:22 -0700 Subject: [PATCH 1/9] commit --- build.sbt | 15 + .../resources/error/delta-error-classes.json | 6 + .../apache/spark/sql/delta/DeltaErrors.scala | 5 + .../commands/ConvertToDeltaCommand.scala | 85 +- .../sql/delta/sources/DeltaSQLConf.scala | 13 + .../transforms/IcebergPartitionUtil.scala | 175 +++ .../analysis/NoSuchProcedureException.scala | 23 + .../spark/sql/delta/IcebergSchemaUtils.scala | 53 + .../apache/spark/sql/delta/IcebergTable.scala | 266 ++++ ...pache.spark.sql.sources.DataSourceRegister | 1 + .../delta/ConvertIcebergToDeltaSuite.scala | 1358 +++++++++++++++++ 11 files changed, 1980 insertions(+), 20 deletions(-) create mode 100644 delta-iceberg-compat/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala create mode 100644 delta-iceberg-compat/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala create mode 100644 delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala create mode 100644 delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala create mode 100644 delta-iceberg-compat/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister create mode 100644 delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala diff --git a/build.sbt b/build.sbt index bc6d401bb3f..d96f17df87e 100644 --- a/build.sbt +++ b/build.sbt @@ -194,6 +194,21 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb")) ) ) +lazy val deltaIcebergCompat = (project in file("delta-iceberg-compat")) + .dependsOn(core % "compile->compile;test->test;provided->provided") + .settings ( + name := "delta-iceberg-compat", + commonSettings, + scalaStyleSettings, + releaseSettings, + libraryDependencies ++= Seq( { + val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion) + ("org.apache.iceberg" % s"iceberg-spark-runtime-$expMaj.$expMin" % "1.0.0" % "provided") + .cross(CrossVersion.binary) + } + ) + ) + /** * Get list of python files and return the mapping between source files and target paths * in the generated package JAR. 
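With getMajorMinorPatch(sparkVersion) supplying the Spark major/minor for the artifact name and .cross(CrossVersion.binary) appending the Scala binary version, the provided dependency above resolves to one concrete coordinate per build. As a sketch, assuming Spark 3.3.x and Scala 2.12 (illustrative, not taken from this patch), a downstream sbt build that wants the converter on its classpath would end up with the equivalent of:

    // assumes Spark 3.3.x / Scala 2.12; coordinates are illustrative
    libraryDependencies +=
      "org.apache.iceberg" % "iceberg-spark-runtime-3.3_2.12" % "1.0.0" % "provided"
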
diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index d2e4963a4d1..27fcf087e16 100644 --- a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -1202,6 +1202,12 @@ ], "sqlState" : "42000" }, + "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES" : { + "message" : [ + "Partition schema cannot be specified when converting Iceberg tables" + ], + "sqlState" : "42000" + }, "DELTA_PATH_DOES_NOT_EXIST" : { "message" : [ " doesn't exist" diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 60448634ad1..c560beca557 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2452,6 +2452,11 @@ trait DeltaErrorsBase unsupportedOptions.mkString(",")) ) } + + def partitionSchemaInIcebergTables: Throwable = { + new DeltaIllegalArgumentException( + errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES", messageParameters = null) + } } object DeltaErrors extends DeltaErrorsBase diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala index ff4207d3ad9..6f19746e227 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.commands // scalastyle:off import.ordering.noEmptyLine import java.io.Closeable +import java.lang.reflect.InvocationTargetException import java.util.Locale import scala.collection.JavaConverters._ @@ -29,7 +30,6 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.util._ -import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} @@ -37,14 +37,14 @@ import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{Analyzer, NoSuchTableException} import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.Cast -import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, V1Table} +import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, V1Table} import org.apache.spark.sql.execution.command.LeafRunnableCommand import org.apache.spark.sql.execution.datasources.PartitioningUtils import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter} import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructType} -import org.apache.spark.util.SerializableConfiguration +import org.apache.spark.util.{SerializableConfiguration, Utils} /** * Convert an existing parquet table to a delta table by creating delta logs based on @@ -72,8 +72,11 @@ abstract class ConvertToDeltaCommandBase( partitionSchema: Option[StructType], deltaPath: Option[String]) extends LeafRunnableCommand 
with DeltaCommand { + protected lazy val icebergEnabled: Boolean = + conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_ENABLED) + protected def isSupportedProvider(lowerCaseProvider: String): Boolean = { - lowerCaseProvider == "parquet" + lowerCaseProvider == "parquet" || (icebergEnabled && lowerCaseProvider == "iceberg") } override def run(spark: SparkSession): Seq[Row] = { @@ -280,19 +283,22 @@ abstract class ConvertToDeltaCommandBase( } /** Get the instance of the convert target table, which provides file manifest and schema */ - protected def getTargetTable( - spark: SparkSession, - target: ConvertTarget): ConvertTargetTable = { - val qualifiedDir = - ConvertToDeltaCommand.getQualifiedPath(spark, new Path(target.targetDir)).toString + protected def getTargetTable(spark: SparkSession, target: ConvertTarget): ConvertTargetTable = { target.provider match { case Some(providerName) => providerName.toLowerCase(Locale.ROOT) match { - case _ if target.catalogTable.exists(ConvertToDeltaCommand.isHiveStyleParquetTable) => - new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema) - case checkProvider if checkProvider.equalsIgnoreCase("parquet") => - new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema) - case checkProvider => - throw DeltaErrors.convertNonParquetTablesException(tableIdentifier, checkProvider) + case checkProvider + if target.catalogTable.exists(ConvertToDeltaCommand.isHiveStyleParquetTable) || + checkProvider.equalsIgnoreCase("parquet") => + ConvertToDeltaCommand.getParquetTable( + spark, target.targetDir, target.catalogTable, partitionSchema) + case checkProvider + if icebergEnabled && checkProvider.equalsIgnoreCase("iceberg") => + if (partitionSchema.isDefined) { + throw DeltaErrors.partitionSchemaInIcebergTables + } + ConvertToDeltaCommand.getIcebergTable(spark, target.targetDir, None, None) + case other => + throw DeltaErrors.convertNonParquetTablesException(tableIdentifier, other) } case None => throw DeltaErrors.missingProviderForConvertException(target.targetDir) @@ -442,10 +448,10 @@ trait ConvertTargetTable { } class ParquetTable( - spark: SparkSession, - basePath: String, - catalogTable: Option[CatalogTable], - userPartitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging { + val spark: SparkSession, + val basePath: String, + val catalogTable: Option[CatalogTable], + val userPartitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging { // Validate user provided partition schema if catalogTable is available. 
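+  // For example (hypothetical): CONVERT TO DELTA parquet.`/data/events` PARTITIONED BY (dt DATE)
+  // is rejected below if the catalog already records a different partition schema for the table.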
if (catalogTable.isDefined && userPartitionSchema.isDefined && !catalogTable.get.partitionSchema.equals(userPartitionSchema.get)) { @@ -734,7 +740,7 @@ class MetadataLogFileManifest( override def close(): Unit = allFiles.unpersist() } -object ConvertToDeltaCommand { +trait ConvertToDeltaCommandUtils extends DeltaLogging { val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]" def createAddFile( targetFile: ConvertTargetFile, @@ -841,4 +847,43 @@ object ConvertToDeltaCommand { // Allow partition column name starting with underscore and dot DeltaFileOperations.defaultHiddenFileFilter(fileName) && !fileName.contains("=") } + + def getParquetTable( + spark: SparkSession, + targetDir: String, + catalogTable: Option[CatalogTable], + partitionSchema: Option[StructType]): ConvertTargetTable = { + val qualifiedDir = ConvertToDeltaCommand.getQualifiedPath(spark, new Path(targetDir)).toString + new ParquetTable(spark, qualifiedDir, catalogTable, partitionSchema) + } + + def getIcebergTable( + spark: SparkSession, + targetDir: String, + sparkTable: Option[Table], + tableSchema: Option[StructType]): ConvertTargetTable = { + try { + val clazz = Utils.classForName("org.apache.spark.sql.delta.IcebergTable") + if (sparkTable.isDefined) { + val constFromTable = clazz.getConstructor( + classOf[SparkSession], + Utils.classForName("org.apache.iceberg.Table"), + classOf[Option[StructType]]) + val method = sparkTable.get.getClass.getMethod("table") + constFromTable.newInstance(spark, method.invoke(sparkTable.get), tableSchema) + } else { + val baseDir = ConvertToDeltaCommand.getQualifiedPath(spark, new Path(targetDir)).toString + val constFromPath = clazz.getConstructor( + classOf[SparkSession], classOf[String], classOf[Option[StructType]]) + constFromPath.newInstance(spark, baseDir, tableSchema) + } + } catch { + case e: InvocationTargetException => + logError(s"Got error when creating an Iceberg Converter", e) + // Unwrap better error messages + throw e.getCause + } + } } + +object ConvertToDeltaCommand extends ConvertToDeltaCommandUtils diff --git a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 6c77404d601..0419b4d6abd 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -741,6 +741,19 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(true) + val DELTA_CONVERT_ICEBERG_ENABLED = + buildConf("convert.iceberg.enabled") + .internal() + .doc("If enabled, Iceberg tables can be converted into a Delta table.") + .booleanConf + .createWithDefault(true) + + val DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED = + buildConf("convert.iceberg.partitionEvolution.enabled") + .doc("If enabled, support conversion of iceberg tables experienced partition evolution.") + .booleanConf + .createWithDefault(false) + val DELTA_OPTIMIZE_MIN_FILE_SIZE = buildConf("optimize.minFileSize") .internal() diff --git a/delta-iceberg-compat/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala b/delta-iceberg-compat/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala new file mode 100644 index 00000000000..4e9ee9b3832 --- /dev/null +++ b/delta-iceberg-compat/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala @@ -0,0 +1,175 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.transforms + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.DeltaColumnMapping +import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY +import org.apache.spark.sql.delta.util.{DateFormatter, TimestampFormatter} +import org.apache.iceberg.{PartitionField, PartitionSpec, Schema, StructLike} +import org.apache.iceberg.spark.SparkSchemaUtil +import org.apache.iceberg.types.Type.TypeID + +import org.apache.spark.sql.types.{DateType, IntegerType, MetadataBuilder, StringType, StructField} + +/** + * Utils to translate Iceberg's partition expressions to Delta generated column expressions. + */ +object IcebergPartitionUtil { + + // scalastyle:off line.size.limit + /** + * Convert the partition values stored in Iceberg metadata to string values, which we will + * directly use in the partitionValues field of AddFiles. Here is how we generate the string + * value from the Iceberg stored partition value for each of the transforms: + * + * Identity + * - Iceberg source code: https://github.com/apache/iceberg/blob/4c98a0f6408d4ccd0d47b076b2f7743d836d28ec/api/src/main/java/org/apache/iceberg/transforms/Identity.java + * - Source column type: any + * - Stored partition value type: same as source type + * - String value generation: for timestamp and date, use our Spark formatter; other types use toString + * + * Timestamps (year, month, day, hour) + * - Iceberg source code: https://github.com/apache/iceberg/blob/4c98a0f6408d4ccd0d47b076b2f7743d836d28ec/api/src/main/java/org/apache/iceberg/transforms/Timestamps.java + * - Source column type: timestamp + * - Stored partition value type: integer + * - String value generation: use Iceberg's Timestamps.toHumanString (which uses yyyy-MM-dd-HH format) + * + * Dates (year, month, day) + * - Iceberg source code: https://github.com/apache/iceberg/blob/4c98a0f6408d4ccd0d47b076b2f7743d836d28ec/api/src/main/java/org/apache/iceberg/transforms/Dates.java + * - Source column type: date + * - Stored partition value type: integer + * - String value generation: use Iceberg's Dates.toHumanString (which uses yyyy-MM-dd format) + * + * Truncate + * - Iceberg source code: https://github.com/apache/iceberg/blob/4c98a0f6408d4ccd0d47b076b2f7743d836d28ec/api/src/main/java/org/apache/iceberg/transforms/Truncate.java + * - Source column type: string + * - Stored partition value type: string + * - String value generation: directly use toString + */ + // scalastyle:on line.size.limit + def partitionValueToString( + partField: PartitionField, + partValue: Object, + schema: Schema, + dateFormatter: DateFormatter, + timestampFormatter: TimestampFormatter): String = { + if (partValue == null) return null + partField.transform() match { + case _: Identity[_] => + // Identity transform + // We use our own date and timestamp formatter for date and timestamp types, while simply + // use toString for other input types. 
+ val sourceField = schema.findField(partField.sourceId()) + val sourceType = sourceField.`type`() + if (sourceType.typeId() == TypeID.DATE) { + // convert epoch days to Spark date formatted string + dateFormatter.format(partValue.asInstanceOf[Int]) + } else if (sourceType.typeId == TypeID.TIMESTAMP) { + // convert timestamps to Spark timestamp formatted string + timestampFormatter.format(partValue.asInstanceOf[Long]) + } else { + // all other types can directly toString + partValue.toString + } + case ts: Timestamps => + // Matches all transforms on Timestamp input type: YEAR, MONTH, DAY, HOUR + // We directly use Iceberg's toHumanString(), which takes a timestamp type source column and + // generates the partition value in the string format as follows: + // - YEAR: yyyy + // - MONTH: yyyy-MM + // - DAY: yyyy-MM-dd + // - HOUR: yyyy-MM-dd-HH + ts.toHumanString(partValue.asInstanceOf[Int]) + case dt: Dates => + // Matches all transform on Date input type: YEAR, MONTH, DAY + // We directly use Iceberg's toHumanString(), which takes a date type source column and + // generates the partition value in the string format as follows: + // - YEAR: yyyy + // - MONTH: yyyy-MM + // - DAY: yyyy-MM-dd + dt.toHumanString(partValue.asInstanceOf[Int]) + case _: Truncate[_] => + // Truncate transform + // While Iceberg Truncate transform supports multiple input types, our converter + // only supports string and block all other input types. So simply toString suffices. + partValue.toString + case other => + throw new UnsupportedOperationException( + s"unsupported partition transform expression when converting to Delta: $other") + } + } + + def getPartitionFields(partSpec: PartitionSpec, schema: Schema): Seq[StructField] = { + // Skip removed partition fields due to partition evolution. 
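+    // (when a partition field is dropped, Iceberg can keep a placeholder entry in the spec whose
+    // transform is void; such entries carry no usable partition data, hence the filter below)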
+ partSpec.fields.asScala.toSeq.collect { + case partField if !partField.transform().isInstanceOf[VoidTransform[_]] => + val sourceColumnName = schema.findColumnName(partField.sourceId()) + val sourceField = schema.findField(partField.sourceId()) + val sourceType = sourceField.`type`() + + val metadataBuilder = new MetadataBuilder() + + val (transformExpr, targetType) = partField.transform() match { + // binary partition values are problematic in Delta, so we block converting if the iceberg + // table has a binary type partition column + case _: Identity[_] if sourceType.typeId() != TypeID.BINARY => + // copy id only for identity transform because source id will be the converted column id + // ids for other columns will be assigned later automatically during schema evolution + metadataBuilder + .putLong(DeltaColumnMapping.COLUMN_MAPPING_METADATA_ID_KEY, sourceField.fieldId()) + ("", SparkSchemaUtil.convert(sourceType)) + + case Timestamps.YEAR | Dates.YEAR => + (s"year($sourceColumnName)", IntegerType) + + case Timestamps.DAY | Dates.DAY => + (s"cast($sourceColumnName as date)", DateType) + + case t: Truncate[_] if sourceType.typeId() == TypeID.STRING => + (s"substring($sourceColumnName, 0, ${t.width()})", StringType) + + case Timestamps.MONTH | Dates.MONTH => + (s"date_format($sourceColumnName, 'yyyy-MM')", StringType) + + case Timestamps.HOUR => + (s"date_format($sourceColumnName, 'yyyy-MM-dd-HH')", StringType) + + case other => + throw new UnsupportedOperationException( + s"Unsupported partition transform expression when converting to Delta: " + + s"transform: $other, source data type: ${sourceType.typeId()}") + } + + if (transformExpr != "") { + metadataBuilder.putString(GENERATION_EXPRESSION_METADATA_KEY, transformExpr) + } + + Option(sourceField.doc()).foreach { comment => + metadataBuilder.putString("comment", comment) + } + + val metadata = metadataBuilder.build() + + StructField(partField.name(), + targetType, + nullable = sourceField.isOptional(), + metadata = metadata) + } + } +} diff --git a/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala b/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala new file mode 100644 index 00000000000..2bd6e707b0f --- /dev/null +++ b/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala @@ -0,0 +1,23 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.connector.catalog.Identifier + +class NoSuchProcedureException(ident: Identifier) + extends AnalysisException("Procedure " + ident + " not found") diff --git a/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala b/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala new file mode 100644 index 00000000000..98e94412a28 --- /dev/null +++ b/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala @@ -0,0 +1,53 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import org.apache.iceberg.Schema +import org.apache.iceberg.spark.SparkSchemaUtil + +import org.apache.spark.sql.types.{MetadataBuilder, StructType} + +object IcebergSchemaUtils { + + /** + * Given an iceberg schema, convert it to a Spark schema. This conversion will keep the Iceberg + * column IDs (used to read Parquet files) in the field metadata + * + * @param icebergSchema + * @return StructType for the converted schema + */ + def convertIcebergSchemaToSpark(icebergSchema: Schema): StructType = { + // Convert from Iceberg schema to Spark schema but without the column IDs + val baseConvertedSchema = SparkSchemaUtil.convert(icebergSchema) + + // For each field, find the column ID (fieldId) and add to the StructField metadata + SchemaMergingUtils.transformColumns(baseConvertedSchema) { (path, field, _) => + // This should be safe to access fields + // scalastyle:off + // https://github.com/apache/iceberg/blob/d98224a82b104888281d4e901ccf948f9642590b/api/src/main/java/org/apache/iceberg/types/IndexByName.java#L171 + // scalastyle:on + val fieldPath = (path :+ field.name).mkString(".") + val id = icebergSchema.findField(fieldPath).fieldId() + field.copy( + metadata = new MetadataBuilder() + .withMetadata(field.metadata) + .putLong(DeltaColumnMapping.COLUMN_MAPPING_METADATA_ID_KEY, id) + .build()) + } + } +} diff --git a/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala b/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala new file mode 100644 index 00000000000..62eb8afc5d7 --- /dev/null +++ b/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala @@ -0,0 +1,266 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +import java.util.Locale + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.delta.DeltaColumnMapping +import org.apache.spark.sql.delta.SerializableFileStatus +import org.apache.spark.sql.delta.commands.{ConvertTargetFile, ConvertTargetFileManifest, ConvertTargetTable, ConvertToDeltaCommand} +import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.util.{DateFormatter, TimestampFormatter} +import org.apache.hadoop.fs.Path +import org.apache.iceberg.Table +import org.apache.iceberg.TableProperties +import org.apache.iceberg.hadoop.HadoopTables +import org.apache.iceberg.transforms.IcebergPartitionUtil + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{AnalysisException, Dataset, SparkSession} +import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.types.StructType +import org.apache.spark.util.SerializableConfiguration + +/** + * A target Iceberg table for conversion to a Delta table. + * + * @param icebergTable the Iceberg table underneath. + * @param existingSchema schema used for incremental update, none for initial conversion. + */ +class IcebergTable( + spark: SparkSession, + icebergTable: Table, + existingSchema: Option[StructType]) extends ConvertTargetTable { + + def this(spark: SparkSession, basePath: String, existingSchema: Option[StructType]) = + // scalastyle:off deltahadoopconfiguration + this(spark, new HadoopTables(spark.sessionState.newHadoopConf).load(basePath), existingSchema) + // scalastyle:on deltahadoopconfiguration + + private val partitionEvolutionEnabled = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED) + + private val fieldPathToPhysicalName = + existingSchema.map { + SchemaMergingUtils.explode(_).collect { + case (path, field) if DeltaColumnMapping.hasPhysicalName(field) => + path.map(_.toLowerCase(Locale.ROOT)) -> DeltaColumnMapping.getPhysicalName(field) + }.toMap + }.getOrElse(Map.empty[Seq[String], String]) + + private val convertedSchema = { + // Reuse physical names of existing columns. + val mergedSchema = DeltaColumnMapping.setPhysicalNames( + IcebergSchemaUtils.convertIcebergSchemaToSpark(icebergTable.schema()), + fieldPathToPhysicalName) + + // Assign physical names to new columns. + DeltaColumnMapping.assignPhysicalNames(mergedSchema) + } + + override val requiredColumnMappingMode: DeltaColumnMappingMode = IdMapping + + override val properties: Map[String, String] = { + icebergTable.properties().asScala.toMap + (DeltaConfigs.COLUMN_MAPPING_MODE.key -> "id") + } + + override val partitionSchema: StructType = { + // Reuse physical names of existing columns. + val mergedPartitionSchema = DeltaColumnMapping.setPhysicalNames( + StructType( + IcebergPartitionUtil.getPartitionFields(icebergTable.spec(), icebergTable.schema())), + fieldPathToPhysicalName) + + // Assign physical names to new partition columns. 
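+    // (on an initial conversion fieldPathToPhysicalName is empty, so every partition column gets
+    // a fresh physical name here, including generated columns derived from non-identity
+    // transforms such as days(ts))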
+ DeltaColumnMapping.assignPhysicalNames(mergedPartitionSchema) + } + + val tableSchema: StructType = PartitioningUtils.mergeDataAndPartitionSchema( + convertedSchema, + partitionSchema, + spark.sessionState.conf.caseSensitiveAnalysis)._1 + + checkConvertible() + + val fileManifest = new IcebergFileManifest(spark, icebergTable, partitionSchema) + + lazy val numFiles: Long = fileManifest.numFiles + + override val format: String = "iceberg" + + def checkConvertible(): Unit = { + /** + * Having multiple partition specs implies that the Iceberg table has experienced + * partition evolution. (https://iceberg.apache.org/evolution/#partition-evolution) + * We don't support the conversion of such tables right now. + * + * Note that this simple check won't consider the underlying data, so there might be cases + * s.t. the data itself is partitioned using a single spec despite multiple specs created + * in the past. we do not account for that atm due to the complexity of data introspection + */ + + if (!partitionEvolutionEnabled && icebergTable.specs().size() > 1) { + throw new UnsupportedOperationException(IcebergTable.ERR_MULTIPLE_PARTITION_SPECS) + } + + /** + * Existing Iceberg Table that has data imported from table without field ids will need + * to add a custom property to enable the mapping for Iceberg. + * Therefore, we can simply check for the existence of this property to see if there was + * a custom mapping within Iceberg. + * + * Ref: https://www.mail-archive.com/dev@iceberg.apache.org/msg01638.html + */ + if (icebergTable.properties().containsKey(TableProperties.DEFAULT_NAME_MAPPING)) { + throw new UnsupportedOperationException(IcebergTable.ERR_CUSTOM_NAME_MAPPING) + } + + /** + * Delta does not support case sensitive columns while Iceberg does. We should check for + * this here to throw a better message tailored to converting to Delta than the default + * AnalysisException + */ + try { + SchemaMergingUtils.checkColumnNameDuplication(tableSchema, "during convert to Delta") + } catch { + case e: AnalysisException if e.getMessage.contains("during convert to Delta") => + throw new UnsupportedOperationException( + IcebergTable.caseSensitiveConversionExceptionMsg(e.getMessage)) + } + } +} + +object IcebergTable { + /** Error message constants */ + val ERR_MULTIPLE_PARTITION_SPECS = + s"""This Iceberg table has undergone partition evolution. Iceberg tables that had partition + | columns removed can be converted without data loss by setting the SQL configuration + | '${DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED.key}' to true. Tables that + | had data columns converted to partition columns will not be able to read the pre-partition + | column values.""".stripMargin + val ERR_CUSTOM_NAME_MAPPING = "Cannot convert Iceberg tables with column name mapping" + + def caseSensitiveConversionExceptionMsg(conflictingColumns: String): String = + s"""Cannot convert table to Delta as the table contains column names that only differ by case. + |$conflictingColumns. Delta does not support case sensitive column names. + |Please rename these columns before converting to Delta. 
+ """.stripMargin +} + +class IcebergFileManifest( + spark: SparkSession, + table: Table, + partitionSchema: StructType) extends ConvertTargetFileManifest with Logging { + + // scalastyle:off sparkimplicits + import spark.implicits._ + // scalastyle:on sparkimplicits + + final val VOID_TRANSFORM = "void" + + private var fileSparkResults: Option[Dataset[ConvertTargetFile]] = None + + private var _numFiles: Option[Long] = None + + val basePath = table.location() + + def numFiles: Long = { + if (_numFiles.isEmpty) getFileSparkResults() + _numFiles.get + } + + def allFiles: Dataset[ConvertTargetFile] = { + if (fileSparkResults.isEmpty) getFileSparkResults() + fileSparkResults.get + } + + private def getFileSparkResults(): Unit = { + // scalastyle:off deltahadoopconfiguration + val hadoopConf = spark.sessionState.newHadoopConf() + // scalastyle:on deltahadoopconfiguration + val serializableConfiguration = new SerializableConfiguration(hadoopConf) + val conf = spark.sparkContext.broadcast(serializableConfiguration) + val format = table + .properties() + .getOrDefault( + TableProperties.DEFAULT_FILE_FORMAT, TableProperties.DEFAULT_FILE_FORMAT_DEFAULT) + + if (format != "parquet") { + throw new UnsupportedOperationException( + s"Cannot convert Iceberg tables with file format $format. Only parquet is supported.") + } + + val schemaBatchSize = + spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_IMPORT_BATCH_SIZE_SCHEMA_INFERENCE) + + val partFields = table.spec().fields().asScala + val icebergSchema = table.schema() + // Prune removed partition fields. + val physicalNameToFieldIndex = partFields.zipWithIndex.collect { + case (field, index) if field.transform().toString != VOID_TRANSFORM => + DeltaColumnMapping.getPhysicalName(partitionSchema(field.name)) -> index + }.toMap + + val dateFormatter = DateFormatter() + val timestampFormatter = TimestampFormatter(ConvertToDeltaCommand.timestampPartitionPattern, + java.util.TimeZone.getDefault) + + var numFiles = 0L + val res = table.newScan().planFiles().iterator().asScala.grouped(schemaBatchSize).map { batch => + logInfo(s"Getting file statuses for a batch of ${batch.size} of files; " + + s"finished $numFiles files so far") + numFiles += batch.length + val filePathWithPartValues = batch.map { fileScanTask => + val filePath = fileScanTask.file().path().toString + val partitionValues = if (spark.sessionState.conf.getConf( + DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES)) { + val icebergPartitionValues = fileScanTask.file().partition() + val physicalNameToPartValueMap = physicalNameToFieldIndex + .map { case (physicalName, fieldIndex) => + val partValue = icebergPartitionValues.get(fieldIndex, classOf[java.lang.Object]) + val partValueAsString = IcebergPartitionUtil.partitionValueToString( + partFields(fieldIndex), partValue, icebergSchema, dateFormatter, timestampFormatter) + (physicalName, partValueAsString) + } + Some(physicalNameToPartValueMap) + } else None + (filePath, partitionValues) + } + val numParallelism = Math.min(Math.max(filePathWithPartValues.size, 1), + spark.sparkContext.defaultParallelism) + + val rdd = spark.sparkContext.parallelize(filePathWithPartValues, numParallelism) + .mapPartitions { iterator => + iterator.map { case (filePath, partValues) => + val path = new Path(filePath) + val fs = path.getFileSystem(conf.value.value) + val fileStatus = fs.getFileStatus(path) + ConvertTargetFile(SerializableFileStatus.fromStatus(fileStatus), partValues) + } + } + spark.createDataset(rdd) + }.reduce(_.union(_)) + + 
fileSparkResults = Some(res.cache()) + _numFiles = Some(numFiles) + } + + override def close(): Unit = fileSparkResults.map(_.unpersist()) +} diff --git a/delta-iceberg-compat/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/delta-iceberg-compat/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister new file mode 100644 index 00000000000..9402b69a2d4 --- /dev/null +++ b/delta-iceberg-compat/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister @@ -0,0 +1 @@ +org.apache.iceberg.spark.source.IcebergSource diff --git a/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala b/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala new file mode 100644 index 00000000000..e3881a8c2e2 --- /dev/null +++ b/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala @@ -0,0 +1,1358 @@ +/* + * Copyright (2021) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.delta + +// scalastyle:off import.ordering.noEmptyLine +import java.io.File +import java.sql.Timestamp +import java.text.SimpleDateFormat +import java.util.TimeZone +import java.util.concurrent.TimeUnit + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.apache.spark.sql.delta.commands.ConvertToDeltaCommand +import org.apache.spark.sql.delta.schema.{SchemaMergingUtils, SchemaUtils} +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.StatsUtils +import org.apache.spark.sql.delta.test.DeltaSQLCommandTest +import org.apache.commons.io.FileUtils +import org.apache.hadoop.fs.Path +import org.apache.iceberg.{PartitionField, Table, TableProperties} +import org.apache.iceberg.hadoop.HadoopTables + +import org.apache.spark.SparkException +import org.apache.spark.sql.{AnalysisException, QueryTest, Row} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.functions.col +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types._ +import org.apache.spark.util.Utils +// scalastyle:on import.ordering.noEmptyLine + +trait ConvertIcebergToDeltaUtils extends SharedSparkSession + with DeltaSQLCommandTest { + + protected var warehousePath: File = null + protected lazy val table: String = "local.db.table" + protected lazy val tablePath: String = "file://" + warehousePath.getCanonicalPath + "/db/table" + protected lazy val nestedTable: String = "local.db.nested_table" + protected lazy val nestedTablePath: String = + "file://" + warehousePath.getCanonicalPath + "/db/nested_table" + + + override def beforeAll(): Unit = { + warehousePath = Utils.createTempDir() + super.beforeAll() + } + + override def afterAll(): Unit = { + super.afterAll() + if (warehousePath != null) 
Utils.deleteRecursively(warehousePath) + } + + override def afterEach(): Unit = { + sql(s"DROP TABLE IF EXISTS $table") + super.afterEach() + } + + /** + * Setting the java default timezone, as we use java.util.TimeZone.getDefault for partition + * values... + * + * In production clusters, the default timezone is always set as UTC. + */ + def withDefaultTimeZone(timeZoneId: String)(func: => Unit): Unit = { + val previousTimeZone = TimeZone.getDefault() + try { + TimeZone.setDefault(TimeZone.getTimeZone(timeZoneId)) + func + } finally { + TimeZone.setDefault(previousTimeZone) + } + } + + protected override def sparkConf = super.sparkConf + .set( + "spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") + .set( + "spark.sql.catalog.local.type", "hadoop") + .set( + "spark.sql.catalog.local.warehouse", warehousePath.getCanonicalPath) + .set("spark.sql.session.timeZone", "UTC") + + protected val schemaDDL = "id bigint, data string, ts timestamp, dt date" + protected val schema = StructType.fromDDL(schemaDDL) + + protected def readIcebergHadoopTable(tablePath: String): Table = { + // scalastyle:off deltahadoopconfiguration + new HadoopTables(spark.sessionState.newHadoopConf).load(tablePath) + // scalastyle:on deltahadoopconfiguration + } +} + +trait ConvertIcebergToDeltaSuiteBase + extends QueryTest + with ConvertIcebergToDeltaUtils + with StatsUtils { + + protected def convert(tableIdentifier: String, partitioning: Option[String] = None): Unit + + test("table with deleted files") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + spark.sql(s"DELETE FROM $table WHERE data > 'a'") + checkAnswer( + spark.sql(s"SELECT * from $table"), Row(1, "a") :: Nil) + + convert(s"iceberg.`$tablePath`") + assert(SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability( + spark.read.format("delta").load(tablePath).schema, + new StructType().add("id", LongType).add("data", StringType))) + checkAnswer( + spark.read.format("delta").load(tablePath), + Row(1, "a") :: Nil) + } + } + + + test("non-parquet table") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data) + |TBLPROPERTIES ('write.format.default'='orc') + |""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + val e = intercept[UnsupportedOperationException] { + convert(s"iceberg.`$tablePath`") + } + assert(e.getMessage.contains("Cannot convert") && e.getMessage.contains("orc")) + } + } + + test("external location") { + withTempDir { dir => + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b')") + spark.sql(s"INSERT INTO $table VALUES (3, 'c')") + ConvertToDeltaCommand( + TableIdentifier(tablePath, Some("iceberg")), + None, + Some(dir.getCanonicalPath)).run(spark) + + checkAnswer( + spark.read.format("delta").load(dir.getCanonicalPath), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + } + } + } + + test("table with renamed columns") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b')") + spark.sql("ALTER TABLE local.db.table RENAME COLUMN id TO id2") + spark.sql(s"INSERT INTO $table 
VALUES (3, 'c')") + convert(s"iceberg.`$tablePath`") + + // The converted delta table will get the updated schema + assert( + SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability( + spark.read.format("delta").load(tablePath).schema, + new StructType().add("id2", LongType).add("data", StringType))) + + // Parquet files still have the old schema + assert( + SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability( + spark.read.format("parquet").load(tablePath + "/data").schema, + new StructType().add("id", LongType).add("data", StringType))) + + val properties = readIcebergHadoopTable(tablePath).properties() + + // This confirms that name mapping is not used for this case + assert(properties.get(TableProperties.DEFAULT_NAME_MAPPING) == null) + + // As of right now, the data added before rename will be nulls. + checkAnswer( + spark.read.format("delta").load(tablePath), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + } + } + + test("columns starting with numbers") { + val table2 = "local.db.table2" + val tablePath2 = tablePath + "2" + withTable(table2) { + spark.sql( + s"""CREATE TABLE $table2 (1id bigint, 2data string) + |USING iceberg PARTITIONED BY (2data)""".stripMargin) + spark.sql(s"INSERT INTO $table2 VALUES (1, 'a'), (2, 'b')") + spark.sql(s"INSERT INTO $table2 VALUES (3, 'c')") + assert(spark.sql(s"select * from $table2").schema == + new StructType().add("1id", LongType).add("2data", StringType)) + + checkAnswer( + spark.sql(s"select * from $table2"), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + + val properties = readIcebergHadoopTable(tablePath2).properties() + + // This confirms that name mapping is not used for this case + assert(properties.get(TableProperties.DEFAULT_NAME_MAPPING) == null) + + convert(s"iceberg.`$tablePath2`") + // The converted delta table gets the updated schema + assert( + SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability( + spark.read.format("delta").load(tablePath2).schema, + new StructType().add("1id", LongType).add("2data", StringType))) + + // parquet file schema has been modified + assert( + spark.read.format("parquet").load(tablePath2 + "/data").schema == + new StructType() + .add("_1id", LongType) + .add("_2data", StringType) + // this is the partition column, which stays as-is + .add("2data", StringType)) + + checkAnswer( + spark.read.format("delta").load(tablePath2), + Row(1, "a") :: Row(2, "b") :: Row(3, "c") :: Nil) + } + } + + test("nested schema") { + withTable(table) { + def createDDL(tname: String): String = + s"""CREATE TABLE $tname (id bigint, person struct) + |USING iceberg PARTITIONED BY (truncate(person.name, 2))""".stripMargin + def insertDDL(tname: String): String = + s"INSERT INTO $tname VALUES (1, ('aaaaa', 10)), (2, ('bbbbb', 20))" + testNestedColumnIDs(createDDL(nestedTable), insertDDL(nestedTable)) + + spark.sql(createDDL(table)) + + spark.sql(s"INSERT INTO $table VALUES (1, ('aaaaa', 10)), (2, ('bbbbb', 20))") + checkAnswer( + spark.sql(s"SELECT * from $table"), + Row(1, Row("aaaaa", 10)) :: Row(2, Row("bbbbb", 20)) :: Nil) + + convert(s"iceberg.`$tablePath`") + + val tblSchema = spark.read.format("delta").load(tablePath).schema + + val expectedSchema = new StructType() + .add("id", LongType) + .add("person", new StructType().add("name", StringType).add("phone", IntegerType)) + .add("person.name_trunc", StringType) + + assert(SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability(expectedSchema, tblSchema)) + + checkAnswer( + spark.read.format("delta").load(tablePath), + Row(1, Row("aaaaa", 
10), "aa") :: Row(2, Row("bbbbb", 20), "bb") :: Nil) + + assert( + spark.sql(s"select * from delta.`$tablePath` where person.name > 'b'") + .inputFiles.length == 1) + + spark.sql( + s""" + |insert into $table (id, person) + |values (3, struct("ccccc", 30)) + |""".stripMargin) + + val insertDataSchema = StructType.fromDDL("id bigint, person struct") + val df = spark.createDataFrame(Seq(Row(3L, Row("ccccc", 30))).asJava, insertDataSchema) + df.write.format("delta").mode("append").save(tablePath) + + checkAnswer( + // check the raw parquet partition directories written out by Iceberg + spark.sql(s"select * from parquet.`$tablePath/data`"), + spark.sql(s"select * from delta.`$tablePath`") + ) + assert( + spark.sql(s"select * from delta.`$tablePath` where person.name > 'b'") + .inputFiles.length == 2) + } + } + + private def schemaTestNoDataSkipping( + createTableSql: String, + initialInsertValuesSql: String, + expectedInitialRows: Seq[Row], + expectedSchema: StructType, + finalInsertValuesSql: String) : Unit = { + withTable(table) { + spark.sql(s"DROP TABLE IF EXISTS $table") + spark.sql(s"CREATE TABLE $table $createTableSql USING iceberg") + spark.sql(s"INSERT INTO $table VALUES $initialInsertValuesSql") + checkAnswer(spark.sql(s"SELECT * FROM $table"), expectedInitialRows) + + convert(s"iceberg.`$tablePath`") + + val tblSchema = spark.read.format("delta").load(tablePath).schema + + assert(SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability(expectedSchema, tblSchema)) + + checkAnswer(spark.read.format("delta").load(tablePath), expectedInitialRows) + + spark.sql( + s""" + |INSERT INTO $table + |VALUES $finalInsertValuesSql + |""".stripMargin) + + spark.sql( + s""" + |INSERT INTO delta.`$tablePath` + |VALUES $finalInsertValuesSql + |""".stripMargin) + + checkAnswer( + // check the raw parquet partition directories written out by Iceberg + spark.sql(s"SELECT * FROM parquet.`$tablePath/data`"), + spark.sql(s"SELECT * FROM delta.`$tablePath`") + ) + } + } + + test("array of struct schema") { + val createTableSql = "(id bigint, grades array>)" + val initialInsertValuesSql = "(1, array(('mat', 10), ('cs', 90))), (2, array(('eng', 80)))" + val expectedInitialRows = Row(1, Seq(Row("mat", 10), Row("cs", 90))) :: + Row(2, Seq(Row("eng", 80))) :: Nil + val arrayType = ArrayType(new StructType().add("class", StringType).add("score", IntegerType)) + val expectedSchema = new StructType() + .add("id", LongType) + .add("grades", arrayType) + val finalInsertValuesSql = "(3, array(struct(\"mat\", 100), struct(\"cs\", 100)))" + + schemaTestNoDataSkipping(createTableSql, initialInsertValuesSql, expectedInitialRows, + expectedSchema, finalInsertValuesSql) + } + + test("map schema") { + val createTableSql = "(id bigint, grades map)" + val initialInsertValuesSql = "(1, map('mat', 10, 'cs', 90)), (2, map('eng', 80))" + val expectedInitialRows = Row(1, Map[String, Int]("mat" -> 10, "cs" -> 90)) :: + Row(2, Map[String, Int]("eng" -> 80)) :: Nil + val expectedSchema = new StructType() + .add("id", LongType) + .add("grades", MapType(StringType, IntegerType)) + val finalInsertValuesSql = "(3, map(\"mat\", 100, \"cs\", 100))" + + schemaTestNoDataSkipping(createTableSql, initialInsertValuesSql, expectedInitialRows, + expectedSchema, finalInsertValuesSql) + } + + test("partition schema is not allowed") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data) + |""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + 
val e = intercept[IllegalArgumentException] { + convert(s"iceberg.`$tablePath`", Some("data string")) + } + assert(e.getMessage.contains("Partition schema cannot be specified")) + } + } + + test("copy over Iceberg table properties") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + spark.sql( + s"""ALTER TABLE $table SET TBLPROPERTIES( + | 'read.split.target-size'='268435456' + |)""".stripMargin) + convert(s"iceberg.`$tablePath`") + checkAnswer( + spark.sql(s"SHOW TBLPROPERTIES delta.`$tablePath`") + .filter(col("key").startsWith("read.")), + Row("read.split.target-size", "268435456") :: Nil + ) + } + } + + test("converted table columns have metadata containing iceberg column ids") { + + val nested1 = s"""CREATE TABLE $nestedTable (name string, age int, + |pokemon array>) + |USING iceberg""".stripMargin + + val insert1 = s"""INSERT INTO $nestedTable VALUES ('Ash', 10, + |array(struct('Charizard', 'Fire/Flying'), struct('Pikachu', 'Electric'))) + """.stripMargin + testNestedColumnIDs(nested1, insert1) + + val nested2 = s"""CREATE TABLE $nestedTable (name string, + |info struct, id:int>) + |USING iceberg""".stripMargin + + val insert2 = s"""INSERT INTO $nestedTable VALUES ('Zigzagoon', + |struct(struct('Hoenn', 'Common'), 263)) + """.stripMargin + testNestedColumnIDs(nested2, insert2) + + val nested3 = s"""CREATE TABLE $nestedTable (name string, + |moves map>) + |USING iceberg""".stripMargin + + val insert3 = s"""INSERT INTO $nestedTable VALUES ('Heatran', + |map('Fire Fang', struct(17, 7))) + """.stripMargin + testNestedColumnIDs(nested3, insert3) + } + + test("comments are retained from Iceberg") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint comment "myexample", data string comment "myexample") + |USING iceberg PARTITIONED BY (data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + + convert(s"iceberg.`$tablePath`") + + val readSchema = spark.read.format("delta").load(tablePath).schema + readSchema.foreach { field => + assert(field.getComment().contains("myexample")) + } + } + } + + private def testNestedColumnIDs(createString: String, insertString: String): Unit = { + // Nested schema + withTable(nestedTable) { + // Create table and insert into it + spark.sql(createString) + + spark.sql(insertString) + + // Convert to Delta + convert(s"iceberg.`$nestedTablePath`") + + // Check Delta schema + val schema = DeltaLog.forTable(spark, new Path(nestedTablePath)).update().schema + + // Get initial Iceberg schema + val icebergTable = readIcebergHadoopTable(nestedTablePath) + val icebergSchema = icebergTable.schema() + + // Check all nested fields to see if they all have a column ID then check the iceberg schema + // for whether that column ID corresponds to the same column name + val columnIds = mutable.Set[Long]() + SchemaMergingUtils.transformColumns(schema) { (_, field, _) => + assert(DeltaColumnMapping.hasColumnId(field)) + // nest column ids should be distinct + val id = DeltaColumnMapping.getColumnId(field) + assert(!columnIds.contains(id)) + columnIds.add(id) + // the id can either be a data schema id or a identity transform partition field + // or it is generated bc it's a non-identity transform partition field + assert( + Option(icebergSchema.findField(id)).map(_.name()).contains(field.name) || + 
icebergTable.spec().fields().asScala.map(_.name()).contains(field.name) + ) + field + } + } + } + + test("conversion should fail if had partition evolution / multiple partition specs") { + /** + * Per https://iceberg.apache.org/evolution/#partition-evolution, if partition evolution happens + * in Iceberg, multiple partition specs are persisted, thus convert to Delta cannot be + * supported w/o repartitioning because Delta only supports one consistent spec + */ + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string, data2 string) + |USING iceberg PARTITIONED BY (data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')") + // add new partition spec + readIcebergHadoopTable(tablePath).updateSpec().addField("data2").commit() + spark.sql(s"INSERT INTO $table VALUES (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')") + // partition evolution happens, convert will fail + val e1 = intercept[UnsupportedOperationException] { + convert(s"iceberg.`$tablePath`") + } + assert(e1.getMessage.contains(IcebergTable.ERR_MULTIPLE_PARTITION_SPECS)) + + // drop old partition spec + readIcebergHadoopTable(tablePath).updateSpec().removeField("data2").commit() + spark.sql(s"INSERT INTO $table VALUES (1, 'a', 'x'), (2, 'b', 'y'), (3, 'c', 'z')") + // partition spec is reverted, but partition evolution happens already + // use assert explicitly bc we do not want checks in IcebergPartitionUtils to run first + assert(readIcebergHadoopTable(tablePath).specs().size() > 1) + } + } + + test("convert Iceberg table with not null columns") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint NOT NULL, data string, name string NOT NULL) + |USING iceberg PARTITIONED BY (id)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a', 'b'), (2, 'b', 'c'), (3, 'c', 'd')") + convert(s"iceberg.`$tablePath`") + val data = spark.read.format("delta").load(tablePath) + // verify data is converted properly + checkAnswer(data, Seq(Row(1, "a", "b"), Row(2, "b", "c"), Row(3, "c", "d"))) + + // Verify schema contains not null constraint where appropriate + val dataSchema = data.schema + dataSchema.foreach { field => + // both partition columns and data columns should have the correct nullability + if (field.name == "id" || field.name == "name") { + assert(!field.nullable) + } else { + assert(field.nullable) + } + } + + // Should not be able to write nulls to not null data column + var ex = intercept[AnalysisException] { + spark.sql(s"INSERT INTO $table VALUES (4, 'd', null)") + } + assert(ex.getMessage.contains("""Cannot write nullable values to non-null column 'name'""")) + + // Should not be able to write nulls to not null partition column + ex = intercept[AnalysisException] { + spark.sql(s"INSERT INTO $table VALUES (null, 'e', 'e')") + } + assert(ex.getMessage.contains("""Cannot write nullable values to non-null column 'id'""")) + + // Should be able to write nulls to nullable column + spark.sql(s"INSERT INTO $table VALUES (5, null, 'e')") + } + } + + test("convert Iceberg table with case sensitive columns") { + withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (i bigint NOT NULL, I string) + |USING iceberg PARTITIONED BY (I)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + val ex = intercept[UnsupportedOperationException] { + convert(s"iceberg.`$tablePath`") + } + + assert(ex.getMessage.contains("contains column names that only differ by 
case")) + } + } + } + + test("should block converting Iceberg table with name mapping") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string) + |USING iceberg PARTITIONED BY (data) + |""".stripMargin + ) + spark.sql( + s"""ALTER TABLE $table SET TBLPROPERTIES( + | 'schema.name-mapping.default' = + | '[{"field-id": 1, "names": ["my_id"]},{"field-id": 2, "names": ["my_data"]}]' + |)""".stripMargin) + + val e = intercept[UnsupportedOperationException] { + convert(s"iceberg.`$tablePath`") + } + assert(e.getMessage.contains(IcebergTable.ERR_CUSTOM_NAME_MAPPING)) + + } + } + + private def testNullPartitionValues(): Unit = { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (id bigint, data string, dt date) + |USING iceberg PARTITIONED BY (dt)""".stripMargin) + spark.sql(s"INSERT INTO $table" + + s" VALUES (1, 'a', null), (2, 'b', null), (3, 'c', cast('2021-01-03' as date))") + convert(s"iceberg.`$tablePath`") + val data = spark.read.format("delta").load(tablePath) + val fmt = new SimpleDateFormat("yyyy-MM-dd") + checkAnswer(data, + Seq( + Row(1, "a", null), + Row(2, "b", null), + Row(3, "c", new java.sql.Date(fmt.parse("2021-01-03").getTime)))) + } + } + + test("partition columns are null") { + withSQLConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES.key -> "false") { + val e = intercept[RuntimeException] { + testNullPartitionValues() + } + assert(e.getMessage.contains("Failed to cast partition value")) + } + + withSQLConf( + DeltaSQLConf.DELTA_CONVERT_PARTITION_VALUES_IGNORE_CAST_FAILURE.key -> "true", + DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES.key -> "false") { + testNullPartitionValues() + } + + // default setting should work + testNullPartitionValues() + } + + test("arbitrary name") { + def col(name: String): String = name + "with_special_chars_;{}()\n\t=" + + // turns out Iceberg would fail when partition col names have special chars + def partCol(name: String): String = "0123" + name + + withTable(table) { + spark.sql( + s"""CREATE TABLE $table ( + | `${col("data")}` int, + | `${partCol("part1")}` bigint, + | `${partCol("part2")}` string) + |USING iceberg + |PARTITIONED BY ( + | `${partCol("part1")}`, + | truncate(`${partCol("part2")}`, 4)) + |""".stripMargin) + + spark.sql( + s""" + |INSERT INTO $table + |VALUES (123, 1234567890123, 'str11') + |""".stripMargin) + + convert(s"iceberg.`$tablePath`") + + spark.sql( + s""" + |INSERT INTO delta.`$tablePath` + |VALUES (456, 4567890123456, 'str22', 'str2') + |""".stripMargin) + + checkAnswer(spark.sql(s"select * from delta.`$tablePath`"), + Seq( + Row(123, 1234567890123L, "str11", "str1"), + Row(456, 4567890123456L, "str22", "str2"))) + + // projection and filter + checkAnswer( + spark.table(s"delta.`$tablePath`") + .select(s"`${col("data")}`", s"`${partCol("part1")}`") + .where(s"`${partCol("part2")}` = 'str22'"), + Seq(Row(456, 4567890123456L))) + } + } + + test("partition by identity, using native partition values") { + withDefaultTimeZone("UTC") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table ( + | data_binary binary, + | part_ts timestamp, + | part_date date, + | part_bool boolean, + | part_int integer, + | part_long long, + | part_float float, + | part_double double, + | part_decimal decimal(3, 2), + | part_string string + | ) + |USING iceberg PARTITIONED BY (part_ts, part_date, part_bool, part_int, part_long, + | part_float, part_double, part_decimal, part_string)""".stripMargin) + + def insertData(targetTable: String): Unit = { + spark.sql( + 
s""" + |INSERT INTO $targetTable + |VALUES (cast('this is binary' as binary), + | cast(1635728400000 as timestamp), + | cast('2021-11-15' as date), + | true, + | 123, + | 12345678901234, + | 123.4, + | 123.4, + | 1.23, + | 'this is a string')""".stripMargin) + } + + insertData(table) + withTempDir { dir => + val deltaPath = dir.getCanonicalPath + ConvertToDeltaCommand( + tableIdentifier = TableIdentifier(tablePath, Some("iceberg")), + partitionSchema = None, + Some(deltaPath)).run(spark) + // check that all the partition value types can be converted correctly + checkAnswer(spark.table(s"delta.`$deltaPath`"), spark.table(table)) + + insertData(s"delta.`$deltaPath`") + insertData(table) + // check that new writes to both Delta and Iceberg can be read back the same + checkAnswer(spark.table(s"delta.`$deltaPath`"), spark.table(table)) + } + } + } + } + + test("block convert: binary type partition columns") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table ( + | data int, + | part binary) + |USING iceberg + |PARTITIONED BY (part) + |""".stripMargin) + spark.sql(s"insert into $table values (123, cast('str1' as binary))") + val e = intercept[UnsupportedOperationException] { + convert(s"iceberg.`$tablePath`") + } + assert(e.getMessage.contains("Unsupported partition transform expression")) + } + } + + test("block convert: partition transform truncate on non-string type") { + withTable(table) { + spark.sql( + s"""CREATE TABLE $table ( + | data int, + | part int) + |USING iceberg + |PARTITIONED BY (truncate(part, 3)) + |""".stripMargin) + spark.sql(s"insert into $table values (123, 123456)") + val e = intercept[UnsupportedOperationException] { + convert(s"iceberg.`$tablePath`") + } + assert(e.getMessage.contains("Unsupported partition transform expression")) + } + } +} + +class ConvertIcebergToDeltaScalaSuite extends ConvertIcebergToDeltaSuiteBase { + override protected def convert( + tableIdentifier: String, + partitioning: Option[String] = None): Unit = { + if (partitioning.isDefined) { + io.delta.tables.DeltaTable.convertToDelta(spark, tableIdentifier, partitioning.get) + } else { + io.delta.tables.DeltaTable.convertToDelta(spark, tableIdentifier) + } + } +} + +class ConvertIcebergToDeltaSQLSuite extends ConvertIcebergToDeltaSuiteBase { + override protected def convert( + tableIdentifier: String, + partitioning: Option[String] = None): Unit = { + val statement = partitioning.map(p => s" PARTITIONED BY ($p)").getOrElse("") + spark.sql(s"CONVERT TO DELTA ${tableIdentifier}${statement}") + } +} + +class ConvertIcebergToDeltaPartitioningSuite extends QueryTest + with ConvertIcebergToDeltaUtils +{ + + import testImplicits._ + + protected override def test(testName: String, testTags: org.scalatest.Tag*) + (testFun: => Any) + (implicit pos: org.scalactic.source.Position): Unit = { + Seq("true", "false").foreach { flag => + val msg = if (flag == "true") "- with native partition values" + else "- with inferred partition values" + super.test(testName + msg, testTags : _*) { + withSQLConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_USE_NATIVE_PARTITION_VALUES.key -> flag) { + testFun + } + }(pos) + } + } + + private def mergeSchema(dataSchema: StructType, partitionSchema: StructType): StructType = { + StructType(dataSchema.fields ++ + partitionSchema.fields.filter { partField => + !dataSchema.fields.exists(f => spark.sessionState.conf.resolver(partField.name, f.name))}) + } + + private def createIcebergTable( + tableName: String, + partitionColumns: Seq[String]): Unit = { + spark.sql(s"drop table if 
exists $tableName") + if (partitionColumns.isEmpty) { + spark.sql( + s"""CREATE TABLE $tableName (id bigint, data string, ts timestamp, dt date) + |USING iceberg""".stripMargin) + } else { + spark.sql( + s"""CREATE TABLE $tableName (id bigint, data string, ts timestamp, dt date) + |USING iceberg PARTITIONED BY (${partitionColumns.mkString(",")})""".stripMargin) + } + spark.sql( + s""" + |INSERT INTO $tableName + |VALUES (1, 'abc', cast('2021-06-01 18:00:00' as timestamp), + | cast('2021-06-01' as date))""".stripMargin) + + spark.sql( + s""" + |INSERT INTO $tableName + |VALUES (2, 'ace', cast('2022-07-01 20:00:00' as timestamp), + | cast('2022-07-01' as date))""".stripMargin) + } + + private def testConvertToDelta( + tableName: String, + tablePath: String, + partitionSchemaDDL: String, + checkSkipping: Map[String, Int] = Map(), + deltaDir: Option[File] = None): Unit = withTempDir { dir => + val deltaPath = deltaDir.getOrElse(dir).getCanonicalPath + // for easy testing, convert it at an external location + ConvertToDeltaCommand( + tableIdentifier = TableIdentifier(tablePath, Some("iceberg")), + partitionSchema = None, + Some(deltaPath)).run(spark) + + // no stats collection on new writes too + sql( + s""" + |ALTER TABLE delta.`$deltaPath` + |SET TBLPROPERTIES ( + | '${DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.key}' = '0')""".stripMargin) + + val expectedPartitionSchema = StructType.fromDDL(partitionSchemaDDL) + + // check converted table schema + val icebergTable = readIcebergHadoopTable(tablePath) + val icebergSchema = icebergTable.schema() + val convertedSchema = DeltaLog.forTable(spark, new Path(deltaPath)).update().schema + + val columnIds = mutable.Set[Long]() + val schemaWithoutMetadata = + SchemaMergingUtils.transformColumns(convertedSchema) { (_, field, _) => + // all columns should have the columnID metadata + assert(DeltaColumnMapping.hasColumnId(field)) + // all columns should have physical name metadata + assert(DeltaColumnMapping.hasPhysicalName(field)) + // nest column ids should be distinct + val id = DeltaColumnMapping.getColumnId(field) + assert(!columnIds.contains(id)) + columnIds.add(id) + // the id can either be a data schema id or a identity transform partition field + // or it is generated because it's a non-identity transform partition field + assert( + Option(icebergSchema.findField(id)).map(_.name()).contains(field.name) || + icebergTable.spec().fields().asScala.map(_.name()).contains(field.name) + ) + field.copy(metadata = Metadata.empty) + } + + assert(schemaWithoutMetadata == mergeSchema(schema, expectedPartitionSchema)) + + // check partition columns + assert(expectedPartitionSchema.map(_.name) == + DeltaLog.forTable(spark, new Path(deltaPath)).update().metadata.partitionColumns) + + checkAnswer( + // the converted delta table will have partition columns + spark.sql(s"select ${schema.fields.map(_.name).mkString(",")} from delta.`$deltaPath`"), + spark.sql(s"select * from $tableName")) + + // add the same data to both Delta table and iceberg table + val df = spark.sql( + s""" + |SELECT 3L AS id, 'acf' AS data, + | cast('2023-07-01 03:00:00' as timestamp) AS ts, + | cast('2023-07-01' as date) AS dt + |""".stripMargin) + df.write.format("delta").mode("append").save(deltaPath) + + spark.sql( + s""" + |INSERT INTO $tableName (id, data, ts, dt) + |VALUES (3, 'acf', cast('2023-07-01 03:00:00' as timestamp), + | cast('2023-07-01' as date))""".stripMargin) + + // add a check for full scan too + (checkSkipping ++ Map("" -> 3)).foreach { case (filter, numFilesScanned) => 
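+      // Each entry pairs a filter with the number of files expected to be read after partition-based file skipping; filtered results are also cross-checked against the Iceberg table and its raw parquet data files.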
+ val filterExpr = if (filter == "") "" else s"where $filter" + checkAnswer( + // the converted delta table will have partition columns + spark.sql( + s"select ${schema.fields.map(_.name).mkString(",")} " + + s"from delta.`$deltaPath` where $filter"), + spark.sql(s"select * from $tableName $filterExpr")) + + if (!partitionSchemaDDL.equals("ts_month string") && + !partitionSchemaDDL.equals("dt_month string")) { + checkAnswer( + // check the raw parquet partition directories written out by Iceberg + spark.sql(s"select * from parquet.`$tablePath/data` $filterExpr"), + spark.sql(s"select * from delta.`$deltaPath` $filterExpr") + ) + } else { + withSQLConf("spark.sql.sources.partitionColumnTypeInference.enabled" -> "false") { + checkAnswer( + // check the raw parquet partition directories written out by Iceberg + // and not infer partition column type for ts_month and dt_month + // since: 2020-01 will be inferred as a date and cast it to 2020-01-01 + spark.sql(s"select * from parquet.`$tablePath/data` $filterExpr"), + spark.sql(s"select * from delta.`$deltaPath` $filterExpr") + ) + } + } + assert( + spark.sql(s"select * from delta.`$deltaPath` $filterExpr").inputFiles.length == + numFilesScanned) + } + } + + test("partition by year") { + withTable(table) { + createIcebergTable(table, Seq("years(ts)")) + testConvertToDelta( + table, + tablePath, + "ts_year int", + Map( + "ts < cast('2021-06-01 00:00:00' as timestamp)" -> 1, + "ts <= cast('2021-06-01 00:00:00' as timestamp)" -> 1, + "ts > cast('2021-06-01 00:00:00' as timestamp)" -> 3, + "ts > cast('2022-01-01 00:00:00' as timestamp)" -> 2)) + } + + withTable(table) { + createIcebergTable(table, Seq("years(dt)")) + testConvertToDelta( + table, + tablePath, + "dt_year int", + Map( + "dt < cast('2021-06-01' as date)" -> 1, + "dt <= cast('2021-06-01' as date)" -> 1, + "dt > cast('2021-06-01' as date)" -> 3, + "dt = cast('2022-08-01' as date)" -> 1) + ) + } + } + + test("partition by day") { + withTable(table) { + createIcebergTable(table, Seq("days(ts)")) + testConvertToDelta( + table, + tablePath, + "ts_day date", + Map("ts < cast('2021-07-01 00:00:00' as timestamp)" -> 1)) + } + + withTable(table) { + createIcebergTable(table, Seq("days(dt)")) + testConvertToDelta( + table, + tablePath, + "dt_day date", + Map( + "dt < cast('2021-06-01' as date)" -> 1, + "dt <= cast('2021-06-01' as date)" -> 1, + "dt > cast('2021-06-01' as date)" -> 3, + "dt = cast('2022-07-01' as date)" -> 1) + ) + } + } + + test("partition by truncate string") { + withTable(table) { + createIcebergTable(table, Seq("truncate(data, 2)")) + testConvertToDelta( + table, + tablePath, + "data_trunc string", + Map( + "data >= 'ac'" -> 2, + "data >= 'ad'" -> 0 + ) + ) + } + } + + test("partition by identity") { + withTable(table) { + createIcebergTable(table, Seq("data")) + + withTempDir { dir => + val deltaDir = new File(dir, "delta-table") + testConvertToDelta( + table, + tablePath, + "data string", + deltaDir = Some(deltaDir) + ) + + spark.read.format("delta").load(deltaDir.getCanonicalPath).inputFiles.foreach { fileName => + val sourceFile = new File(fileName.stripPrefix("file:")) + val targetFile = new File(dir, sourceFile.getName) + FileUtils.copyFile(sourceFile, targetFile) + val parquetFileSchema = + spark.read.format("parquet").load(targetFile.getCanonicalPath).schema + if (fileName.contains("acf")) { // new file written by delta + SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability( + parquetFileSchema, StructType(schema.fields.filter(_.name != "data"))) + } else { + 
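// Iceberg stores identity partition columns in its data files, so files written before conversion keep the full table schema. +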
SchemaMergingUtils.equalsIgnoreCaseAndCompatibleNullability(parquetFileSchema, schema) + } + } + } + } + } + + test("df writes and Insert Into with composite partitioning") { + withTable(table) { + createIcebergTable(table, Seq("years(dt), truncate(data, 3), id")) + + withTempDir { dir => + val deltaPath = new File(dir.getCanonicalPath, "/delta") + testConvertToDelta( + table, + tablePath, + "dt_year int, data_trunc string, id bigint", + Map( + "data >= 'ac'" -> 2, + "data >= 'acg'" -> 0, + "dt = cast('2022-07-01' as date) and data >= 'ac'" -> 1 + ), + Some(deltaPath) + ) + + // for Dataframe, we don't need to explicitly mention partition columns + Seq((4L, + new java.sql.Date(TimeUnit.DAYS.toMillis(10)), + new Timestamp(TimeUnit.DAYS.toMillis(10)), + "bcddddd")) + .toDF("id", "dt", "ts", "data") + .write.format("delta").mode("append").save(deltaPath.getCanonicalPath) + + checkAnswer( + spark.read.format("delta").load(deltaPath.getCanonicalPath).where("id = 4") + .select("id", "data", "dt_year", "data_trunc"), + Row( + 4, + "bcddddd", + // generated partition columns + 1970, "bcd") :: Nil) + + val tempTablePath = dir.getCanonicalPath + "/temp" + Seq(( + 5, + new java.sql.Date(TimeUnit.DAYS.toMillis(20)), + new Timestamp(TimeUnit.DAYS.toMillis(20)), + "c") + ).toDF("id", "dt", "ts", "data") + .write.format("delta").save(tempTablePath) + + val e = intercept[AnalysisException] { + spark.sql( + s""" + | INSERT INTO delta.`$deltaPath` + | SELECT * from delta.`$tempTablePath` + |""".stripMargin) + } + assert(e.getMessage.contains("not enough data columns")) + } + } + } + + test("partition by month") { + withTable(table) { + createIcebergTable(table, Seq("months(ts)")) + testConvertToDelta( + table, + tablePath, + "ts_month string", + Map( + "ts < cast('2021-06-01 00:00:00' as timestamp)" -> 1, + "ts <= cast('2021-06-01 00:00:00' as timestamp)" -> 1, + "ts > cast('2021-06-01 00:00:00' as timestamp)" -> 3, + "ts >= cast('2021-06-01 00:00:00' as timestamp)" -> 3, + "ts < cast('2021-05-01 00:00:00' as timestamp)" -> 0, + "ts > cast('2021-07-01 00:00:00' as timestamp)" -> 2, + "ts = cast('2023-07-30 00:00:00' as timestamp)" -> 1, + "ts > cast('2023-08-01 00:00:00' as timestamp)" -> 0)) + } + + withTable(table) { + createIcebergTable(table, Seq("months(dt)")) + testConvertToDelta( + table, + tablePath, + "dt_month string", + Map( + "dt < cast('2021-06-01' as date)" -> 1, + "dt <= cast('2021-06-01' as date)" -> 1, + "dt > cast('2021-06-01' as date)" -> 3, + "dt >= cast('2021-06-01' as date)" -> 3, + "dt < cast('2021-05-01' as date)" -> 0, + "dt > cast('2021-07-01' as date)" -> 2, + "dt = cast('2023-07-30' as date)" -> 1, + "dt > cast('2023-08-01' as date)" -> 0)) + } + } + + test("partition by hour") { + withTable(table) { + createIcebergTable(table, Seq("hours(ts)")) + testConvertToDelta( + table, + tablePath, + "ts_hour string", + Map( + "ts < cast('2021-06-01 18:00:00' as timestamp)" -> 1, + "ts <= cast('2021-06-01 18:00:00' as timestamp)" -> 1, + "ts > cast('2021-06-01 18:00:00' as timestamp)" -> 3, + "ts >= cast('2021-06-01 18:30:00' as timestamp)" -> 3, + "ts < cast('2021-06-01 17:59:59' as timestamp)" -> 0, + "ts = cast('2021-06-01 18:30:10' as timestamp)" -> 1, + "ts > cast('2022-07-01 20:00:00' as timestamp)" -> 2, + "ts > cast('2023-07-01 02:00:00' as timestamp)" -> 1, + "ts > cast('2023-07-01 04:00:00' as timestamp)" -> 0)) + } + } + + ///////////////////////////////// + // 5-DIGIT-YEAR TIMESTAMP TEST // + ///////////////////////////////// + + /** + * Create an iceberg table with 
five-digit-year timestamp + */ + private def createFutureIcebergTable( + tableName: String, + partitionColumns: Seq[String]): Unit = { + spark.sql(s"drop table if exists $tableName") + spark.sql( + s"""CREATE TABLE $tableName (id bigint, data string, ts timestamp, dt date) + |USING iceberg PARTITIONED BY (${partitionColumns.mkString(",")})""".stripMargin) + // insert some data + spark.sql( + s""" + |INSERT INTO $tableName + |VALUES (1, 'abc', cast('13168-11-15 18:00:00' as timestamp), + | cast('13168-11-15' as date))""".stripMargin) + spark.sql( + s""" + |INSERT INTO $tableName + |VALUES (2, 'abc', cast('2021-08-24 18:00:00' as timestamp), + | cast('2021-08-24' as date))""".stripMargin) + } + + /** + * Test ConvertToDelta from an iceberg table with five-digit-year timestamp + * We only test for correctness. Make sure we are not silently drop data + */ + private def testFutureConvertToDelta( + tableName: String, + tablePath: String, + partitionSchemaDDL: String, + filters: Seq[String] = Seq(), + deltaDir: Option[File] = None, + policy: String): Unit = withTempDir { dir => + val deltaPath = deltaDir.getOrElse(dir).getCanonicalPath + // for easy testing, convert it at an external location + ConvertToDeltaCommand( + tableIdentifier = TableIdentifier(tablePath, Some("iceberg")), + partitionSchema = None, + Some(deltaPath)).run(spark) + + // no stats collection on new writes too + sql( + s""" + |ALTER TABLE delta.`$deltaPath` + |SET TBLPROPERTIES ( + | '${DeltaConfigs.DATA_SKIPPING_NUM_INDEXED_COLS.key}' = '0')""".stripMargin) + + val expectedPartitionSchema = StructType.fromDDL(partitionSchemaDDL) + // check converted table schema + assert( + spark.read.format("delta").load(deltaPath).schema + // remove the additional annotations in metadata + .map(_.copy(metadata = Metadata.empty)) == + mergeSchema(schema, expectedPartitionSchema)) + + // check partition columns + assert(expectedPartitionSchema.map(_.name) == + DeltaLog.forTable(spark, new Path(deltaPath)).update().metadata.partitionColumns) + + checkAnswer( + // the converted delta table will have partition columns + spark.sql(s"select ${schema.fields.map(_.name).mkString(",")} from delta.`$deltaPath`"), + spark.sql(s"select * from $tableName")) + + // add the same data to both Delta table and iceberg table + // insert into delta + val df1 = spark.sql( + s""" + |SELECT 3L AS id, 'acf' AS data, + | cast('11267-07-15 18:00:00' as timestamp) AS ts, + | cast('11267-07-15' as date) AS dt + |""".stripMargin) + df1.write.format("delta").mode("append").save(deltaPath) + val df2 = spark.sql( + s""" + |SELECT 4L AS id, 'acf' AS data, + | cast('2008-07-15 18:00:00' as timestamp) AS ts, + | cast('2008-07-15' as date) AS dt + |""".stripMargin) + df2.write.format("delta").mode("append").save(deltaPath) + + // insert into iceberg + spark.sql( + s""" + |INSERT INTO $tableName (id, data, ts, dt) + |VALUES (3, 'acf', cast('11267-07-15 18:00:00' as timestamp), + | cast('11267-07-15' as date))""".stripMargin) + spark.sql( + s""" + |INSERT INTO $tableName (id, data, ts, dt) + |VALUES (4, 'acf', cast('2008-07-15 18:00:00' as timestamp), + | cast('2008-07-15' as date))""".stripMargin) + + filters.foreach { filter => + val filterExpr = if (filter == "") "" else s"where $filter" + if (policy == "EXCEPTION" && filterExpr != "" && + partitionSchemaDDL != "ts_year int" && partitionSchemaDDL != "ts_day date") { + val msg = intercept[SparkException] { + spark.sql(s"select * from delta.`$deltaPath` $filterExpr").collect() + }.getMessage + 
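// Month/hour partition values are strings; filtered reads fail under the EXCEPTION time parser policy when the five-digit-year values are parsed back, and the error should point users at spark.sql.legacy.timeParserPolicy. +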
assert(msg.contains("spark.sql.legacy.timeParserPolicy")) + } else { + // check results of iceberg == delta + checkAnswer( + // the converted delta table will have partition columns + spark.sql(s"select ${schema.fields.map(_.name).mkString(",")} from delta.`$deltaPath`"), + spark.sql(s"select * from $tableName")) + } + } + } + + Seq("EXCEPTION", "CORRECTED", "LEGACY").foreach { policy => + test(s"future timestamp: partition by month when timeParserPolicy is: $policy") { + withSQLConf("spark.sql.legacy.timeParserPolicy" -> policy) { + withTable(table) { + createFutureIcebergTable(table, Seq("months(ts)")) + testFutureConvertToDelta( + table, + tablePath, + "ts_month string", + Seq("", + "ts > cast('2021-06-01 00:00:00' as timestamp)", + "ts < cast('12000-06-01 00:00:00' as timestamp)", + "ts >= cast('13000-06-01 00:00:00' as timestamp)", + "ts <= cast('2009-06-01 00:00:00' as timestamp)", + "ts = cast('11267-07-15 00:00:00' as timestamp)"), + policy = policy) + } + } + } + } + + Seq("EXCEPTION", "CORRECTED", "LEGACY").foreach { policy => + test(s"future timestamp: partition by hour when timeParserPolicy is: $policy") { + withSQLConf("spark.sql.legacy.timeParserPolicy" -> policy) { + withTable(table) { + createFutureIcebergTable(table, Seq("hours(ts)")) + testFutureConvertToDelta( + table, + tablePath, + "ts_hour string", + Seq("", + "ts > cast('2021-06-01 18:00:00' as timestamp)", + "ts < cast('12000-06-01 18:00:00' as timestamp)", + "ts >= cast('13000-06-01 19:00:00' as timestamp)", + "ts <= cast('2009-06-01 16:00:00' as timestamp)", + "ts = cast('11267-07-15 18:30:00' as timestamp)"), + policy = policy) + } + } + } + } + + Seq("EXCEPTION", "CORRECTED", "LEGACY").foreach { policy => + test(s"future timestamp: partition by year when timeParserPolicy is: $policy") { + withSQLConf("spark.sql.legacy.timeParserPolicy" -> policy) { + withTable(table) { + createFutureIcebergTable(table, Seq("years(ts)")) + testFutureConvertToDelta( + table, + tablePath, + "ts_year int", + Seq("", + "ts > cast('2021-06-01 18:00:00' as timestamp)", + "ts < cast('12000-06-01 18:00:00' as timestamp)", + "ts >= cast('13000-06-01 19:00:00' as timestamp)", + "ts <= cast('2009-06-01 16:00:00' as timestamp)", + "ts = cast('11267-07-15 18:30:00' as timestamp)"), + policy = policy) + } + } + } + } + + Seq("EXCEPTION", "CORRECTED", "LEGACY").foreach { policy => + test(s"future timestamp: partition by day when timeParserPolicy is: $policy") { + withSQLConf("spark.sql.legacy.timeParserPolicy" -> policy) { + withTable(table) { + createFutureIcebergTable(table, Seq("days(ts)")) + testFutureConvertToDelta( + table, + tablePath, + "ts_day date", + Seq("", + "ts > cast('2021-06-01 18:00:00' as timestamp)", + "ts < cast('12000-06-01 18:00:00' as timestamp)", + "ts >= cast('13000-06-01 19:00:00' as timestamp)", + "ts <= cast('2009-06-01 16:00:00' as timestamp)", + "ts = cast('11267-07-15 18:30:00' as timestamp)"), + policy = policy) + } + } + } + } +} From da2aa6ed86ecd18a6fea81abaf106b2aaef00685 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Mon, 31 Oct 2022 12:15:12 -0700 Subject: [PATCH 2/9] cmt --- .../resources/error/delta-error-classes.json | 2 +- .../apache/spark/sql/delta/DeltaErrors.scala | 3 +-- .../delta/commands/ConvertToDeltaCommand.scala | 18 ++++++++++++------ 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index 27fcf087e16..36b2d307a9c 100644 --- 
a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -1204,7 +1204,7 @@ }, "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES" : { "message" : [ - "Partition schema cannot be specified when converting Iceberg tables" + "Partition schema cannot be specified when converting Iceberg tables. It is automatically inferred." ], "sqlState" : "42000" }, diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index c560beca557..b5d384cad76 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2454,8 +2454,7 @@ trait DeltaErrorsBase } def partitionSchemaInIcebergTables: Throwable = { - new DeltaIllegalArgumentException( - errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES", messageParameters = null) + new DeltaIllegalArgumentException(errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES") } } diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala index 6f19746e227..85117fd36ea 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala @@ -75,8 +75,14 @@ abstract class ConvertToDeltaCommandBase( protected lazy val icebergEnabled: Boolean = conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_ENABLED) - protected def isSupportedProvider(lowerCaseProvider: String): Boolean = { - lowerCaseProvider == "parquet" || (icebergEnabled && lowerCaseProvider == "iceberg") + protected def isParquetPathProvider(provider: String): Boolean = + provider.equalsIgnoreCase("provider") + + protected def isIcebergPathProvider(provider: String): Boolean = + icebergEnabled && provider.equalsIgnoreCase("iceberg") + + protected def isSupportedPathTableProvider(provider: String): Boolean = { + isParquetPathProvider(provider) || isIcebergPathProvider(provider) } override def run(spark: SparkSession): Seq[Row] = { @@ -198,7 +204,8 @@ abstract class ConvertToDeltaCommandBase( override def isPathIdentifier(tableIdent: TableIdentifier): Boolean = { val provider = tableIdent.database.getOrElse("") // If db doesnt exist or db is called delta/tahoe then check if path exists - (DeltaSourceUtils.isDeltaDataSourceName(provider) || isSupportedProvider(provider)) && + (DeltaSourceUtils.isDeltaDataSourceName(provider) || + isSupportedPathTableProvider(provider)) && new Path(tableIdent.table).isAbsolute } @@ -288,11 +295,10 @@ abstract class ConvertToDeltaCommandBase( case Some(providerName) => providerName.toLowerCase(Locale.ROOT) match { case checkProvider if target.catalogTable.exists(ConvertToDeltaCommand.isHiveStyleParquetTable) || - checkProvider.equalsIgnoreCase("parquet") => + isParquetPathProvider(checkProvider) => ConvertToDeltaCommand.getParquetTable( spark, target.targetDir, target.catalogTable, partitionSchema) - case checkProvider - if icebergEnabled && checkProvider.equalsIgnoreCase("iceberg") => + case checkProvider if isIcebergPathProvider(checkProvider) => if (partitionSchema.isDefined) { throw DeltaErrors.partitionSchemaInIcebergTables } From fab36a395196039bf8dc9b3387a81839e8b429f4 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Mon, 31 Oct 2022 14:56:12 -0700 Subject: [PATCH 3/9] fix --- 
.../apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala index 85117fd36ea..21a64f110bb 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala @@ -76,7 +76,7 @@ abstract class ConvertToDeltaCommandBase( conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_ENABLED) protected def isParquetPathProvider(provider: String): Boolean = - provider.equalsIgnoreCase("provider") + provider.equalsIgnoreCase("parquet") protected def isIcebergPathProvider(provider: String): Boolean = icebergEnabled && provider.equalsIgnoreCase("iceberg") From 69caa2e6617aa802019eb3bda2314d538e892745 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Mon, 31 Oct 2022 17:29:46 -0700 Subject: [PATCH 4/9] retrigger test From fee04dca4a24d22a7b872aa0e44b4b15107db0b0 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Tue, 1 Nov 2022 12:21:15 -0700 Subject: [PATCH 5/9] fix --- .../resources/error/delta-error-classes.json | 6 +++++ .../apache/spark/sql/delta/DeltaErrors.scala | 11 +++++++- .../commands/ConvertToDeltaCommand.scala | 14 ++++++---- .../delta/ConvertIcebergToDeltaSuite.scala | 27 +++++++++++++++++++ 4 files changed, 52 insertions(+), 6 deletions(-) diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index 36b2d307a9c..c656fc6bacb 100644 --- a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -946,6 +946,12 @@ ], "sqlState" : "42000" }, + "DELTA_MISSING_ICEBERG_CLASS" : { + "message" : [ + "Iceberg class path is not found. Please ensure Delta Iceberg support is installed.", + "Pleas refer to for more details." + ] + }, "DELTA_MISSING_NOT_NULL_COLUMN_VALUE" : { "message" : [ "Column , which has a NOT NULL constraint, is missing from the data being written into the table." 
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index b5d384cad76..8ef96e2c19d 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -98,7 +98,8 @@ trait DocsPath { "concurrentModificationExceptionMsg", "incorrectLogStoreImplementationException", "sourceNotDeterministicInMergeException", - "columnMappingAdviceMessage" + "columnMappingAdviceMessage", + "icebergClassMissing" ) } @@ -2456,6 +2457,14 @@ trait DeltaErrorsBase def partitionSchemaInIcebergTables: Throwable = { new DeltaIllegalArgumentException(errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES") } + + def icebergClassMissing(sparkConf: SparkConf, cause: Throwable): Throwable = { + new DeltaIllegalStateException( + errorClass = "DELTA_MISSING_ICEBERG_CLASS", + messageParameters = Array( + generateDocsLink(sparkConf, "delta-utility.html#convert-a-parquet-table-to-a-delta-table")), + cause = cause) + } } object DeltaErrors extends DeltaErrorsBase diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala index 21a64f110bb..c62b7d5f927 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala @@ -747,7 +747,12 @@ class MetadataLogFileManifest( } trait ConvertToDeltaCommandUtils extends DeltaLogging { + val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]" + + var icebergSparkTableClassPath = "org.apache.spark.sql.delta.IcebergTable" + var icebergLibTableClassPath = "org.apache.iceberg.Table" + def createAddFile( targetFile: ConvertTargetFile, basePath: Path, @@ -869,11 +874,11 @@ trait ConvertToDeltaCommandUtils extends DeltaLogging { sparkTable: Option[Table], tableSchema: Option[StructType]): ConvertTargetTable = { try { - val clazz = Utils.classForName("org.apache.spark.sql.delta.IcebergTable") + val clazz = Utils.classForName(icebergSparkTableClassPath) if (sparkTable.isDefined) { val constFromTable = clazz.getConstructor( classOf[SparkSession], - Utils.classForName("org.apache.iceberg.Table"), + Utils.classForName(icebergLibTableClassPath), classOf[Option[StructType]]) val method = sparkTable.get.getClass.getMethod("table") constFromTable.newInstance(spark, method.invoke(sparkTable.get), tableSchema) @@ -884,10 +889,9 @@ trait ConvertToDeltaCommandUtils extends DeltaLogging { constFromPath.newInstance(spark, baseDir, tableSchema) } } catch { - case e: InvocationTargetException => + case e @ (_: InvocationTargetException | _: ClassNotFoundException) => logError(s"Got error when creating an Iceberg Converter", e) - // Unwrap better error messages - throw e.getCause + throw DeltaErrors.icebergClassMissing(spark.sparkContext.getConf, e) } } } diff --git a/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala b/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala index e3881a8c2e2..5756f5f63c3 100644 --- a/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala +++ b/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala @@ -135,6 +135,33 @@ trait ConvertIcebergToDeltaSuiteBase } + test("missing iceberg library should throw 
a sensical error") { + val validIcebergSparkTableClassPath = ConvertToDeltaCommand.icebergSparkTableClassPath + val validIcebergLibTableClassPath = ConvertToDeltaCommand.icebergLibTableClassPath + + Seq( + () => { + ConvertToDeltaCommand.icebergSparkTableClassPath = validIcebergSparkTableClassPath + "2" + }).foreach { makeInvalid => + try { + makeInvalid() + withTable(table) { + spark.sql( + s"""CREATE TABLE $table (`1 id` bigint, 2data string) + |USING iceberg PARTITIONED BY (2data)""".stripMargin) + spark.sql(s"INSERT INTO $table VALUES (1, 'a'), (2, 'b'), (3, 'c')") + val e = intercept[DeltaIllegalStateException] { + convert(s"iceberg.`$tablePath`") + } + assert(e.getErrorClass == "DELTA_MISSING_ICEBERG_CLASS") + } + } finally { + ConvertToDeltaCommand.icebergSparkTableClassPath = validIcebergSparkTableClassPath + ConvertToDeltaCommand.icebergLibTableClassPath = validIcebergLibTableClassPath + } + } + } + test("non-parquet table") { withTable(table) { spark.sql( From 4a4f7561113cb650a17bdcde13bb949346f677cb Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Tue, 1 Nov 2022 13:22:57 -0700 Subject: [PATCH 6/9] fix --- .../spark/sql/delta/commands/ConvertToDeltaCommand.scala | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala index c62b7d5f927..a8fb00e05fe 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging import org.apache.spark.sql.delta.schema.SchemaMergingUtils import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf} import org.apache.spark.sql.delta.util._ +import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} @@ -889,9 +890,13 @@ trait ConvertToDeltaCommandUtils extends DeltaLogging { constFromPath.newInstance(spark, baseDir, tableSchema) } } catch { - case e @ (_: InvocationTargetException | _: ClassNotFoundException) => - logError(s"Got error when creating an Iceberg Converter", e) + case e: ClassNotFoundException => + logError(s"Failed to find Iceberg class", e) throw DeltaErrors.icebergClassMissing(spark.sparkContext.getConf, e) + case e: InvocationTargetException => + logError(s"Got error when creating an Iceberg Converter", e) + // The better error is within the cause + throw ExceptionUtils.getRootCause(e) } } } From 4f96c10a400133cf9686fc0c4b920a6b69a21030 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Tue, 1 Nov 2022 15:13:47 -0700 Subject: [PATCH 7/9] fix fix --- core/src/main/resources/error/delta-error-classes.json | 4 ++-- .../scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/resources/error/delta-error-classes.json b/core/src/main/resources/error/delta-error-classes.json index c656fc6bacb..32375429b66 100644 --- a/core/src/main/resources/error/delta-error-classes.json +++ b/core/src/main/resources/error/delta-error-classes.json @@ -948,8 +948,8 @@ }, "DELTA_MISSING_ICEBERG_CLASS" : { "message" : [ - "Iceberg class path is not found. Please ensure Delta Iceberg support is installed.", - "Pleas refer to for more details." 
+ "Iceberg class was not found. Please ensure Delta Iceberg support is installed.", + "Please refer to for more details." ] }, "DELTA_MISSING_NOT_NULL_COLUMN_VALUE" : { diff --git a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala index d9a8364a9c4..d814c3b1682 100644 --- a/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala +++ b/core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala @@ -82,7 +82,8 @@ trait DeltaErrorsSuiteBase "sourceNotDeterministicInMergeException" -> DeltaErrors.sourceNotDeterministicInMergeException(spark), "columnMappingAdviceMessage" -> - DeltaErrors.columnRenameNotSupported + DeltaErrors.columnRenameNotSupported, + "icebergClassMissing" -> DeltaErrors.icebergClassMissing(sparkConf, new Throwable()) ) def otherMessagesToTest: Map[String, String] = Map( From 896b2293917b052e3186bfc0718778f9fddf3782 Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Tue, 1 Nov 2022 16:34:15 -0700 Subject: [PATCH 8/9] fix --- .../main/scala/org/apache/spark/sql/delta/DeltaErrors.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala index 8ef96e2c19d..0684a0c8d02 100644 --- a/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala +++ b/core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala @@ -2462,7 +2462,8 @@ trait DeltaErrorsBase new DeltaIllegalStateException( errorClass = "DELTA_MISSING_ICEBERG_CLASS", messageParameters = Array( - generateDocsLink(sparkConf, "delta-utility.html#convert-a-parquet-table-to-a-delta-table")), + generateDocsLink( + sparkConf, "/delta-utility.html#convert-a-parquet-table-to-a-delta-table")), cause = cause) } } From b14bfa085c7bc81e1aa4bf29613fbd92ea64420b Mon Sep 17 00:00:00 2001 From: jackierwzhang Date: Wed, 2 Nov 2022 12:00:37 -0700 Subject: [PATCH 9/9] fix dep name --- build.sbt | 4 ++-- .../org/apache/iceberg/transforms/IcebergPartitionUtil.scala | 0 .../sql/catalyst/analysis/NoSuchProcedureException.scala | 0 .../scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala | 0 .../main/scala/org/apache/spark/sql/delta/IcebergTable.scala | 0 .../services/org.apache.spark.sql.sources.DataSourceRegister | 0 .../apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala | 0 7 files changed, 2 insertions(+), 2 deletions(-) rename {delta-iceberg-compat => delta-iceberg}/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala (100%) rename {delta-iceberg-compat => delta-iceberg}/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala (100%) rename {delta-iceberg-compat => delta-iceberg}/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala (100%) rename {delta-iceberg-compat => delta-iceberg}/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala (100%) rename {delta-iceberg-compat => delta-iceberg}/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister (100%) rename {delta-iceberg-compat => delta-iceberg}/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala (100%) diff --git a/build.sbt b/build.sbt index d96f17df87e..b797590a0f5 100644 --- a/build.sbt +++ b/build.sbt @@ -194,10 +194,10 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb")) ) ) -lazy val deltaIcebergCompat = (project in file("delta-iceberg-compat")) +lazy val 
deltaIceberg = (project in file("delta-iceberg")) .dependsOn(core % "compile->compile;test->test;provided->provided") .settings ( - name := "delta-iceberg-compat", + name := "delta-iceberg", commonSettings, scalaStyleSettings, releaseSettings, diff --git a/delta-iceberg-compat/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala b/delta-iceberg/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala similarity index 100% rename from delta-iceberg-compat/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala rename to delta-iceberg/src/main/scala/org/apache/iceberg/transforms/IcebergPartitionUtil.scala diff --git a/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala b/delta-iceberg/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala similarity index 100% rename from delta-iceberg-compat/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala rename to delta-iceberg/src/main/scala/org/apache/spark/sql/catalyst/analysis/NoSuchProcedureException.scala diff --git a/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala b/delta-iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala similarity index 100% rename from delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala rename to delta-iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergSchemaUtils.scala diff --git a/delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala b/delta-iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala similarity index 100% rename from delta-iceberg-compat/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala rename to delta-iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergTable.scala diff --git a/delta-iceberg-compat/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/delta-iceberg/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister similarity index 100% rename from delta-iceberg-compat/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister rename to delta-iceberg/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister diff --git a/delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala b/delta-iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala similarity index 100% rename from delta-iceberg-compat/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala rename to delta-iceberg/src/test/scala/org/apache/spark/sql/delta/ConvertIcebergToDeltaSuite.scala
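
For reference, a minimal end-to-end sketch of the conversion path exercised by the suites above. The SparkSession configuration and the table path are illustrative assumptions, and it presumes the delta-iceberg module plus a matching iceberg-spark-runtime jar (marked "provided" in build.sbt) are on the classpath:

    import org.apache.spark.sql.SparkSession

    // Illustrative session wired up for Delta; adjust the Iceberg catalog settings to your environment.
    val spark = SparkSession.builder()
      .appName("convert-iceberg-to-delta")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Hypothetical path to an existing Iceberg (Hadoop) table.
    val icebergTablePath = "/tmp/iceberg/events"

    // SQL form (ConvertIcebergToDeltaSQLSuite). The partition schema is inferred from the
    // Iceberg partition spec, so a PARTITIONED BY clause is rejected for Iceberg sources.
    spark.sql(s"CONVERT TO DELTA iceberg.`$icebergTablePath`")

    // Or, equivalently, via the Scala API (ConvertIcebergToDeltaScalaSuite).
    io.delta.tables.DeltaTable.convertToDelta(spark, s"iceberg.`$icebergTablePath`")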