diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
index 18ea636c05719..fa5bbb33bbf95 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RunClusteringProcedure.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
@@ -104,36 +105,43 @@ class RunClusteringProcedure extends BaseProcedure
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
     logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
 
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
-    val instantTime = HoodieActiveTimeline.createNewInstantTime
-    if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
-      pendingClustering ++= Seq(instantTime)
-    }
-    logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
-
-    val startTs = System.currentTimeMillis()
-    pendingClustering.foreach(client.cluster(_, true))
-    logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
-      s" time cost: ${System.currentTimeMillis() - startTs}ms.")
-
-    val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
-      .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
-      .toSeq
-      .sortBy(f => f.getTimestamp)
-      .reverse
-
-    val clusteringPlans = clusteringInstants.map(instant =>
-      ClusteringUtils.getClusteringPlan(metaClient, instant)
-    )
-
-    if (showInvolvedPartitions) {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
-          p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+      val instantTime = HoodieActiveTimeline.createNewInstantTime
+      if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
+        pendingClustering ++= Seq(instantTime)
+      }
+      logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
+
+      val startTs = System.currentTimeMillis()
+      pendingClustering.foreach(client.cluster(_, true))
+      logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
+        s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+
+      val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+        .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
+        .toSeq
+        .sortBy(f => f.getTimestamp)
+        .reverse
+
+      val clusteringPlans = clusteringInstants.map(instant =>
+        ClusteringUtils.getClusteringPlan(metaClient, instant)
+      )
+
+      if (showInvolvedPartitions) {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+            p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+        }
+      } else {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+        }
       }
-    } else {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+    } finally {
+      if (client != null) {
+        client.close()
       }
     }
   }