Support SHALLOW CLONVERT Iceberg tables for Delta Lake
As a follow-up to the SHALLOW CLONE [support](#1505) for Delta Lake, it would be great if we could enable SHALLOW CLONE on Iceberg tables as well. This will be a CLONVERT (CLONE + CONVERT) operation, in which we create a Delta catalog table whose files point to the original Iceberg table's data, all in one transaction. This has two main benefits:

1. It allows users to quickly experiment with Delta Lake without modifying the original Iceberg table's data.
2. It simplifies the user flow by combining a Delta catalog table creation with an Iceberg conversion.

Similar to SHALLOW CLONE, it will work as follows:

1. Clone an Iceberg catalog table (after the setup described [here](https://iceberg.apache.org/docs/latest/getting-started/)):

```
CREATE TABLE [IF NOT EXISTS] delta SHALLOW CLONE iceberg.db.table [TBLPROPERTIES clause] [LOCATION path]
```

2. Clone a path-based Iceberg table:

```
CREATE TABLE [IF NOT EXISTS] delta SHALLOW CLONE iceberg.`/path/to/iceberg/table` [TBLPROPERTIES clause] [LOCATION path]
```
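For example, the end-to-end flow looks like this (a hypothetical spark-shell sketch: the catalog name `iceberg`, namespace `db.people`, and target table `people_delta` are illustrative, and a Spark session with both the Delta SQL extension and an Iceberg catalog configured is assumed):

```
// Hypothetical setup: an Iceberg table with some data.
spark.sql("CREATE TABLE iceberg.db.people (id BIGINT, name STRING) USING iceberg")
spark.sql("INSERT INTO iceberg.db.people VALUES (1, 'alice'), (2, 'bob')")

// CLONVERT: one transaction creates a Delta catalog table whose files point
// at the Iceberg table's existing data files -- nothing is copied or rewritten.
spark.sql("CREATE TABLE people_delta SHALLOW CLONE iceberg.db.people")

// From here on, the clone behaves like a regular Delta table.
spark.sql("SELECT * FROM people_delta").show()
```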

Closes #1522

How was this patch tested? New unit tests.

Does this introduce any user-facing changes? No.

GitOrigin-RevId: e01994e037cf44e06f4ef3d6f185f5925dd77e48
mingdai-db authored and allisonport-db committed Dec 15, 2022
1 parent 803d149 commit ff805d0
Showing 11 changed files with 542 additions and 23 deletions.
2 changes: 1 addition & 1 deletion core/src/main/antlr4/io/delta/sql/parser/DeltaSqlBase.g4
@@ -121,7 +121,7 @@ temporalClause
;

qualifiedName
    : identifier ('.' identifier)?
    : identifier ('.' identifier)? ('.' identifier)?
;

propertyList
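The extra optional segment lets `qualifiedName` match a three-level name such as `local.db.table`, which is what lets the clone source name an Iceberg catalog table. A minimal sketch of the effect (assuming, as the project's parser tests do, that `DeltaSqlParser` can be built with a null delegate, since the delegate is only consulted for non-Delta statements):

```
import io.delta.sql.parser.DeltaSqlParser

val parser = new DeltaSqlParser(null)

// Parsed before and after this change: 1- and 2-level source names.
parser.parsePlan("CREATE TABLE t1 SHALLOW CLONE db.tbl")
// Parses only after this change: a 3-level (catalog.db.table) source name.
parser.parsePlan("CREATE TABLE t1 SHALLOW CLONE local.db.tbl")
```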
10 changes: 9 additions & 1 deletion core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -278,7 +278,11 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
}

// Get source for clone (and time travel source if necessary)
val sourceRelation = UnresolvedRelation(visitTableIdentifier(ctx.source))
// The source relation can be an Iceberg table in form of `catalog.db.table` so we visit
// a multipart identifier instead of TableIdentifier (which does not support 3L namespace)
// in Spark 3.3. In Spark 3.4 we should have TableIdentifier supporting 3L namespace so we
// could revert back to that.
val sourceRelation = new UnresolvedRelation(visitMultipartIdentifier(ctx.source))
val maybeTimeTravelSource = maybeTimeTravelChild(ctx.clause, sourceRelation)
val targetRelation = UnresolvedRelation(target)

@@ -406,6 +410,10 @@ }
}
}

protected def visitMultipartIdentifier(ctx: QualifiedNameContext): Seq[String] = withOrigin(ctx) {
ctx.identifier.asScala.map(_.getText).toSeq
}

override def visitPassThrough(ctx: PassThroughContext): LogicalPlan = null

override def visitColTypeList(ctx: ColTypeListContext): Seq[StructField] = withOrigin(ctx) {
core/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -17,11 +17,13 @@
package org.apache.spark.sql.delta

import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.catalyst.TimeTravel
import org.apache.spark.sql.delta.DeltaErrors.{TemporallyUnstableInputException, TimestampEarlierThanCommitRetentionException}
import org.apache.spark.sql.delta.catalog.DeltaTableV2
import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder
import org.apache.spark.sql.delta.commands._
import org.apache.spark.sql.delta.commands.cdc.CDCReader
import org.apache.spark.sql.delta.constraints.{AddConstraint, DropConstraint}
@@ -38,6 +40,8 @@ import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.analysis.UnresolvedTableValuedFunction
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, HiveTableRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.CloneTableStatement
import org.apache.spark.sql.catalyst.plans.logical.RestoreTableStatement
@@ -127,6 +131,29 @@ class DeltaAnalysis(session: SparkSession)
resolveCloneCommand(
cloneStatement.target, new CloneDeltaSource(traveledTable), cloneStatement)

case DataSourceV2Relation(table: IcebergTablePlaceHolder, _, _, _, _) =>
resolveCloneCommand(
cloneStatement.target,
CloneIcebergSource(
table.tableIdentifier, sparkTable = None, tableSchema = None, session),
cloneStatement)

case DataSourceV2Relation(table, _, _, _, _)
if table.getClass.getName.endsWith("org.apache.iceberg.spark.source.SparkTable") =>
val tableIdent = Try {
CatalystSqlParser.parseTableIdentifier(table.name())
} match {
case Success(ident) => ident
case Failure(_: ParseException) =>
// Fall back to a 2-level identifier to stay compatible with older Apache Spark;
// this ident will NOT be used to look up the Iceberg tables later.
CatalystSqlParser.parseMultipartIdentifier(table.name()).tail.asTableIdentifier
case Failure(e) => throw e
}
resolveCloneCommand(
cloneStatement.target,
CloneIcebergSource(tableIdent, Some(table), tableSchema = None, session),
cloneStatement)

case u: UnresolvedRelation =>
u.failAnalysis(msg = s"Table not found: ${u.multipartIdentifier.quoted}")
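To see why the `Try` is needed: an Iceberg `SparkTable` reports its name as a dotted string that can have three parts (e.g. `local.db.events`), and `parseTableIdentifier` only accepts one- or two-part names in Spark 3.3. A standalone sketch of the same fallback, with a hypothetical table name and the `asTableIdentifier` helper inlined:

```
import scala.util.{Failure, Success, Try}

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}

// Hypothetical 3-part name, as SparkTable.name() would report it.
val name = "local.db.events"

val tableIdent = Try(CatalystSqlParser.parseTableIdentifier(name)) match {
  case Success(ident) => ident // 1- or 2-part names fit a TableIdentifier
  case Failure(_: ParseException) =>
    // Drop the catalog part. The result is only used for reporting; it is
    // never used to look the Iceberg table up again.
    val parts = CatalystSqlParser.parseMultipartIdentifier(name).tail
    TableIdentifier(parts.last, parts.init.headOption)
  case Failure(e) => throw e
}
// tableIdent == TableIdentifier("events", Some("db"))
```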
@@ -436,7 +463,13 @@ class DeltaAnalysis(session: SparkSession)
case Some(existingCatalog) => existingCatalog.identifier
case None => TableIdentifier(path.toString, Some("delta"))
}
val cloneSourceTable = sourceTbl
val cloneSourceTable = sourceTbl match {
case source: CloneIcebergSource =>
// Reuse the existing schema so that the physical names of columns are consistent
source.copy(tableSchema = Some(deltaTableV2.snapshot.metadata.schema))
case other => other
}
val catalogTable = createCatalogTableForCloneCommand(
path,
byPath = existingTable.isEmpty,
core/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -179,9 +179,15 @@ class DeltaCatalog extends DelegatingCatalogExtension
case o => o
}
} catch {
case _: NoSuchDatabaseException | _: NoSuchNamespaceException | _: NoSuchTableException
if isPathIdentifier(ident) =>
newDeltaPathTable(ident)
case e @ (
_: NoSuchDatabaseException | _: NoSuchNamespaceException | _: NoSuchTableException) =>
if (isPathIdentifier(ident)) {
newDeltaPathTable(ident)
} else if (isIcebergPathIdentifier(ident)) {
IcebergTablePlaceHolder(TableIdentifier(ident.name(), Some("iceberg")))
} else {
throw e
}
case e: AnalysisException if gluePermissionError(e) && isPathIdentifier(ident) =>
logWarning("Received an access denied error from Glue. Assuming this " +
s"identifier ($ident) is path based.", e)
@@ -648,6 +654,13 @@ trait SupportsPathIdentifier extends TableCatalog { self: DeltaCatalog =>
ident.namespace().length == 1 && DeltaSourceUtils.isDeltaDataSourceName(ident.namespace().head)
}

private def hasIcebergNamespace(ident: Identifier): Boolean = {
ident.namespace().length == 1 && ident.namespace().head.equalsIgnoreCase("iceberg")
}

protected def isIcebergPathIdentifier(ident: Identifier): Boolean = {
hasIcebergNamespace(ident) && new Path(ident.name()).isAbsolute
}

protected def isPathIdentifier(ident: Identifier): Boolean = {
// Should be a simple check of a special PathIdentifier class in the future
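In other words, a clone source written as iceberg.`/absolute/path` arrives at DeltaCatalog as an Identifier whose namespace is `["iceberg"]` and whose name is the path. A standalone restatement of the predicate, for illustration only:

```
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.connector.catalog.Identifier

// Same check as isIcebergPathIdentifier above, restated standalone.
def looksLikeIcebergPath(ident: Identifier): Boolean =
  ident.namespace().length == 1 &&
    ident.namespace().head.equalsIgnoreCase("iceberg") &&
    new Path(ident.name()).isAbsolute

assert(looksLikeIcebergPath(Identifier.of(Array("iceberg"), "/path/to/iceberg/table")))
assert(!looksLikeIcebergPath(Identifier.of(Array("db"), "tbl"))) // regular catalog table
```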
core/src/main/scala/org/apache/spark/sql/delta/catalog/IcebergTablePlaceHolder.scala
@@ -0,0 +1,33 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.catalog

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

/** A placeholder used to resolve an Iceberg table as a relation during analysis */
case class IcebergTablePlaceHolder(tableIdentifier: TableIdentifier) extends Table {

override def name(): String = tableIdentifier.unquotedString

override def schema(): StructType = new StructType()

override def capabilities(): java.util.Set[TableCapability] = Set.empty[TableCapability].asJava
}
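The placeholder deliberately carries nothing but the identifier: DeltaAnalysis pattern-matches on it (the `DataSourceV2Relation` case above) and rebuilds a `CloneIcebergSource` from `tableIdentifier`, so an empty schema and an empty capability set suffice. A small sketch with a hypothetical path:

```
import org.apache.spark.sql.catalyst.TableIdentifier

// Hypothetical: what DeltaCatalog returns for iceberg.`/path/to/iceberg/table`.
val placeholder = IcebergTablePlaceHolder(
  TableIdentifier("/path/to/iceberg/table", Some("iceberg")))

placeholder.name()                 // "iceberg./path/to/iceberg/table"
placeholder.schema().isEmpty       // true: the real schema comes from conversion
placeholder.capabilities().isEmpty // true: the placeholder is never scanned
```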
@@ -89,6 +89,7 @@ trait CloneSource extends Closeable {
// Clone source table formats
object CloneSourceFormat {
val DELTA = "Delta"
val ICEBERG = "Iceberg"
val PARQUET = "Parquet"
val UNKNOWN = "Unknown"
}
@@ -30,6 +30,7 @@ import org.apache.spark.sql.{Column, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.connector.catalog.Table
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{LongType, StructType}
@@ -245,3 +246,21 @@ class CloneParquetSource(

override def close(): Unit = convertTargetTable.fileManifest.close()
}

/**
 * An Iceberg table source to be cloned from
*/
case class CloneIcebergSource(
tableIdentifier: TableIdentifier,
sparkTable: Option[Table],
tableSchema: Option[StructType],
spark: SparkSession) extends CloneParquetSource(tableIdentifier, None, spark) {

override lazy val convertTargetTable: ConvertTargetTable =
ConvertToDeltaCommand.getIcebergTable(spark, tableIdentifier.table, sparkTable, tableSchema)

override def format: String = CloneSourceFormat.ICEBERG

override def name: String =
sparkTable.map(_.name()).getOrElse(s"iceberg.`${tableIdentifier.table}`")
}
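Concretely, the two clone paths construct this source differently: a catalog-based clone passes the resolved Iceberg SparkTable through, while a path-based clone passes only the location and lets `ConvertToDeltaCommand.getIcebergTable` load it. A sketch of both constructions (the identifiers are made up, and `icebergSparkTable` / `spark` are assumed to be in scope):

```
import org.apache.spark.sql.catalyst.TableIdentifier

// Catalog-based clone: the live SparkTable supplies schema and metadata.
val fromCatalog = CloneIcebergSource(
  TableIdentifier("table", Some("db")),
  sparkTable = Some(icebergSparkTable), // hypothetical resolved Iceberg table
  tableSchema = None,
  spark)

// Path-based clone: only the location is known, so getIcebergTable loads
// the table from the path at conversion time.
val fromPath = CloneIcebergSource(
  TableIdentifier("/path/to/iceberg/table", Some("iceberg")),
  sparkTable = None,
  tableSchema = None,
  spark)
```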
core/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
@@ -23,6 +23,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaErrors, DeltaLog, DeltaTableIdentifier, OptimisticTransaction}
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.catalog.IcebergTablePlaceHolder
import org.apache.spark.sql.delta.files.TahoeBatchFileIndex
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSourceUtils
@@ -188,6 +189,8 @@ trait DeltaCommand extends DeltaLogging {
case LogicalRelation(HadoopFsRelation(_, _, _, _, _, _), _, None, _) => false
// is table
case LogicalRelation(HadoopFsRelation(_, _, _, _, _, _), _, Some(_), _) => true
// is iceberg table
case DataSourceV2Relation(_: IcebergTablePlaceHolder, _, _, _, _) => false
// could not resolve table/db
case _: UnresolvedRelation =>
throw new NoSuchTableException(tableIdent.database.getOrElse(""), tableIdent.table)
51 changes: 35 additions & 16 deletions core/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{TableIdentifier, TimeTravel}
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.plans.logical.CloneTableStatement

@@ -126,6 +127,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
target: String,
sourceFormat: String = "delta",
sourceIsTable: Boolean = true,
sourceIs3LTable: Boolean = false,
targetIsTable: Boolean = true,
targetLocation: Option[String] = None,
versionAsOf: Option[Long] = None,
@@ -146,23 +148,30 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
isCreate = isCreate,
isReplace = isReplace,
tableProperties = tableProperties
)) == CloneTableStatement(
if (versionAsOf.isEmpty && timestampAsOf.isEmpty) {
UnresolvedRelation(tblId(source, if (sourceIsTable) null else sourceFormat))
)) == {
val sourceRelation = if (sourceIs3LTable) {
new UnresolvedRelation(source.split('.'))
} else {
TimeTravel(
UnresolvedRelation(tblId(source, if (sourceIsTable) null else sourceFormat)),
timestampAsOf.map(Literal(_)),
versionAsOf,
Some("sql"))
},
UnresolvedRelation(tblId(target)),
ifNotExists = false,
isReplaceCommand = isReplace,
isCreateCommand = isCreate,
tablePropertyOverrides = tableProperties,
targetLocation = targetLocation
)
UnresolvedRelation(tblId(source, if (sourceIsTable) null else sourceFormat))
}
CloneTableStatement(
if (versionAsOf.isEmpty && timestampAsOf.isEmpty) {
sourceRelation
} else {
TimeTravel(
sourceRelation,
timestampAsOf.map(Literal(_)),
versionAsOf,
Some("sql"))
},
UnresolvedRelation(tblId(target)),
ifNotExists = false,
isReplaceCommand = isReplace,
isCreateCommand = isCreate,
tablePropertyOverrides = tableProperties,
targetLocation = targetLocation
)
}
}
}
// scalastyle:on argcount
Expand All @@ -183,6 +192,16 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper {
checkCloneStmt(parser, source = "t1", target = "t1", targetLocation = Some("/new/path"))
// Clone with time travel
checkCloneStmt(parser, source = "t1", target = "t1", versionAsOf = Some(1L))
// Clone with 3L table (only useful for Iceberg table now)
checkCloneStmt(parser, source = "local.iceberg.table", target = "t1", sourceIs3LTable = true)
// But the target cannot be a 3L table yet
intercept[ParseException] {
checkCloneStmt(parser, source = "local.iceberg.table", target = "catalog.delta.table",
sourceIs3LTable = true)
}
// Custom source format with path
checkCloneStmt(parser, source = "/path/to/iceberg", target = "t1", sourceFormat = "iceberg",
sourceIsTable = false)
}

private def unresolvedAttr(colName: String*): UnresolvedAttribute = {
@@ -256,7 +256,7 @@
}
}
spark.createDataset(rdd)
}.reduce(_.union(_))
}.reduceOption(_.union(_)).getOrElse(spark.emptyDataset[ConvertTargetFile])

fileSparkResults = Some(res.cache())
_numFiles = Some(numFiles)
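This last change fixes converting an Iceberg table with no data files: Scala's `reduce` throws on an empty collection, while `reduceOption` returns `None` and lets the code fall back to an empty `Dataset`. A minimal illustration of the difference:

```
// reduce on an empty collection throws; reduceOption yields None instead.
val batches = Seq.empty[Seq[String]]

// batches.reduce(_ ++ _)  // java.lang.UnsupportedOperationException: empty.reduceLeft
val allFiles = batches.reduceOption(_ ++ _).getOrElse(Seq.empty[String])
// allFiles == Seq.empty, mirroring the spark.emptyDataset fallback above
```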