[HUDI-5277] Close HoodieWriteClient before exiting RunClusteringProcedure (apache#7300)
stream2000 authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 050bb83 commit 571a49f
Showing 1 changed file with 37 additions and 29 deletions: RunClusteringProcedure.scala.
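
The change wraps the procedure's use of the write client in try/finally so that client.close() runs even when scheduling or running clustering throws; previously the client created by HoodieCLIUtils.createHoodieClientFromPath was never closed. Below is a minimal, self-contained sketch of that acquire/try/finally/close idiom; FakeWriteClient and its methods are hypothetical stand-ins for illustration, not Hudi APIs.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for SparkRDDWriteClient: a resource that owns
// background state and must be released deterministically.
final class FakeWriteClient extends AutoCloseable {
  val closed = new AtomicBoolean(false)
  def cluster(instant: String): Unit = {
    // Simulates work that may throw mid-flight.
    if (instant.isEmpty) throw new IllegalArgumentException("empty instant")
  }
  override def close(): Unit = closed.set(true)
}

object CloseOnExitSketch {
  def main(args: Array[String]): Unit = {
    var client: FakeWriteClient = null
    try {
      client = new FakeWriteClient
      client.cluster("20221214000000") // may throw; close still runs
    } finally {
      // The null check mirrors the patch: if construction itself fails,
      // there is nothing to close yet.
      if (client != null) client.close()
    }
    assert(client.closed.get(), "client must be closed on exit")
  }
}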
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hudi.command.procedures
 
 import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
+import org.apache.hudi.client.SparkRDDWriteClient
 import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieTimeline}
 import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
@@ -104,36 +105,43 @@ class RunClusteringProcedure extends BaseProcedure
       .iterator().asScala.map(_.getLeft.getTimestamp).toSeq.sortBy(f => f)
     logInfo(s"Pending clustering instants: ${pendingClustering.mkString(",")}")
 
-    val client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
-    val instantTime = HoodieActiveTimeline.createNewInstantTime
-    if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
-      pendingClustering ++= Seq(instantTime)
-    }
-    logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
-
-    val startTs = System.currentTimeMillis()
-    pendingClustering.foreach(client.cluster(_, true))
-    logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
-      s" time cost: ${System.currentTimeMillis() - startTs}ms.")
-
-    val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
-      .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
-      .toSeq
-      .sortBy(f => f.getTimestamp)
-      .reverse
-
-    val clusteringPlans = clusteringInstants.map(instant =>
-      ClusteringUtils.getClusteringPlan(metaClient, instant)
-    )
-
-    if (showInvolvedPartitions) {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
-          p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+    var client: SparkRDDWriteClient[_] = null
+    try {
+      client = HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)
+      val instantTime = HoodieActiveTimeline.createNewInstantTime
+      if (client.scheduleClusteringAtInstant(instantTime, HOption.empty())) {
+        pendingClustering ++= Seq(instantTime)
+      }
+      logInfo(s"Clustering instants to run: ${pendingClustering.mkString(",")}.")
+
+      val startTs = System.currentTimeMillis()
+      pendingClustering.foreach(client.cluster(_, true))
+      logInfo(s"Finish clustering all the instants: ${pendingClustering.mkString(",")}," +
+        s" time cost: ${System.currentTimeMillis() - startTs}ms.")
+
+      val clusteringInstants = metaClient.reloadActiveTimeline().getInstants.iterator().asScala
+        .filter(p => p.getAction == HoodieTimeline.REPLACE_COMMIT_ACTION && pendingClustering.contains(p.getTimestamp))
+        .toSeq
+        .sortBy(f => f.getTimestamp)
+        .reverse
+
+      val clusteringPlans = clusteringInstants.map(instant =>
+        ClusteringUtils.getClusteringPlan(metaClient, instant)
+      )
+
+      if (showInvolvedPartitions) {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(),
+            p.get().getLeft.getState.name(), HoodieCLIUtils.extractPartitions(p.get().getRight.getInputGroups.asScala))
+        }
+      } else {
+        clusteringPlans.map { p =>
+          Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+        }
       }
-    } else {
-      clusteringPlans.map { p =>
-        Row(p.get().getLeft.getTimestamp, p.get().getRight.getInputGroups.size(), p.get().getLeft.getState.name(), "*")
+    } finally {
+      if (client != null) {
+        client.close()
       }
     }
   }
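
As a usage note, the same close-on-exit guarantee can be factored into a loan-pattern helper so callers cannot forget the finally block. The sketch below is purely illustrative; withResource is an assumed helper, not part of HoodieCLIUtils at this commit.

// Loan pattern: acquire a resource, lend it to `use`, and always close it.
// Hypothetical helper for illustration; not an existing Hudi API.
object ResourceLoan {
  def withResource[R <: AutoCloseable, A](acquire: => R)(use: R => A): A = {
    val r = acquire     // may throw; nothing acquired to close yet
    try use(r)
    finally r.close()   // runs on both success and failure
  }
}

// With such a helper, the procedure body above could read (names from the diff):
//   ResourceLoan.withResource(HoodieCLIUtils.createHoodieClientFromPath(sparkSession, basePath, conf)) { client =>
//     // schedule and run clustering, then build the result Rows
//   }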
