Commit d422f69
[HUDI-4087] Support dropping RO and RT table in DropHoodieTableCommand (#5564)

* [HUDI-4087] Support dropping RO and RT table in DropHoodieTableCommand

* Set hoodie.query.as.ro.table in serde properties
jinxing64 authored May 17, 2022
1 parent d52d133 commit d422f69
Showing 2 changed files with 200 additions and 45 deletions.
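
Before the file diffs, a quick sketch of the user-visible behavior this change targets, mirroring the new tests below. This is illustrative only: the session setup and the table name t1 are assumptions, not part of the commit, and it presumes a Spark session with the Hudi bundle and HoodieSparkSessionExtension enabled.

    // Assumed setup: Spark shell with Hudi's SQL extension enabled.
    spark.sql(
      """create table t1 (id int, name string, ts long) using hudi
        |tblproperties (type = 'mor', primaryKey = 'id', preCombineField = 'ts')
        |""".stripMargin)

    // Hive sync (or the alterSerdeProperties test helper below) registers the
    // companion views t1_ro and t1_rt over the same table location, tagged
    // through the serde property hoodie.query.as.ro.table.

    // After this commit, purging the MOR base table drops both companions too:
    spark.sql("drop table t1 purge")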
DropHoodieTableCommand.scala
@@ -21,12 +21,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieTableType
+import org.apache.hudi.hive.util.ConfigUtils
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
-import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, HoodieCatalogTable}
-import org.apache.spark.sql.hive.HiveClientUtils
-import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.isEnableHive
+import org.apache.spark.sql.catalyst.catalog._
 
 import scala.util.control.NonFatal
 
@@ -69,13 +67,13 @@ extends HoodieLeafRunnableCommand {
     val catalog = sparkSession.sessionState.catalog
 
     // Drop table in the catalog
-    val enableHive = isEnableHive(sparkSession)
-    if (enableHive) {
-      dropHiveDataSourceTable(sparkSession, hoodieCatalogTable)
+    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
+      val (rtTableOpt, roTableOpt) = getTableRTAndRO(catalog, hoodieCatalogTable)
+      rtTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      roTableOpt.foreach(table => catalog.dropTable(table.identifier, true, false))
+      catalog.dropTable(table.identifier.copy(table = hoodieCatalogTable.tableName), ifExists, purge)
     } else {
-      if (catalog.tableExists(tableIdentifier)) {
-        catalog.dropTable(tableIdentifier, ifExists, purge)
-      }
+      catalog.dropTable(table.identifier, ifExists, purge)
     }
 
     // Recursively delete table directories
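
A side note on the companion drops above: the positional flags map to Spark's SessionCatalog.dropTable(name, ignoreIfNotExists, purge), so a missing companion is tolerated and its catalog entry is removed without a purge; the shared data directory is deleted once, through the base table. Written with named flags for clarity:

    // Same call as in rtTableOpt.foreach / roTableOpt.foreach above, names added:
    catalog.dropTable(table.identifier, ignoreIfNotExists = true, purge = false)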
@@ -88,42 +86,33 @@
     }
   }
 
-  private def dropHiveDataSourceTable(
-      sparkSession: SparkSession,
-      hoodieCatalogTable: HoodieCatalogTable): Unit = {
-    val table = hoodieCatalogTable.table
-    val dbName = table.identifier.database.get
-    val tableName = hoodieCatalogTable.tableName
-
-    // check database exists
-    val dbExists = sparkSession.sessionState.catalog.databaseExists(dbName)
-    if (!dbExists) {
-      throw new NoSuchDatabaseException(dbName)
-    }
-
-    if (HoodieTableType.MERGE_ON_READ == hoodieCatalogTable.tableType && purge) {
-      val snapshotTableName = tableName + MOR_SNAPSHOT_TABLE_SUFFIX
-      val roTableName = tableName + MOR_READ_OPTIMIZED_TABLE_SUFFIX
-
-      dropHiveTable(sparkSession, dbName, snapshotTableName)
-      dropHiveTable(sparkSession, dbName, roTableName)
-    }
-
-    dropHiveTable(sparkSession, dbName, tableName, purge)
-  }
-
-  private def dropHiveTable(
-      sparkSession: SparkSession,
-      dbName: String,
-      tableName: String,
-      purge: Boolean = false): Unit = {
-    // check table exists
-    if (sparkSession.sessionState.catalog.tableExists(new TableIdentifier(tableName, Option(dbName)))) {
-      val client = HiveClientUtils.newClientForMetadata(sparkSession.sparkContext.conf,
-        sparkSession.sessionState.newHadoopConf())
-
-      // drop hive table.
-      client.dropTable(dbName, tableName, ifExists, purge)
-    }
-  }
+  private def getTableRTAndRO(catalog: SessionCatalog,
+      hoodieTable: HoodieCatalogTable): (Option[CatalogTable], Option[CatalogTable]) = {
+    val rtIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_SNAPSHOT_TABLE_SUFFIX}")
+    val roIdt = hoodieTable.table.identifier.copy(
+      table = s"${hoodieTable.tableName}${MOR_READ_OPTIMIZED_TABLE_SUFFIX}")
+
+    var rtTableOpt: Option[CatalogTable] = None
+    var roTableOpt: Option[CatalogTable] = None
+    if (catalog.tableExists(rtIdt)) {
+      val rtTable = catalog.getTableMetadata(rtIdt)
+      if (rtTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        rtTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("false") => rtTableOpt = Some(rtTable)
+          case _ => // do-nothing
+        }
+      }
+    }
+
+    if (catalog.tableExists(roIdt)) {
+      val roTable = catalog.getTableMetadata(roIdt)
+      if (roTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
+        roTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+          case Some(v) if v.equalsIgnoreCase("true") => roTableOpt = Some(roTable)
+          case _ => // do-nothing
+        }
+      }
+    }
+    (rtTableOpt, roTableOpt)
+  }
 }
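
In short, getTableRTAndRO only hands a companion back for dropping when it points at the base table's storage location and carries the hoodie.query.as.ro.table marker: "false" identifies the real-time (RT) view, "true" the read-optimized (RO) view. A minimal self-contained sketch of that predicate follows; isCompanionOf is a name invented here, not part of the commit:

    import org.apache.spark.sql.catalyst.catalog.CatalogTable

    // A candidate is droppable alongside the base table only if it shares the
    // base table's location and its serde properties mark it as the expected
    // view ("true" for RO, "false" for RT).
    def isCompanionOf(base: CatalogTable, candidate: CatalogTable, expectRo: Boolean): Boolean = {
      candidate.storage.locationUri.equals(base.storage.locationUri) &&
        candidate.storage.properties.get("hoodie.query.as.ro.table")
          .exists(_.equalsIgnoreCase(expectRo.toString))
    }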
TestDropTable.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.hudi
 
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+
 class TestDropTable extends HoodieSparkSqlTestBase {
 
   test("Test Drop Table") {
@@ -72,4 +75,167 @@ class TestDropTable extends HoodieSparkSqlTestBase {
       }
     }
   }
+
+  test("Test Drop RO & RT table by purging base table.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName} purge")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Test Drop RO & RT table by one by one.") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName}_ro")
+      checkAnswer("show tables")(
+        Seq("default", tableName, false), Seq("default", s"${tableName}_rt", false))
+
+      spark.sql(s"drop table ${tableName}_rt")
+      checkAnswer("show tables")(Seq("default", tableName, false))
+
+      spark.sql(s"drop table ${tableName}")
+      checkAnswer("show tables")()
+    }
+  }
+
+  test("Test Drop RO table with purge") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  ts long
+           |) using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_ro using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+        Map("hoodie.query.as.ro.table" -> "true"))
+
+      spark.sql(
+        s"""
+           |create table ${tableName}_rt using hudi
+           | location '${tmp.getCanonicalPath}/$tableName'
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+        Map("hoodie.query.as.ro.table" -> "false"))
+
+      spark.sql(s"drop table ${tableName}_ro purge")
+      checkAnswer("show tables")()
+    }
+  }
+
+  private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
+      newProperties: Map[String, String]): Unit = {
+    val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)
+    val storage = catalogTable.storage
+    val storageProperties = storage.properties ++ newProperties
+    val newCatalogTable = catalogTable.copy(storage = storage.copy(properties = storageProperties))
+    sessionCatalog.alterTable(newCatalogTable)
+  }
 }
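
The alterSerdeProperties helper above stands in for the marker that Hudi's Hive sync stamps onto synced _ro/_rt tables, so DropHoodieTableCommand can tell the companions apart. A hedged snippet for reading the marker back in a test or shell; the table name t1_ro is illustrative:

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Expects Some("true") on the read-optimized view and Some("false") on the
    // real-time view, per the serde property set above.
    val roMeta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t1_ro"))
    println(roMeta.storage.properties.get("hoodie.query.as.ro.table"))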
