[HUDI-4100] CTAS failed to clean up when given an illegal MANAGED table definition #5588

Merged: 1 commit, May 21, 2022
@@ -763,4 +763,22 @@ class TestCreateTable extends HoodieSparkSqlTestBase {
assertResult(true)(shown.contains("COMMENT 'This is a simple hudi table'"))
}
}

test("Test CTAS using an illegal definition -- a COW table with compaction enabled.") {
val tableName = generateTableName
checkExceptionContain(
s"""
| create table $tableName using hudi
| tblproperties(
| primaryKey = 'id',
| type = 'cow',
| hoodie.compact.inline='true'
Contributor: Sorry, I do not see the check logic for the invalid definition in this PR. Am I missing it?

Contributor: No, what I mean is the check logic in Hudi itself.

Contributor (author): Currently, HoodieSparkCopyOnWriteTable#scheduleCompaction and #compact throw new HoodieNotSupportedException("Compaction is not supported on a CopyOnWrite table").

Contributor: @jinxing64 But how does a CREATE TABLE with hoodie.compact.inline=true end up calling HoodieSparkCopyOnWriteTable#scheduleCompaction or #compact? (PS: you can add my WeChat (xleesf) to communicate offline.)

jinxing64 (Contributor, author), May 21, 2022: Sorry for the late reply; I sent a friend request on WeChat. :)

Stack trace:

org.apache.hudi.exception.HoodieNotSupportedException: Compaction is not supported on a CopyOnWrite table
	at org.apache.hudi.table.HoodieSparkCopyOnWriteTable.scheduleCompaction(HoodieSparkCopyOnWriteTable.java:169)
	at org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableServiceInternal(BaseHoodieWriteClient.java:1348)
	at org.apache.hudi.client.BaseHoodieWriteClient.scheduleTableService(BaseHoodieWriteClient.java:1325)
	at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompactionAtInstant(BaseHoodieWriteClient.java:1003)
	at org.apache.hudi.client.BaseHoodieWriteClient.scheduleCompaction(BaseHoodieWriteClient.java:994)
	at org.apache.hudi.client.BaseHoodieWriteClient.inlineScheduleCompaction(BaseHoodieWriteClient.java:1252)
	at org.apache.hudi.client.BaseHoodieWriteClient.inlineCompaction(BaseHoodieWriteClient.java:1238)
	at org.apache.hudi.client.BaseHoodieWriteClient.runTableServicesInline(BaseHoodieWriteClient.java:552)
	at org.apache.hudi.client.BaseHoodieWriteClient.commitStats(BaseHoodieWriteClient.java:245)
	at org.apache.hudi.client.SparkRDDWriteClient.commit(SparkRDDWriteClient.java:122)
	at org.apache.hudi.HoodieSparkSqlWriter$.commitAndPerformPostOperations(HoodieSparkSqlWriter.scala:651)
	at org.apache.hudi.HoodieSparkSqlWriter$.write(HoodieSparkSqlWriter.scala:315)
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:163)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)

Contributor (author): My understanding of the compaction triggering mechanism: when inline compaction is enabled, HoodieSparkTable#scheduleCompaction is invoked from HoodieWriteClient, and ScheduleCompactionActionExecutor then decides when to trigger it.

The exception above ("Compaction is not supported") is thrown from HoodieSparkCopyOnWriteTable.
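For readers following the thread, here is a minimal Scala sketch of the behavior described above. It is not the actual Hudi code (HoodieSparkCopyOnWriteTable and BaseHoodieWriteClient are Java classes), and the trait and object names below are made up purely to illustrate why hoodie.compact.inline=true on a COW table fails at commit time.

```scala
import org.apache.hudi.exception.HoodieNotSupportedException

// Illustrative stand-ins only -- these names do not exist in Hudi.
trait CompactableTableSketch {
  def scheduleCompaction(instantTime: String): Unit
}

// Mirrors the guard in HoodieSparkCopyOnWriteTable#scheduleCompaction
// shown in the stack trace above.
class CopyOnWriteTableSketch extends CompactableTableSketch {
  override def scheduleCompaction(instantTime: String): Unit =
    throw new HoodieNotSupportedException(
      "Compaction is not supported on a CopyOnWrite table")
}

object WriteClientSketch {
  // Rough shape of the inline path in the stack trace: commitStats ->
  // runTableServicesInline -> inlineCompaction -> scheduleCompaction ->
  // table.scheduleCompaction.
  def runTableServicesInline(table: CompactableTableSketch,
                             inlineCompactionEnabled: Boolean): Unit = {
    if (inlineCompactionEnabled) {
      table.scheduleCompaction("20220521000000") // throws for the COW sketch
    }
  }
}
```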

| )
| AS
| select 1 as id, 'a1' as name, 10 as price, 1000 as ts
|""".stripMargin)("Compaction is not supported on a CopyOnWrite table")
val dbPath = spark.sessionState.catalog.getDatabaseMetadata("default").locationUri.getPath
val tablePath = s"${dbPath}/${tableName}"
assertResult(false)(existsPath(tablePath))
}
}
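Outside the test harness, the failure this test exercises can be reproduced with plain Spark SQL roughly as follows. This is a hedged sketch: the table name "h0" is made up, and it assumes a SparkSession with the Hudi SQL extension and HoodieCatalog configured (spark.sql.extensions, spark.sql.catalog.spark_catalog).

```scala
// Hypothetical reproduction; "h0" and the session setup are assumptions.
spark.sql(
  """
    |create table h0 using hudi
    |tblproperties (
    |  primaryKey = 'id',
    |  type = 'cow',
    |  hoodie.compact.inline = 'true'
    |)
    |as select 1 as id, 'a1' as name, 10 as price, 1000 as ts
    |""".stripMargin)
// Expected: the CTAS fails with
//   HoodieNotSupportedException: Compaction is not supported on a CopyOnWrite table
// and, with this patch, the managed table directory created for the CTAS is cleaned up.
```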
@@ -38,6 +38,7 @@ import org.apache.spark.sql.hudi.{HoodieSqlCommonUtils, ProvidesHoodieConfig}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession, _}

import java.net.URI
import java.util
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}

@@ -50,7 +51,9 @@ class HoodieCatalog extends DelegatingCatalogExtension

override def stageCreate(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_CREATE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
properties, TableCreationMode.STAGE_CREATE)
} else {
BasicStagedTable(
ident,
@@ -61,7 +64,9 @@ class HoodieCatalog extends DelegatingCatalogExtension

override def stageReplace(ident: Identifier, schema: StructType, partitions: Array[Transform], properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(ident, this, schema, partitions, properties, TableCreationMode.STAGE_REPLACE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
properties, TableCreationMode.STAGE_REPLACE)
} else {
super.dropTable(ident)
BasicStagedTable(
@@ -76,8 +81,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
partitions: Array[Transform],
properties: util.Map[String, String]): StagedTable = {
if (sparkAdapter.isHoodieTable(properties)) {
HoodieStagedTable(
ident, this, schema, partitions, properties, TableCreationMode.CREATE_OR_REPLACE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
HoodieStagedTable(ident, locUriAndTableType, this, schema, partitions,
properties, TableCreationMode.CREATE_OR_REPLACE)
} else {
try super.dropTable(ident) catch {
case _: NoSuchTableException => // ignore the exception
@@ -112,7 +118,9 @@ class HoodieCatalog extends DelegatingCatalogExtension
schema: StructType,
partitions: Array[Transform],
properties: util.Map[String, String]): Table = {
createHoodieTable(ident, schema, partitions, properties, Map.empty, Option.empty, TableCreationMode.CREATE)
val locUriAndTableType = deduceTableLocationURIAndTableType(ident, properties)
createHoodieTable(ident, schema, locUriAndTableType, partitions, properties,
Map.empty, Option.empty, TableCreationMode.CREATE)
}

override def tableExists(ident: Identifier): Boolean = super.tableExists(ident)
@@ -193,8 +201,30 @@ class HoodieCatalog extends DelegatingCatalogExtension
loadTable(ident)
}

private def deduceTableLocationURIAndTableType(
ident: Identifier, properties: util.Map[String, String]): (URI, CatalogTableType) = {
val locOpt = if (isPathIdentifier(ident)) {
Option(ident.name())
} else {
Option(properties.get("location"))
}
val tableType = if (locOpt.nonEmpty) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
}
val locUriOpt = locOpt.map(CatalogUtils.stringToURI)
val tableIdent = ident.asTableIdentifier
val existingTableOpt = getExistingTableIfExists(tableIdent)
val locURI = locUriOpt
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(spark.sessionState.catalog.defaultTablePath(tableIdent))
(locURI, tableType)
}

def createHoodieTable(ident: Identifier,
schema: StructType,
locUriAndTableType: (URI, CatalogTableType),
partitions: Array[Transform],
allTableProperties: util.Map[String, String],
writeOptions: Map[String, String],
@@ -206,29 +236,17 @@ class HoodieCatalog extends DelegatingCatalogExtension
val newPartitionColumns = partitionColumns
val newBucketSpec = maybeBucketSpec

val isByPath = isPathIdentifier(ident)

val location = if (isByPath) Option(ident.name()) else Option(allTableProperties.get("location"))
val id = ident.asTableIdentifier

val locUriOpt = location.map(CatalogUtils.stringToURI)
val existingTableOpt = getExistingTableIfExists(id)
val loc = locUriOpt
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(spark.sessionState.catalog.defaultTablePath(id))
val storage = DataSource.buildStorageFormatFromOptions(writeOptions.--(needFilterProps))
.copy(locationUri = Option(loc))
val tableType =
if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED
Contributor: Can we call deduceTableLocationURIAndTableType inside createHoodieTable and HoodieStagedTable instead of calling it before them?

Contributor (author): You mean HoodieStagedTable would invoke deduceTableLocationURIAndTableType to get the path and then invoke createHoodieTable, which would invoke deduceTableLocationURIAndTableType again to get the same path?

I'd prefer to invoke deduceTableLocationURIAndTableType only once between HoodieCatalog and HoodieStagedTable. The deduction is not expensive, but I'd rather keep the invocation flow straightforward.

Contributor (author): In the first version of this PR, I constructed a whole HoodieCatalogTable first and passed it to HoodieStagedTable and createHoodieTable, but I think it is better to pass only the necessary information (location and table type) to HoodieStagedTable.
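As an illustration of the design being discussed (a sketch under assumptions: the names below are stand-ins, not the PR's classes), deducing the (location URI, table type) pair once in the catalog and handing it down means the staged table's abort path no longer depends on a "location" property, which a MANAGED table does not have:

```scala
import java.net.URI

// Stand-in names, not Hudi classes; signatures simplified.
object CatalogSketch {
  // Deduce once: explicit location => EXTERNAL, otherwise default path => MANAGED.
  def deduceLocationAndType(props: Map[String, String],
                            defaultTablePath: URI): (URI, String) =
    props.get("location") match {
      case Some(loc) => (URI.create(loc), "EXTERNAL")
      case None      => (defaultTablePath, "MANAGED")
    }
}

case class StagedTableSketch(locAndType: (URI, String)) {
  // Cleanup after a failed CTAS can always use the deduced path,
  // even when the properties carry no "location" entry.
  def abortStagedChanges(deletePath: String => Unit): Unit =
    deletePath(locAndType._1.getPath)
}

object StagedTableSketchDemo extends App {
  val managed = StagedTableSketch(
    CatalogSketch.deduceLocationAndType(Map.empty, URI.create("file:/tmp/warehouse/h0")))
  managed.abortStagedChanges(path => println(s"would delete: $path")) // /tmp/warehouse/h0
}
```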

.copy(locationUri = Option(locUriAndTableType._1))
val commentOpt = Option(allTableProperties.get("comment"))

val tablePropertiesNew = new util.HashMap[String, String](allTableProperties)
// put path to table properties.
tablePropertiesNew.put("path", loc.getPath)
tablePropertiesNew.put("path", locUriAndTableType._1.getPath)

val tableDesc = new CatalogTable(
identifier = id,
tableType = tableType,
identifier = ident.asTableIdentifier,
tableType = locUriAndTableType._2,
storage = storage,
schema = newSchema,
provider = Option("hudi"),
@@ -21,16 +21,18 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hudi.DataSourceWriteOptions.RECORDKEY_FIELD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, V1Write, WriteBuilder}
import org.apache.spark.sql.sources.InsertableRelation
import org.apache.spark.sql.types.StructType

import java.net.URI
import java.util
import scala.collection.JavaConverters.{mapAsScalaMapConverter, setAsJavaSetConverter}

case class HoodieStagedTable(ident: Identifier,
locUriAndTableType: (URI, CatalogTableType),
catalog: HoodieCatalog,
override val schema: StructType,
partitions: Array[Transform],
@@ -59,13 +61,14 @@ case class HoodieStagedTable(ident: Identifier,
props.putAll(properties)
props.put("hoodie.table.name", ident.name())
props.put(RECORDKEY_FIELD.key, properties.get("primaryKey"))
catalog.createHoodieTable(ident, schema, partitions, props, writeOptions, sourceQuery, mode)
catalog.createHoodieTable(
ident, schema, locUriAndTableType, partitions, props, writeOptions, sourceQuery, mode)
}

override def name(): String = ident.name()

override def abortStagedChanges(): Unit = {
clearTablePath(properties.get("location"), catalog.spark.sparkContext.hadoopConfiguration)
clearTablePath(locUriAndTableType._1.getPath, catalog.spark.sparkContext.hadoopConfiguration)
}

private def clearTablePath(tablePath: String, conf: Configuration): Unit = {
@@ -85,13 +88,9 @@ case class HoodieStagedTable(ident: Identifier,
* WriteBuilder for creating a Hoodie table.
*/
private class HoodieV1WriteBuilder extends WriteBuilder {
override def build(): V1Write = new V1Write {
override def toInsertableRelation(): InsertableRelation = {
new InsertableRelation {
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
sourceQuery = Option(data)
}
}
override def build(): V1Write = () => {
(data: DataFrame, overwrite: Boolean) => {
sourceQuery = Option(data)
}
}
}
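The refactor above leans on Scala 2.12+ SAM conversion: a function literal can stand in for a type with a single abstract method, so the two nested anonymous classes collapse into nested lambdas. A self-contained sketch of the same pattern with made-up trait names (not Spark's V1Write/InsertableRelation):

```scala
// Single-abstract-method (SAM) traits, shaped like V1Write / InsertableRelation.
trait InsertableSketch {
  def insert(data: String, overwrite: Boolean): Unit
}

trait WriteSketch {
  def toInsertableRelation(): InsertableSketch
}

object SamConversionDemo extends App {
  var captured: Option[String] = None

  // Nested lambdas: the outer () => ... satisfies WriteSketch, and the inner
  // (data, overwrite) => ... satisfies InsertableSketch, as in the diff above.
  val write: WriteSketch = () => (data: String, overwrite: Boolean) => {
    captured = Option(data)
  }

  write.toInsertableRelation().insert("rows", overwrite = false)
  println(captured) // Some(rows)
}
```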