diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index d066ae5bcd2f..b8893ac80dfd 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -64,6 +64,8 @@ object HoodieProcedures {
     mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
     mapBuilder.put(ShowBootstrapMappingProcedure.NAME, ShowBootstrapMappingProcedure.builder)
     mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, ShowBootstrapPartitionsProcedure.builder)
+    mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
+    mapBuilder.put(DowngradeTableProcedure.NAME, DowngradeTableProcedure.builder)
     mapBuilder.build
   }
 }
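Context for the two registrations above (not part of the patch): anything placed in this builder map becomes callable by name through Spark SQL's `CALL` command. A minimal usage sketch, assuming an existing Hudi table named `my_table` and an active `SparkSession` in `spark`:

```scala
// Usage sketch (table name is a placeholder): the registrations above make
// these statements resolvable by name; each returns a one-row result set.
spark.sql("CALL downgrade_table(table => 'my_table', toVersion => 'ZERO')").show()
spark.sql("CALL upgrade_table(table => 'my_table', toVersion => 'ONE')").show()
```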
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
new file mode 100644
index 000000000000..792d26b18447
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+import scala.util.{Failure, Success, Try}
+
+class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "toVersion", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val toVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
+    val basePath = getBasePath(tableName)
+
+    val config = getWriteConfigWithTrue(basePath)
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(jsc.hadoopConfiguration)
+      .setBasePath(config.getBasePath)
+      .setLoadActiveTimelineOnLoad(false)
+      .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
+      .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
+      .setFileSystemRetryConfig(config.getFileSystemRetryConfig)
+      .build
+
+    val result = Try {
+      new UpgradeDowngrade(metaClient, config, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance)
+        .run(HoodieTableVersion.valueOf(toVersion), null)
+    } match {
+      case Success(_) =>
+        logInfo(s"Table at $basePath upgraded / downgraded to version $toVersion.")
+        true
+      case Failure(e) =>
+        logWarning(s"Failed: Could not upgrade/downgrade table at $basePath to version $toVersion.", e)
+        false
+    }
+
+    Seq(Row(result))
+  }
+
+  private def getWriteConfigWithTrue(basePath: String) = {
+    HoodieWriteConfig.newBuilder
+      .withPath(basePath)
+      .withRollbackUsingMarkers(true)
+      .withCompactionConfig(HoodieCompactionConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
+      .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
+      .build
+  }
+
+  override def build = new UpgradeOrDowngradeProcedure()
+}
+
+object UpgradeTableProcedure {
+  val NAME = "upgrade_table"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new UpgradeOrDowngradeProcedure()
+  }
+}
+
+object DowngradeTableProcedure {
+  val NAME = "downgrade_table"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new UpgradeOrDowngradeProcedure()
+  }
+}
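One design note on the file above: `upgrade_table` and `downgrade_table` deliberately share a single implementation, because `UpgradeDowngrade.run` compares the table's current version against `toVersion` and walks the version chain in whichever direction is required. The procedure reports the outcome as a boolean rather than throwing. A hedged sketch of consuming that result (table name is a placeholder):

```scala
// Sketch: call() wraps UpgradeDowngrade.run in a Try, so a failure is logged
// and surfaced as `false` in the single `result` column instead of an exception.
val row = spark.sql("CALL downgrade_table(table => 'my_table', toVersion => 'ZERO')")
  .collect()
  .head
val succeeded: Boolean = row.getBoolean(0)
assert(succeeded, "downgrade_table reported failure; see the driver warning log for the cause")
```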
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
new file mode 100644
index 000000000000..55c184ab561d
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+import java.io.IOException
+
+class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call downgrade_table and upgrade_table Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // Check required fields
+      checkExceptionContain(s"""call downgrade_table(table => '$tableName')""")(
+        s"Argument: toVersion is required")
+
+      var metaClient = HoodieTableMetaClient.builder
+        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      // verify hoodie.table.version of the original table
+      assertResult(HoodieTableVersion.FOUR.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FOUR.versionCode)
+
+      // downgrade table to ZERO
+      checkAnswer(s"""call downgrade_table(table => '$tableName', toVersion => 'ZERO')""")(Seq(true))
+
+      // verify the downgraded hoodie.table.version
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      assertResult(HoodieTableVersion.ZERO.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.ZERO.versionCode)
+
+      // upgrade table to ONE
+      checkAnswer(s"""call upgrade_table(table => '$tableName', toVersion => 'ONE')""")(Seq(true))
+
+      // verify the upgraded hoodie.table.version
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      assertResult(HoodieTableVersion.ONE.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.ONE.versionCode)
+    }
+  }
+
+  @throws[IOException]
+  private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = {
+    val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)
+    // Load the properties and verify
+    val fsDataInputStream = metaClient.getFs.open(propertyFile)
+    val hoodieConfig = HoodieConfig.create(fsDataInputStream)
+    fsDataInputStream.close()
+    assertResult(Integer.toString(versionCode)) {
+      hoodieConfig.getString(HoodieTableConfig.VERSION)
+    }
+  }
+}
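The `assertTableVersionFromPropertyFile` helper reads the version back through `HoodieConfig`. For completeness, a standalone sketch of the same check using only Hadoop and `java.util.Properties` APIs; the base path is a placeholder, and `hoodie.table.version` is the key that `HoodieTableConfig.VERSION` resolves to:

```scala
import java.util.Properties

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Standalone sketch (placeholder base path): hoodie.table.version is a plain
// entry in <basePath>/.hoodie/hoodie.properties, so it can be verified without
// constructing a HoodieTableMetaClient.
val propertyFile = new Path("/tmp/hudi/my_table/.hoodie/hoodie.properties")
val fs = FileSystem.get(propertyFile.toUri, new Configuration())
val in = fs.open(propertyFile)
val props = new Properties()
try props.load(in) finally in.close()
println(props.getProperty("hoodie.table.version")) // e.g. "1" right after upgrading to ONE
```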