[HUDI-3506] Add call procedure for CommitsCommand #5974

Merged · 4 commits · Jun 28, 2022
CommitsCompareProcedure.scala
@@ -0,0 +1,81 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.util.function.Supplier
import scala.collection.JavaConverters._

class CommitsCompareProcedure() extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "path", DataTypes.StringType, None)
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("compare_detail", DataTypes.StringType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val path = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val source = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build
    val target = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(path).build
    val sourceTimeline = source.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
    val targetTimeline = target.getActiveTimeline.getCommitsTimeline.filterCompletedInstants
    val targetLatestCommit =
      if (targetTimeline.getInstants.iterator.hasNext) targetTimeline.lastInstant.get.getTimestamp else "0"
    val sourceLatestCommit =
      if (sourceTimeline.getInstants.iterator.hasNext) sourceTimeline.lastInstant.get.getTimestamp else "0"

    if (sourceLatestCommit != null && HoodieTimeline.compareTimestamps(targetLatestCommit, HoodieTimeline.GREATER_THAN, sourceLatestCommit)) { // source is behind the target
      // materialize the timestamps so that both .size and the rendered message see the same collection
      // (calling .size on a bare Iterator would exhaust it before it is printed)
      val commitsToCatchup = targetTimeline.findInstantsAfter(sourceLatestCommit, Integer.MAX_VALUE)
        .getInstants.iterator().asScala.map(instant => instant.getTimestamp).toList
      Seq(Row("Source " + source.getTableConfig.getTableName + " is behind by " + commitsToCatchup.size + " commits. Commits to catch up - " + commitsToCatchup))
    } else {
      val commitsToCatchup = sourceTimeline.findInstantsAfter(targetLatestCommit, Integer.MAX_VALUE)
        .getInstants.iterator().asScala.map(instant => instant.getTimestamp).toList
      Seq(Row("Source " + source.getTableConfig.getTableName + " is ahead by " + commitsToCatchup.size + " commits. Commits to catch up - " + commitsToCatchup))
    }
  }

  override def build: Procedure = new CommitsCompareProcedure()
}

object CommitsCompareProcedure {
  val NAME = "commits_compare"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new CommitsCompareProcedure()
  }
}
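Once `commits_compare` is registered in HoodieProcedures (next file), it becomes callable through Spark SQL's named-argument CALL syntax. A minimal usage sketch, assuming a SparkSession with the Hudi Spark SQL extensions enabled; the table name and target base path below are placeholders:

// Compare the commit timeline of a catalog table against another copy of the
// table at an explicit base path; returns a single `compare_detail` row saying
// which side is ahead/behind and the commit timestamps to catch up.
spark.sql("call commits_compare(table => 'hudi_trips_cow', path => '/tmp/hudi_trips_cow_backup')").show(false)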
HoodieProcedures.scala
@@ -43,6 +43,12 @@ object HoodieProcedures {
    mapBuilder.put(ShowClusteringProcedure.NAME, ShowClusteringProcedure.builder)
    mapBuilder.put(ShowCommitsProcedure.NAME, ShowCommitsProcedure.builder)
    mapBuilder.put(ShowCommitsMetadataProcedure.NAME, ShowCommitsMetadataProcedure.builder)
    mapBuilder.put(ShowArchivedCommitsProcedure.NAME, ShowArchivedCommitsProcedure.builder)
    mapBuilder.put(ShowArchivedCommitsMetadataProcedure.NAME, ShowArchivedCommitsMetadataProcedure.builder)
    mapBuilder.put(ShowCommitFilesProcedure.NAME, ShowCommitFilesProcedure.builder)
    mapBuilder.put(ShowCommitPartitionsProcedure.NAME, ShowCommitPartitionsProcedure.builder)
    mapBuilder.put(ShowCommitWriteStatsProcedure.NAME, ShowCommitWriteStatsProcedure.builder)
    mapBuilder.put(CommitsCompareProcedure.NAME, CommitsCompareProcedure.builder)
    mapBuilder.put(ShowSavepointsProcedure.NAME, ShowSavepointsProcedure.builder)
    mapBuilder.put(DeleteMarkerProcedure.NAME, DeleteMarkerProcedure.builder)
    mapBuilder.put(ShowRollbacksProcedure.NAME, ShowRollbacksProcedure.builder)
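Each entry above maps a SQL-facing procedure name to a `Supplier[ProcedureBuilder]`, so resolving a CALL statement amounts to looking the name up and building a fresh procedure instance. A small sketch using only the builder API visible in this diff:

// Building a procedure instance the same way the registry would:
val builder: ProcedureBuilder = CommitsCompareProcedure.builder.get()
val procedure: Procedure = builder.build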
ShowArchivedCommitsProcedure.scala
@@ -0,0 +1,183 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hudi.command.procedures

import org.apache.hudi.common.model.HoodieCommitMetadata
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieDefaultTimeline, HoodieInstant}
import org.apache.hudi.common.util.StringUtils
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

import java.time.ZonedDateTime
import java.util
import java.util.function.Supplier
import java.util.{Collections, Date}
import scala.collection.JavaConverters._

class ShowArchivedCommitsProcedure(includeExtraMetadata: Boolean) extends BaseProcedure with ProcedureBuilder {
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.optional(1, "limit", DataTypes.IntegerType, 10),
    ProcedureParameter.optional(2, "startTs", DataTypes.StringType, ""),
    ProcedureParameter.optional(3, "endTs", DataTypes.StringType, "")
  )

  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_files_added", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_files_updated", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_partitions_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_update_records_written", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  private val METADATA_OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("commit_time", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("action", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("partition", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("file_id", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("previous_commit", DataTypes.StringType, nullable = true, Metadata.empty),
    StructField("num_writes", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_inserts", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_deletes", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("num_update_writes", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_errors", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_log_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_corrupt_logblocks", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_rollback_blocks", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_log_records", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_updated_records_compacted", DataTypes.LongType, nullable = true, Metadata.empty),
    StructField("total_bytes_written", DataTypes.LongType, nullable = true, Metadata.empty)
  ))

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = if (includeExtraMetadata) METADATA_OUTPUT_TYPE else OUTPUT_TYPE

  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val table = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
    val limit = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[Int]
    var startTs = getArgValueOrDefault(args, PARAMETERS(2)).get.asInstanceOf[String]
    var endTs = getArgValueOrDefault(args, PARAMETERS(3)).get.asInstanceOf[String]

    val hoodieCatalogTable = HoodieCatalogTable(sparkSession, new TableIdentifier(table))
    val basePath = hoodieCatalogTable.tableLocation
    val metaClient = HoodieTableMetaClient.builder.setConf(jsc.hadoopConfiguration()).setBasePath(basePath).build

    // start time for commits, default: now - 10 days
    // end time for commits, default: now - 1 day
    if (StringUtils.isNullOrEmpty(startTs)) startTs = getTimeDaysAgo(10)
    if (StringUtils.isNullOrEmpty(endTs)) endTs = getTimeDaysAgo(1)

    val archivedTimeline = metaClient.getArchivedTimeline
    try {
      archivedTimeline.loadInstantDetailsInMemory(startTs, endTs)
      val timelineRange = archivedTimeline.findInstantsInRange(startTs, endTs)
      if (includeExtraMetadata) {
        getCommitsWithMetadata(timelineRange, limit)
      } else {
        getCommits(timelineRange, limit)
      }
    } finally {
      // clear the instant details from memory after printing to reduce usage
      archivedTimeline.clearInstantDetailsFromMemory(startTs, endTs)
    }
  }

  override def build: Procedure = new ShowArchivedCommitsProcedure(includeExtraMetadata)

  private def getCommitsWithMetadata(timeline: HoodieDefaultTimeline,
                                     limit: Int): Seq[Row] = {
    import scala.collection.JavaConversions._

    val (rows: util.ArrayList[Row], newCommits: util.ArrayList[HoodieInstant]) = getSortCommits(timeline)

    for (i <- 0 until newCommits.size) {
      val commit = newCommits.get(i)
      val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
      for (partitionWriteStat <- commitMetadata.getPartitionToWriteStats.entrySet) {
        for (hoodieWriteStat <- partitionWriteStat.getValue) {
          rows.add(Row(
            commit.getTimestamp, commit.getAction, hoodieWriteStat.getPartitionPath,
            hoodieWriteStat.getFileId, hoodieWriteStat.getPrevCommit, hoodieWriteStat.getNumWrites,
            hoodieWriteStat.getNumInserts, hoodieWriteStat.getNumDeletes, hoodieWriteStat.getNumUpdateWrites,
            hoodieWriteStat.getTotalWriteErrors, hoodieWriteStat.getTotalLogBlocks, hoodieWriteStat.getTotalCorruptLogBlock,
            hoodieWriteStat.getTotalRollbackBlocks, hoodieWriteStat.getTotalLogRecords,
            hoodieWriteStat.getTotalUpdatedRecordsCompacted, hoodieWriteStat.getTotalWriteBytes))
        }
      }
    }

    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  private def getSortCommits(timeline: HoodieDefaultTimeline): (util.ArrayList[Row], util.ArrayList[HoodieInstant]) = {
    val rows = new util.ArrayList[Row]
    // timeline can be read from multiple files. So sort is needed instead of reversing the collection
    val commits: util.List[HoodieInstant] = timeline.getCommitsTimeline.filterCompletedInstants
      .getInstants.toArray().map(instant => instant.asInstanceOf[HoodieInstant]).toList.asJava
    val newCommits = new util.ArrayList[HoodieInstant](commits)
    Collections.sort(newCommits, HoodieInstant.COMPARATOR.reversed)
    (rows, newCommits)
  }

  def getCommits(timeline: HoodieDefaultTimeline,
                 limit: Int): Seq[Row] = {
    val (rows: util.ArrayList[Row], newCommits: util.ArrayList[HoodieInstant]) = getSortCommits(timeline)

    for (i <- 0 until newCommits.size) {
      val commit = newCommits.get(i)
      val commitMetadata = HoodieCommitMetadata.fromBytes(timeline.getInstantDetails(commit).get, classOf[HoodieCommitMetadata])
      rows.add(Row(commit.getTimestamp, commitMetadata.fetchTotalBytesWritten, commitMetadata.fetchTotalFilesInsert,
        commitMetadata.fetchTotalFilesUpdated, commitMetadata.fetchTotalPartitionsWritten,
        commitMetadata.fetchTotalRecordsWritten, commitMetadata.fetchTotalUpdateRecordsWritten,
        commitMetadata.fetchTotalWriteErrors))
    }

    rows.stream().limit(limit).toArray().map(r => r.asInstanceOf[Row]).toList
  }

  def getTimeDaysAgo(numberOfDays: Int): String = {
    val date = Date.from(ZonedDateTime.now.minusDays(numberOfDays).toInstant)
    HoodieActiveTimeline.formatDate(date)
  }
}

object ShowArchivedCommitsProcedure {
  val NAME = "show_archived_commits"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new ShowArchivedCommitsProcedure(false)
  }
}

object ShowArchivedCommitsMetadataProcedure {
  val NAME = "show_archived_commits_metadata"

  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get() = new ShowArchivedCommitsProcedure(true)
  }
}
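Both registered names share this one class: `show_archived_commits` uses the summary schema, while `show_archived_commits_metadata` emits per-file write stats, and each accepts the optional `limit`, `startTs`, and `endTs` parameters declared above. A hedged usage sketch; the table name and timestamp bounds are placeholders:

// Summary view of archived commits, capped at 20 rows.
spark.sql("call show_archived_commits(table => 'hudi_trips_cow', limit => 20)").show(false)
// Per-file write metadata for archived commits within an instant-time range.
spark.sql("call show_archived_commits_metadata(table => 'hudi_trips_cow', startTs => '20220601000000', endTs => '20220627000000')").show(false)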
