Set hoodie.query.as.ro.table in serde properties
jinxing64 committed May 15, 2022
1 parent 461556f · commit 1678247
Showing 2 changed files with 32 additions and 16 deletions.
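
The change, in short: the hoodie.query.as.ro.table flag now lives in the table's serde (storage) properties rather than in its table properties, so the drop logic looks it up via CatalogTable.storage.properties instead of CatalogTable.properties, and the tests set it through the session catalog instead of declaring it in tblproperties. A minimal sketch of the lookup side, assuming a running SparkSession with Hive support and an existing table named h0_ro (the session setup and table name are illustrative, not part of the commit):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()

// Read the read-optimized flag from the serde (storage) properties, which is
// where this commit expects it, rather than from the table properties.
val roTable = spark.sessionState.catalog.getTableMetadata(TableIdentifier("h0_ro"))
val queryAsRoTable = roTable.storage.properties
  .get("hoodie.query.as.ro.table")
  .exists(_.equalsIgnoreCase("true"))
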
Changed file 1 of 2 — a command class extending HoodieLeafRunnableCommand:
@@ -95,10 +95,10 @@ extends HoodieLeafRunnableCommand {

     var rtTableOpt: Option[CatalogTable] = None
     var roTableOpt: Option[CatalogTable] = None
-    if (catalog.tableExists(roIdt)) {
+    if (catalog.tableExists(rtIdt)) {
       val rtTable = catalog.getTableMetadata(rtIdt)
       if (rtTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
-        rtTable.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+        rtTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
           case Some(v) if v.equalsIgnoreCase("false") => rtTableOpt = Some(rtTable)
           case _ => // do-nothing
         }
@@ -107,7 +107,7 @@ extends HoodieLeafRunnableCommand {
     if (catalog.tableExists(roIdt)) {
       val roTable = catalog.getTableMetadata(roIdt)
       if (roTable.storage.locationUri.equals(hoodieTable.table.storage.locationUri)) {
-        roTable.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
+        roTable.storage.properties.get(ConfigUtils.IS_QUERY_AS_RO_TABLE) match {
           case Some(v) if v.equalsIgnoreCase("true") => roTableOpt = Some(roTable)
           case _ => // do-nothing
         }
Changed file 2 of 2 — the TestDropTable test suite:
@@ -17,6 +17,9 @@

 package org.apache.spark.sql.hudi

+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+
 class TestDropTable extends HoodieSparkSqlTestBase {

   test("Test Drop Table") {
@@ -98,10 +101,11 @@ class TestDropTable extends HoodieSparkSqlTestBase {
          | tblproperties (
          | type = 'mor',
          | primaryKey = 'id',
-         | preCombineField = 'ts',
-         | hoodie.query.as.ro.table='true'
+         | preCombineField = 'ts'
          | )
        """.stripMargin)
+    alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+      Map("hoodie.query.as.ro.table" -> "true"))

     spark.sql(
       s"""
@@ -110,10 +114,11 @@ class TestDropTable extends HoodieSparkSqlTestBase {
          | tblproperties (
          | type = 'mor',
          | primaryKey = 'id',
-         | preCombineField = 'ts',
-         | hoodie.query.as.ro.table='false'
+         | preCombineField = 'ts'
          | )
        """.stripMargin)
+    alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+      Map("hoodie.query.as.ro.table" -> "false"))

     spark.sql(s"drop table ${tableName} purge")
     checkAnswer("show tables")()
@@ -145,10 +150,11 @@ class TestDropTable extends HoodieSparkSqlTestBase {
          | tblproperties (
          | type = 'mor',
          | primaryKey = 'id',
-         | preCombineField = 'ts',
-         | hoodie.query.as.ro.table='true'
+         | preCombineField = 'ts'
          | )
        """.stripMargin)
+    alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+      Map("hoodie.query.as.ro.table" -> "true"))

     spark.sql(
       s"""
@@ -157,10 +163,11 @@ class TestDropTable extends HoodieSparkSqlTestBase {
          | tblproperties (
          | type = 'mor',
          | primaryKey = 'id',
-         | preCombineField = 'ts',
-         | hoodie.query.as.ro.table='false'
+         | preCombineField = 'ts'
          | )
        """.stripMargin)
+    alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+      Map("hoodie.query.as.ro.table" -> "false"))

     spark.sql(s"drop table ${tableName}_ro")
     checkAnswer("show tables")(
@@ -199,10 +206,11 @@ class TestDropTable extends HoodieSparkSqlTestBase {
          | tblproperties (
          | type = 'mor',
          | primaryKey = 'id',
-         | preCombineField = 'ts',
-         | hoodie.query.as.ro.table='true'
+         | preCombineField = 'ts'
          | )
        """.stripMargin)
+    alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_ro"),
+      Map("hoodie.query.as.ro.table" -> "true"))

     spark.sql(
       s"""
@@ -211,15 +219,23 @@ class TestDropTable extends HoodieSparkSqlTestBase {
          | tblproperties (
          | type = 'mor',
          | primaryKey = 'id',
-         | preCombineField = 'ts',
-         | hoodie.query.as.ro.table='false'
+         | preCombineField = 'ts'
          | )
        """.stripMargin)
+    alterSerdeProperties(spark.sessionState.catalog, TableIdentifier(s"${tableName}_rt"),
+      Map("hoodie.query.as.ro.table" -> "false"))

     spark.sql(s"drop table ${tableName}_ro purge")
     checkAnswer("show tables")()
   }
-}
+
+
+  private def alterSerdeProperties(sessionCatalog: SessionCatalog, tableIdt: TableIdentifier,
+      newProperties: Map[String, String]): Unit = {
+    val catalogTable = spark.sessionState.catalog.getTableMetadata(tableIdt)
+    val storage = catalogTable.storage
+    val storageProperties = storage.properties ++ newProperties
+    val newCatalogTable = catalogTable.copy(storage = storage.copy(properties = storageProperties))
+    sessionCatalog.alterTable(newCatalogTable)
+  }
+}
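
Aside (not part of the commit): the alterSerdeProperties helper above updates the catalog entry through SessionCatalog.alterTable; the same serde property could presumably also be set with plain Spark SQL. An illustrative one-liner, assuming a Hive-backed catalog and the same hypothetical table name:

// Illustrative alternative to the test helper: set the flag via SQL.
spark.sql("ALTER TABLE h0_ro SET SERDEPROPERTIES ('hoodie.query.as.ro.table' = 'true')")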
