[HUDI-4097] add table info to jobStatus (apache#5529)
Co-authored-by: wqwl611 <wqwl611@gmail.com>
2 people authored and yihua committed Jun 3, 2022
1 parent f906476 commit 87b4263
Showing 26 changed files with 40 additions and 40 deletions.
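
This commit applies one pattern across the changed files: every setJobStatus call now appends the table name to the activity description, presumably so that concurrent jobs on different tables can be told apart in the engine UI and logs. Below is a minimal, self-contained sketch of the before/after shape of these calls; EngineContext and WriteConfig are simplified stand-ins for the Hudi interfaces (not the real classes), and the table name "trips_cow" is made up for illustration.

// Sketch only: EngineContext and WriteConfig are simplified stand-ins for
// HoodieEngineContext and HoodieWriteConfig; "trips_cow" is a made-up table name.
public class JobStatusExample {

  interface EngineContext {
    void setJobStatus(String activeModule, String activityDescription);
  }

  static class WriteConfig {
    private final String tableName;

    WriteConfig(String tableName) {
      this.tableName = tableName;
    }

    String getTableName() {
      return tableName;
    }
  }

  public static void main(String[] args) {
    WriteConfig config = new WriteConfig("trips_cow");
    EngineContext context = (module, description) ->
        System.out.println("[" + module + "] " + description);

    // Before this commit: the status names the activity but not the table.
    context.setJobStatus(JobStatusExample.class.getSimpleName(),
        "Committing to metadata table");

    // After this commit: the table name is appended to the activity description.
    context.setJobStatus(JobStatusExample.class.getSimpleName(),
        "Committing to metadata table: " + config.getTableName());
  }
}

The only variations in the diff are which config object supplies the name (config, dataWriteConfig, table.getConfig(), hoodieTable.getConfig()) and the marker-directory case, which appends basePath instead.
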
@@ -334,7 +334,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
* @param metadata instance of {@link HoodieCommitMetadata}.
*/
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
-context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table");
+context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
}
@@ -1038,7 +1038,7 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
HoodieInstant ownerInstant = new HoodieInstant(true, HoodieTimeline.INDEXING_ACTION, dropInstant);
this.txnManager.beginTransaction(Option.of(ownerInstant), Option.empty());
try {
-context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table");
+context.setJobStatus(this.getClass().getSimpleName(), "Dropping partitions from metadata table: " + config.getTableName());
table.getMetadataWriter(dropInstant).ifPresent(w -> {
try {
((HoodieTableMetadataWriter) w).dropMetadataPartitions(partitionTypes);
@@ -85,7 +85,7 @@ public List<ValidationOpResult> validateCompactionPlan(HoodieTableMetaClient met
if (plan.getOperations() != null) {
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
-context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations");
+context.setJobStatus(this.getClass().getSimpleName(), "Validate compaction operations: " + config.getTableName());
return context.map(ops, op -> {
try {
return validateCompactionOperation(metaClient, compactionInstant, op, Option.of(fsView));
@@ -351,7 +351,7 @@ private List<RenameOpResult> runRenamingOps(HoodieTableMetaClient metaClient,
} else {
LOG.info("The following compaction renaming operations needs to be performed to un-schedule");
if (!dryRun) {
-context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations");
+context.setJobStatus(this.getClass().getSimpleName(), "Execute unschedule operations: " + config.getTableName());
return context.map(renameActions, lfPair -> {
try {
LOG.info("RENAME " + lfPair.getLeft().getPath() + " => " + lfPair.getRight().getPath());
@@ -394,7 +394,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
"Number of Compaction Operations :" + plan.getOperations().size() + " for instant :" + compactionInstant);
List<CompactionOperation> ops = plan.getOperations().stream()
.map(CompactionOperation::convertFromAvroRecordInstance).collect(Collectors.toList());
-context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations");
+context.setJobStatus(this.getClass().getSimpleName(), "Generate compaction unscheduling operations: " + config.getTableName());
return context.flatMap(ops, op -> {
try {
return getRenamingActionsForUnschedulingCompactionOperation(metaClient, compactionInstant, op,
@@ -519,7 +519,7 @@ private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, Hoo
new Path(metaClient.getMetaPath(), archivedInstant.getFileName())
).map(Path::toString).collect(Collectors.toList());

-context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
+context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants: " + config.getTableName());
Map<String, Boolean> resultDeleteInstantFiles = deleteFilesParallelize(metaClient, instantFiles, context, false);

for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
@@ -83,7 +83,7 @@ public static List<HoodieBaseFile> getLatestBaseFilesForPartition(
public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartitions(final List<String> partitions,
final HoodieEngineContext context,
final HoodieTable hoodieTable) {
-context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions");
+context.setJobStatus(HoodieIndexUtils.class.getSimpleName(), "Load latest base files from all partitions: " + hoodieTable.getConfig().getTableName());
return context.flatMap(partitions, partitionPath -> {
List<Pair<String, HoodieBaseFile>> filteredFiles =
getLatestBaseFilesForPartition(partitionPath, hoodieTable).stream()
@@ -167,7 +167,7 @@ List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromFiles(
.map(pair -> Pair.of(pair.getKey(), pair.getValue().getFileId()))
.collect(toList());

-context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on)");
+context.setJobStatus(this.getClass().getName(), "Obtain key ranges for file slices (range pruning=on): " + config.getTableName());
return context.map(partitionPathFileIDList, pf -> {
try {
HoodieRangeInfoHandle rangeInfoHandle = new HoodieRangeInfoHandle(config, hoodieTable, pf);
@@ -209,7 +209,7 @@ private List<Pair<String, BloomIndexFileInfo>> getFileInfoForLatestBaseFiles(
protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
List<String> partitions, final HoodieEngineContext context, final HoodieTable<?, ?, ?, ?> hoodieTable) {
// also obtain file ranges, if range pruning is enabled
-context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices");
+context.setJobStatus(this.getClass().getName(), "Load meta index key ranges for file slices: " + config.getTableName());

final String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();
return context.flatMap(partitions, partitionName -> {
@@ -1047,7 +1047,7 @@ protected void cleanIfNecessary(BaseHoodieWriteClient writeClient, String instan
private void initialCommit(String createInstantTime, List<MetadataPartitionType> partitionTypes) {
// List all partitions in the basePath of the containing dataset
LOG.info("Initializing metadata table by using file listings in " + dataWriteConfig.getBasePath());
-engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions");
+engineContext.setJobStatus(this.getClass().getSimpleName(), "Initializing metadata table by listing files and partitions: " + dataWriteConfig.getTableName());

Map<MetadataPartitionType, HoodieData<HoodieRecord>> partitionToRecordsMap = new HashMap<>();

@@ -566,7 +566,7 @@ public void finalizeWrite(HoodieEngineContext context, String instantTs, List<Ho

private void deleteInvalidFilesByPartitions(HoodieEngineContext context, Map<String, List<Pair<String, String>>> invalidFilesByPartition) {
// Now delete partially written files
-context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation");
+context.setJobStatus(this.getClass().getSimpleName(), "Delete invalid files generated during the write operation: " + config.getTableName());
context.map(new ArrayList<>(invalidFilesByPartition.values()), partitionWithFileList -> {
final FileSystem fileSystem = metaClient.getFs();
LOG.info("Deleting invalid data files=" + partitionWithFileList);
@@ -642,7 +642,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
}

// Now delete partially written files
-context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files");
+context.setJobStatus(this.getClass().getSimpleName(), "Delete all partially written files: " + config.getTableName());
deleteInvalidFilesByPartitions(context, invalidPathsByPartition);

// Now ensure the deleted files disappear
@@ -665,7 +665,7 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
*/
private void waitForAllFiles(HoodieEngineContext context, Map<String, List<Pair<String, String>>> groupByPartition, FileVisibility visibility) {
// This will either ensure all files to be deleted are present.
-context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear");
+context.setJobStatus(this.getClass().getSimpleName(), "Wait for all files to appear/disappear: " + config.getTableName());
boolean checkPassed =
context.map(new ArrayList<>(groupByPartition.entrySet()), partitionWithFileList -> waitForCondition(partitionWithFileList.getKey(),
partitionWithFileList.getValue().stream(), visibility), config.getFinalizeWriteParallelism())
@@ -132,7 +132,7 @@ List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan clean
config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);

-context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions");
+context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());

Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
@@ -96,7 +96,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
try {
CleanPlanner<T, I, K, O> planner = new CleanPlanner<>(context, table, config);
Option<HoodieInstant> earliestInstant = planner.getEarliestCommitToRetain();
-context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned");
+context.setJobStatus(this.getClass().getSimpleName(), "Obtaining list of partitions to be cleaned: " + config.getTableName());
List<String> partitionsToClean = planner.getPartitionPathsToClean(earliestInstant);

if (partitionsToClean.isEmpty()) {
@@ -107,7 +107,7 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) {
int cleanerParallelism = Math.min(partitionsToClean.size(), config.getCleanerParallelism());
LOG.info("Using cleanerParallelism: " + cleanerParallelism);

-context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned");
+context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName());

Map<String, Pair<Boolean, List<CleanFileInfo>>> cleanOpsWithPartitionMeta = context
.map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism)
@@ -49,7 +49,7 @@ public HoodieWriteMetadata<O> write(String instantTime,
I taggedRecords = dedupedRecords;
if (table.getIndex().requiresTagging(operationType)) {
// perform index loop up to get existing location of records
-context.setJobStatus(this.getClass().getSimpleName(), "Tagging");
+context.setJobStatus(this.getClass().getSimpleName(), "Tagging: " + table.getConfig().getTableName());
taggedRecords = tag(dedupedRecords, context, table);
}
Duration indexLookupDuration = Duration.between(lookupBegin, Instant.now());
@@ -133,7 +133,7 @@ public HoodieData<WriteStatus> compact(
.map(CompactionOperation::convertFromAvroRecordInstance).collect(toList());
LOG.info("Compactor compacting " + operations + " files");

-context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices");
+context.setJobStatus(this.getClass().getSimpleName(), "Compacting file slices: " + config.getTableName());
TaskContextSupplier taskContextSupplier = table.getTaskContextSupplier();
return context.parallelize(operations).map(operation -> compact(
compactionHandler, metaClient, config, operation, compactionInstantTime, taskContextSupplier))
@@ -288,7 +288,7 @@ HoodieCompactionPlan generateCompactionPlan(

SliceView fileSystemView = hoodieTable.getSliceView();
LOG.info("Compaction looking for files to compact in " + partitionPaths + " partitions");
-context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact");
+context.setJobStatus(this.getClass().getSimpleName(), "Looking for files to compact: " + config.getTableName());

List<HoodieCompactionOperation> operations = context.flatMap(partitionPaths, partitionPath -> fileSystemView
.getLatestFileSlices(partitionPath)
@@ -88,7 +88,7 @@ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
context, compactionPlan, table, configCopy, instantTime, compactionHandler);

compactor.maybePersist(statuses, config);
-context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata");
+context.setJobStatus(this.getClass().getSimpleName(), "Preparing compaction metadata: " + config.getTableName());
List<HoodieWriteStat> updateStatusMap = statuses.map(WriteStatus::getStat).collectAsList();
HoodieCommitMetadata metadata = new HoodieCommitMetadata(true);
for (HoodieWriteStat stat : updateStatusMap) {
@@ -119,7 +119,7 @@ private HoodieCompactionPlan scheduleCompaction() {
.collect(Collectors.toSet());
// exclude files in pending clustering from compaction.
fgInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
-context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan");
+context.setJobStatus(this.getClass().getSimpleName(), "Compaction: generating compaction plan: " + config.getTableName());
return compactor.generateCompactionPlan(context, table, config, instantTime, fgInPendingCompactionAndClustering);
} catch (IOException e) {
throw new HoodieCompactionException("Could not schedule compaction " + config.getBasePath(), e);
@@ -73,7 +73,7 @@ public BaseRollbackHelper(HoodieTableMetaClient metaClient, HoodieWriteConfig co
public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, HoodieInstant instantToRollback,
List<HoodieRollbackRequest> rollbackRequests) {
int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions");
+context.setJobStatus(this.getClass().getSimpleName(), "Perform rollback actions: " + config.getTableName());
// If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
// is failing with com.esotericsoftware.kryo.KryoException
// stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
@@ -89,7 +89,7 @@ public List<HoodieRollbackStat> performRollback(HoodieEngineContext context, Hoo
public List<HoodieRollbackStat> collectRollbackStats(HoodieEngineContext context, HoodieInstant instantToRollback,
List<HoodieRollbackRequest> rollbackRequests) {
int parallelism = Math.max(Math.min(rollbackRequests.size(), config.getRollbackParallelism()), 1);
-context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade");
+context.setJobStatus(this.getClass().getSimpleName(), "Collect rollback stats for upgrade/downgrade: " + config.getTableName());
// If not for conversion to HoodieRollbackInternalRequests, code fails. Using avro model (HoodieRollbackRequest) within spark.parallelize
// is failing with com.esotericsoftware.kryo.KryoException
// stack trace: https://gist.github.com/nsivabalan/b6359e7d5038484f8043506c8bc9e1c8
@@ -84,7 +84,7 @@ public HoodieSavepointMetadata execute() {
ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(instantTime, HoodieTimeline.GREATER_THAN_OR_EQUALS, lastCommitRetained),
"Could not savepoint commit " + instantTime + " as this is beyond the lookup window " + lastCommitRetained);

-context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime);
+context.setJobStatus(this.getClass().getSimpleName(), "Collecting latest files for savepoint " + instantTime + " " + table.getConfig().getTableName());
List<String> partitions = FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
Map<String, List<String>> latestFilesMap = context.mapToPair(partitions, partitionPath -> {
// Scan all partitions files with this commit time
@@ -84,7 +84,7 @@ public Option<Path> createIfNotExists(String partitionPath, String dataFileName,
*/
public void quietDeleteMarkerDir(HoodieEngineContext context, int parallelism) {
try {
-context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory");
+context.setJobStatus(this.getClass().getSimpleName(), "Deleting marker directory: " + basePath);
deleteMarkerDir(context, parallelism);
} catch (Exception e) {
LOG.warn("Error deleting marker directory for instant " + instantTime, e);
@@ -356,7 +356,7 @@ public void completeCompaction(
HoodieCommitMetadata metadata,
HoodieTable table,
String compactionCommitTime) {
-this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
+this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction: " + config.getTableName());
List<HoodieWriteStat> writeStats = metadata.getWriteStats();
final HoodieInstant compactionInstant = new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime);
try {
@@ -508,7 +508,7 @@ public Map<String, List<String>> getPartitionToReplacedFileIds(
List<String> partitionPaths =
FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), table.getMetaClient().getBasePath());
if (partitionPaths != null && partitionPaths.size() > 0) {
-context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions");
+context.setJobStatus(this.getClass().getSimpleName(), "Getting ExistingFileIds of all partitions: " + config.getTableName());
partitionToExistingFileIds = partitionPaths.stream().parallel()
.collect(
Collectors.toMap(
@@ -230,7 +230,7 @@ private Map<String, List<SmallFile>> getSmallFilesForPartitions(List<String> par
}

if (partitionPaths != null && partitionPaths.size() > 0) {
-context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions");
+context.setJobStatus(this.getClass().getSimpleName(), "Getting small files from partitions: " + config.getTableName());
partitionSmallFilesMap = context.mapToPair(partitionPaths,
partitionPath -> new ImmutablePair<>(partitionPath, getSmallFiles(partitionPath)), 0);
}
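
For context, here is a hedged sketch of how a Spark-backed engine context might surface these richer descriptions. It assumes the context forwards the status to the Spark UI via JavaSparkContext.setJobGroup; SparkJobStatusSketch, the app name, and the table name are made up for illustration, and this is not the actual HoodieSparkEngineContext implementation.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical sketch of a Spark-backed engine context; not the real HoodieSparkEngineContext.
public class SparkJobStatusSketch {

  private final JavaSparkContext jsc;

  public SparkJobStatusSketch(JavaSparkContext jsc) {
    this.jsc = jsc;
  }

  public void setJobStatus(String activeModule, String activityDescription) {
    // The group id and description appear next to the job in the Spark UI, so a
    // status like "Compacting file slices: <tableName>" identifies the table at a glance.
    jsc.setJobGroup(activeModule, activityDescription);
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("job-status-sketch").setMaster("local[1]");
    try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
      SparkJobStatusSketch context = new SparkJobStatusSketch(jsc);
      context.setJobStatus("HoodieCompactor", "Compacting file slices: trips_cow");
      // Run a trivial job so the description is attached to a visible Spark job.
      System.out.println(jsc.parallelize(Arrays.asList(1, 2, 3)).count());
    }
  }
}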