Support converting Iceberg for CONVERT TO DELTA command in Delta Lake #1463

Closed
15 changes: 15 additions & 0 deletions build.sbt
@@ -194,6 +194,21 @@ lazy val storageS3DynamoDB = (project in file("storage-s3-dynamodb"))
)
)

lazy val deltaIceberg = (project in file("delta-iceberg"))
.dependsOn(core % "compile->compile;test->test;provided->provided")
.settings (
name := "delta-iceberg",
commonSettings,
scalaStyleSettings,
releaseSettings,
libraryDependencies ++= Seq( {
val (expMaj, expMin, _) = getMajorMinorPatch(sparkVersion)
("org.apache.iceberg" % s"iceberg-spark-runtime-$expMaj.$expMin" % "1.0.0" % "provided")
.cross(CrossVersion.binary)
}
)
)

/**
* Get list of python files and return the mapping between source files and target paths
* in the generated package JAR.
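Since the Iceberg runtime above is marked "provided", the delta-iceberg artifact does not pull it in transitively; a consumer supplies a matching iceberg-spark-runtime at run time. A minimal downstream build.sbt sketch, assuming Spark 3.3 on Scala 2.12 (the coordinates and version placeholders below are illustrative assumptions, not taken from this PR):

// Hypothetical consumer build.sbt fragment; deltaVersion is a placeholder to fill in.
val deltaVersion = "x.y.z"
libraryDependencies ++= Seq(
  "io.delta" %% "delta-core"    % deltaVersion,
  "io.delta" %% "delta-iceberg" % deltaVersion,
  // CrossVersion.binary in the project definition above resolves to the artifact with the
  // Scala binary suffix, e.g. iceberg-spark-runtime-3.3_2.12 for Spark 3.3 / Scala 2.12.
  "org.apache.iceberg" %% "iceberg-spark-runtime-3.3" % "1.0.0"
)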
12 changes: 12 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -946,6 +946,12 @@
],
"sqlState" : "42000"
},
"DELTA_MISSING_ICEBERG_CLASS" : {
"message" : [
"Iceberg class was not found. Please ensure Delta Iceberg support is installed.",
"Please refer to <docLink> for more details."
]
},
"DELTA_MISSING_NOT_NULL_COLUMN_VALUE" : {
"message" : [
"Column <columnName>, which has a NOT NULL constraint, is missing from the data being written into the table."
@@ -1202,6 +1208,12 @@
],
"sqlState" : "42000"
},
"DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES" : {
"message" : [
"Partition schema cannot be specified when converting Iceberg tables. It is automatically inferred."
],
"sqlState" : "42000"
},
"DELTA_PATH_DOES_NOT_EXIST" : {
"message" : [
"<path> doesn't exist"
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -98,7 +98,8 @@ trait DocsPath {
"concurrentModificationExceptionMsg",
"incorrectLogStoreImplementationException",
"sourceNotDeterministicInMergeException",
"columnMappingAdviceMessage"
"columnMappingAdviceMessage",
"icebergClassMissing"
)
}

@@ -2452,6 +2453,19 @@ trait DeltaErrorsBase
unsupportedOptions.mkString(","))
)
}

def partitionSchemaInIcebergTables: Throwable = {
new DeltaIllegalArgumentException(errorClass = "DELTA_PARTITION_SCHEMA_IN_ICEBERG_TABLES")
}

def icebergClassMissing(sparkConf: SparkConf, cause: Throwable): Throwable = {
new DeltaIllegalStateException(
errorClass = "DELTA_MISSING_ICEBERG_CLASS",
messageParameters = Array(
generateDocsLink(
sparkConf, "/delta-utility.html#convert-a-parquet-table-to-a-delta-table")),
cause = cause)
}
}

object DeltaErrors extends DeltaErrorsBase
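To see how the new DELTA_MISSING_ICEBERG_CLASS error surfaces, here is a rough user-side sketch; the path and the loose message match are assumptions, and it only illustrates that icebergClassMissing wraps the ClassNotFoundException with a docs link built by generateDocsLink:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("iceberg-convert-error-demo")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

try {
  // Fails with DELTA_MISSING_ICEBERG_CLASS when the delta-iceberg module is absent.
  spark.sql("CONVERT TO DELTA iceberg.`/tmp/events_iceberg`")  // hypothetical path
} catch {
  case e: Exception if Option(e.getMessage).exists(_.contains("Iceberg class was not found")) =>
    // The <docLink> placeholder is filled in by generateDocsLink(sparkConf, ...).
    println(s"Delta's Iceberg support is not installed: ${e.getMessage}")
}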
core/src/main/scala/org/apache/spark/sql/delta/commands/ConvertToDeltaCommand.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.commands

// scalastyle:off import.ordering.noEmptyLine
import java.io.Closeable
import java.lang.reflect.InvocationTargetException
import java.util.Locale

import scala.collection.JavaConverters._
@@ -29,22 +30,22 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.{DeltaSourceUtils, DeltaSQLConf}
import org.apache.spark.sql.delta.util._
import org.apache.hadoop.conf.Configuration
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Analyzer, NoSuchTableException}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog, V1Table}
import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, V1Table}
import org.apache.spark.sql.execution.command.LeafRunnableCommand
import org.apache.spark.sql.execution.datasources.PartitioningUtils
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetToSparkSchemaConverter}
import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, Utils}

/**
* Convert an existing parquet table to a delta table by creating delta logs based on
@@ -72,8 +73,17 @@ abstract class ConvertToDeltaCommandBase(
partitionSchema: Option[StructType],
deltaPath: Option[String]) extends LeafRunnableCommand with DeltaCommand {

protected def isSupportedProvider(lowerCaseProvider: String): Boolean = {
lowerCaseProvider == "parquet"
protected lazy val icebergEnabled: Boolean =
conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_ENABLED)

protected def isParquetPathProvider(provider: String): Boolean =
provider.equalsIgnoreCase("parquet")

protected def isIcebergPathProvider(provider: String): Boolean =
icebergEnabled && provider.equalsIgnoreCase("iceberg")

protected def isSupportedPathTableProvider(provider: String): Boolean = {
isParquetPathProvider(provider) || isIcebergPathProvider(provider)
}

override def run(spark: SparkSession): Seq[Row] = {
@@ -195,7 +205,8 @@ abstract class ConvertToDeltaCommandBase(
override def isPathIdentifier(tableIdent: TableIdentifier): Boolean = {
val provider = tableIdent.database.getOrElse("")
// If db doesn't exist or db is called delta/tahoe then check if path exists
(DeltaSourceUtils.isDeltaDataSourceName(provider) || isSupportedProvider(provider)) &&
(DeltaSourceUtils.isDeltaDataSourceName(provider) ||
isSupportedPathTableProvider(provider)) &&
new Path(tableIdent.table).isAbsolute
}

@@ -280,19 +291,21 @@ abstract class ConvertToDeltaCommandBase(
}

/** Get the instance of the convert target table, which provides file manifest and schema */
protected def getTargetTable(
spark: SparkSession,
target: ConvertTarget): ConvertTargetTable = {
val qualifiedDir =
ConvertToDeltaCommand.getQualifiedPath(spark, new Path(target.targetDir)).toString
protected def getTargetTable(spark: SparkSession, target: ConvertTarget): ConvertTargetTable = {
target.provider match {
case Some(providerName) => providerName.toLowerCase(Locale.ROOT) match {
case _ if target.catalogTable.exists(ConvertToDeltaCommand.isHiveStyleParquetTable) =>
new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema)
case checkProvider if checkProvider.equalsIgnoreCase("parquet") =>
new ParquetTable(spark, qualifiedDir, target.catalogTable, partitionSchema)
case checkProvider =>
throw DeltaErrors.convertNonParquetTablesException(tableIdentifier, checkProvider)
case checkProvider
if target.catalogTable.exists(ConvertToDeltaCommand.isHiveStyleParquetTable) ||
isParquetPathProvider(checkProvider) =>
ConvertToDeltaCommand.getParquetTable(
spark, target.targetDir, target.catalogTable, partitionSchema)
case checkProvider if isIcebergPathProvider(checkProvider) =>
if (partitionSchema.isDefined) {
throw DeltaErrors.partitionSchemaInIcebergTables
}
ConvertToDeltaCommand.getIcebergTable(spark, target.targetDir, None, None)
case other =>
throw DeltaErrors.convertNonParquetTablesException(tableIdentifier, other)
}
case None =>
throw DeltaErrors.missingProviderForConvertException(target.targetDir)
@@ -442,10 +455,10 @@ trait ConvertTargetTable {
}

class ParquetTable(
spark: SparkSession,
basePath: String,
catalogTable: Option[CatalogTable],
userPartitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging {
val spark: SparkSession,
val basePath: String,
val catalogTable: Option[CatalogTable],
val userPartitionSchema: Option[StructType]) extends ConvertTargetTable with DeltaLogging {
// Validate user provided partition schema if catalogTable is available.
if (catalogTable.isDefined && userPartitionSchema.isDefined
&& !catalogTable.get.partitionSchema.equals(userPartitionSchema.get)) {
@@ -734,8 +747,13 @@ class MetadataLogFileManifest(
override def close(): Unit = allFiles.unpersist()
}

object ConvertToDeltaCommand {
trait ConvertToDeltaCommandUtils extends DeltaLogging {

val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"

var icebergSparkTableClassPath = "org.apache.spark.sql.delta.IcebergTable"
var icebergLibTableClassPath = "org.apache.iceberg.Table"

def createAddFile(
targetFile: ConvertTargetFile,
basePath: Path,
Expand Down Expand Up @@ -841,4 +859,46 @@ object ConvertToDeltaCommand {
// Allow partition column name starting with underscore and dot
DeltaFileOperations.defaultHiddenFileFilter(fileName) && !fileName.contains("=")
}

def getParquetTable(
spark: SparkSession,
targetDir: String,
catalogTable: Option[CatalogTable],
partitionSchema: Option[StructType]): ConvertTargetTable = {
val qualifiedDir = ConvertToDeltaCommand.getQualifiedPath(spark, new Path(targetDir)).toString
new ParquetTable(spark, qualifiedDir, catalogTable, partitionSchema)
}

def getIcebergTable(
spark: SparkSession,
targetDir: String,
sparkTable: Option[Table],
tableSchema: Option[StructType]): ConvertTargetTable = {
try {
val clazz = Utils.classForName(icebergSparkTableClassPath)
if (sparkTable.isDefined) {
val constFromTable = clazz.getConstructor(
classOf[SparkSession],
Utils.classForName(icebergLibTableClassPath),
classOf[Option[StructType]])
val method = sparkTable.get.getClass.getMethod("table")
constFromTable.newInstance(spark, method.invoke(sparkTable.get), tableSchema)
} else {
val baseDir = ConvertToDeltaCommand.getQualifiedPath(spark, new Path(targetDir)).toString
val constFromPath = clazz.getConstructor(
classOf[SparkSession], classOf[String], classOf[Option[StructType]])
constFromPath.newInstance(spark, baseDir, tableSchema)
}
} catch {
case e: ClassNotFoundException =>
logError(s"Failed to find Iceberg class", e)
throw DeltaErrors.icebergClassMissing(spark.sparkContext.getConf, e)
case e: InvocationTargetException =>
logError(s"Got error when creating an Iceberg Converter", e)
// The better error is within the cause
throw ExceptionUtils.getRootCause(e)
}
}
}

object ConvertToDeltaCommand extends ConvertToDeltaCommandUtils
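
For reference, a minimal end-to-end sketch of the conversion path this file wires up; the session settings, path, and partition column below are illustrative assumptions, not part of the PR:

import org.apache.spark.sql.SparkSession

// Assumes the delta-iceberg module and an Iceberg Spark runtime are on the classpath,
// and that /tmp/events_iceberg already holds an Iceberg table (hypothetical path).
val spark = SparkSession.builder()
  .appName("convert-iceberg-to-delta")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

// The "iceberg." prefix is matched by isIcebergPathProvider, so getTargetTable
// instantiates the IcebergTable implementation reflectively via getIcebergTable.
spark.sql("CONVERT TO DELTA iceberg.`/tmp/events_iceberg`")

// Specifying PARTITIONED BY is rejected for Iceberg sources: the partition schema is
// inferred, and DeltaErrors.partitionSchemaInIcebergTables is thrown instead.
// spark.sql("CONVERT TO DELTA iceberg.`/tmp/events_iceberg` PARTITIONED BY (dt DATE)")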
core/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala
@@ -741,6 +741,19 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

val DELTA_CONVERT_ICEBERG_ENABLED =
buildConf("convert.iceberg.enabled")
.internal()
.doc("If enabled, Iceberg tables can be converted into a Delta table.")
.booleanConf
.createWithDefault(true)

val DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED =
buildConf("convert.iceberg.partitionEvolution.enabled")
.doc("If enabled, support conversion of iceberg tables experienced partition evolution.")
.booleanConf
.createWithDefault(false)

val DELTA_OPTIMIZE_MIN_FILE_SIZE =
buildConf("optimize.minFileSize")
.internal()
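Both flags are read through conf.getConf in ConvertToDeltaCommandBase. A short sketch of toggling them per session, assuming an active SparkSession bound to spark; the fully qualified "spark.databricks.delta." key prefix is an assumption based on how DeltaSQLConf keys are usually exposed and is not visible in this hunk:

// Iceberg conversion is on by default; this just makes the setting explicit.
spark.conf.set("spark.databricks.delta.convert.iceberg.enabled", "true")
// Conversion of tables with partition evolution stays opt-in and off by default.
spark.conf.set("spark.databricks.delta.convert.iceberg.partitionEvolution.enabled", "false")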
core/src/test/scala/org/apache/spark/sql/delta/DeltaErrorsSuite.scala
@@ -82,7 +82,8 @@ trait DeltaErrorsSuiteBase
"sourceNotDeterministicInMergeException" ->
DeltaErrors.sourceNotDeterministicInMergeException(spark),
"columnMappingAdviceMessage" ->
DeltaErrors.columnRenameNotSupported
DeltaErrors.columnRenameNotSupported,
"icebergClassMissing" -> DeltaErrors.icebergClassMissing(sparkConf, new Throwable())
)

def otherMessagesToTest: Map[String, String] = Map(