[HUDI-4287] Optimize Flink checkpoint meta mechanism to fix mistaken pending instants
chenshzh committed Jun 29, 2022
1 parent 09dc001 commit 0125f05
Showing 11 changed files with 386 additions and 28 deletions.
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hudi.sink.meta;
package org.apache.hudi.metadata;

import org.apache.hudi.common.util.ValidationUtils;

@@ -73,6 +73,10 @@ public boolean isComplete() {
return State.COMPLETED == this.state;
}

public boolean isCancelled() {
return State.CANCELLED == this.state;
}

public boolean isInflight() {
return State.INFLIGHT == this.state;
}
@@ -103,7 +107,10 @@ public enum State {
// than COMPLETED
ABORTED,
// Committed instant
COMPLETED
COMPLETED,
// In some cases, e.g. rollback, the instant may be deleted, which means it has been dropped.
// So we need to mark it as CANCELLED with the highest priority.
CANCELLED
}

@Override
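Why the new state matters: a cancelled instant may already have earlier messages on disk, so readers have to let CANCELLED override them. A minimal, hypothetical sketch of that priority rule (the enum mirrors the diff; `effectiveState` is illustrative, not Hudi source):

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical demo, not Hudi source: shows why a CANCELLED message must take
// priority over any earlier checkpoint message written for the same instant.
public class CkpStateDemo {
  enum State { REQUESTED, INFLIGHT, ABORTED, COMPLETED, CANCELLED }

  // A rollback may delete an instant after INFLIGHT or COMPLETED markers were
  // already written, so CANCELLED overrides everything else.
  static State effectiveState(List<State> markers) {
    if (markers.contains(State.CANCELLED)) {
      return State.CANCELLED;
    }
    return markers.stream().max(Comparator.naturalOrder()).orElse(State.REQUESTED);
  }

  public static void main(String[] args) {
    // An instant that started and was then rolled back resolves to CANCELLED.
    System.out.println(effectiveState(List.of(State.INFLIGHT, State.CANCELLED)));
  }
}
```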
@@ -16,16 +16,15 @@
* limitations under the License.
*/

package org.apache.hudi.sink.meta;
package org.apache.hudi.metadata;

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -39,6 +38,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* The checkpoint metadata for bookkeeping the checkpoint messages.
@@ -67,20 +67,28 @@ public class CkpMetadata implements Serializable {
private static final String CKP_META = "ckp_meta";

private final FileSystem fs;
private final Path basePath;
protected final Path path;

private List<CkpMessage> messages;
private List<String> instantCache;

private CkpMetadata(Configuration config) {
this(FSUtils.getFs(config.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(config)), config.getString(FlinkOptions.PATH));
private CkpMetadata(String basePath, Configuration hadoopConf) {
this(FSUtils.getFs(basePath, hadoopConf), basePath);
}

private CkpMetadata(FileSystem fs, String basePath) {
this.fs = fs;
this.basePath = new Path(basePath);
this.path = new Path(ckpMetaPath(basePath));
}

// for the first-time initialization in the driver
private CkpMetadata(HoodieTableMetaClient metaClient) throws IOException {
this(metaClient.getFs(), metaClient.getBasePath());
bootstrap(metaClient);
}

public void close() {
this.instantCache = null;
}
@@ -94,9 +102,11 @@ public void close() {
*
* <p>This expects to be called by the driver.
*/
public void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
private void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
fs.delete(path, true);
fs.mkdirs(path);
metaClient.getActiveTimeline().reload().getCommitsTimeline().filterPendingExcludingCompaction()
.lastInstant().ifPresent(instant -> startInstant(instant.getTimestamp()));
}
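The bootstrap now additionally re-publishes the last pending (non-compaction) instant from the reloaded timeline, so write tasks can resume it after the driver restarts. A hedged sketch of that sequence, with the Hudi timeline replaced by stand-in types:

```java
import java.io.IOException;
import java.util.Optional;
import java.util.function.Consumer;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch only: mirrors the bootstrap steps above, with the Hudi timeline
// replaced by an Optional<String> holding the last pending instant (if any).
final class CkpBootstrapSketch {
  static void bootstrap(FileSystem fs, Path ckpMetaPath,
                        Optional<String> lastPendingInstant,
                        Consumer<String> startInstant) throws IOException {
    fs.delete(ckpMetaPath, true);   // drop any stale checkpoint messages
    fs.mkdirs(ckpMetaPath);         // start from an empty ckp_meta folder
    // Re-publish the pending instant so tasks see it again after recovery.
    lastPendingInstant.ifPresent(startInstant);
  }
}
```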

public void startInstant(String instant) {
@@ -158,6 +168,18 @@ public void abortInstant(String instant) {
}
}

/**
* Add a cancelled checkpoint message for a deleted instant.
*/
public void deleteInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.CANCELLED));
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint delete metadata for instant: " + instant);
}
}
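The cancellation is recorded as one more message file rather than by mutating existing files, so concurrent readers never observe a partial update. A sketch of the write path; the `<instant>.CANCELLED` file name below is an assumption for illustration (the real name comes from CkpMessage.getFileName):

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustration only: the real file name comes from CkpMessage.getFileName;
// the "<instant>.CANCELLED" scheme below is an assumption.
final class CancelMarkerSketch {
  static void markCancelled(FileSystem fs, Path ckpMetaDir, String instant) throws IOException {
    // An empty marker file is enough: its name carries the instant and state.
    fs.createNewFile(new Path(ckpMetaDir, instant + ".CANCELLED"));
  }
}
```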

// -------------------------------------------------------------------------
// READ METHODS
// -------------------------------------------------------------------------
@@ -175,8 +197,22 @@ public String lastPendingInstant() {
load();
for (int i = this.messages.size() - 1; i >= 0; i--) {
CkpMessage ckpMsg = this.messages.get(i);
// consider 'aborted' as pending too to reuse the instant
if (!ckpMsg.isComplete()) {
if (validatePendingInstant(ckpMsg, false)) {
return ckpMsg.getInstant();
}
}
return null;
}

/**
* For some Hadoop-compatible file systems, this supplies an eagerly aligned option
* that validates the pending instant against the timeline instant files.
*/
@Nullable
public String latestPendingInstant() {
load();
for (int i = this.messages.size() - 1; i >= 0; i--) {
CkpMessage ckpMsg = this.messages.get(i);
if (validatePendingInstant(ckpMsg, true)) {
return ckpMsg.getInstant();
}
}
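lastPendingInstant and latestPendingInstant share the same newest-first scan and differ only in the alignedEagerly flag they pass to validatePendingInstant (see further below). A condensed, hypothetical sketch of that scan, with CkpMessage reduced to a record:

```java
import java.util.List;
import java.util.function.Predicate;

// Condensed, hypothetical sketch of the newest-first scan shared by
// lastPendingInstant and latestPendingInstant; Msg stands in for CkpMessage.
final class PendingScanSketch {
  record Msg(String instant, boolean pending) {}

  static String firstPendingFromEnd(List<Msg> messages, Predicate<Msg> validate) {
    for (int i = messages.size() - 1; i >= 0; i--) {
      Msg msg = messages.get(i);
      if (msg.pending() && validate.test(msg)) {
        return msg.instant();  // the newest instant that is still pending
      }
    }
    return null;  // nothing pending
  }
}
```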
@@ -196,18 +232,42 @@ public boolean isAborted(String instant) {
// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
public static CkpMetadata getInstance(Configuration config) {
return new CkpMetadata(config);
public static CkpMetadata getInstance(String basePath, Configuration hadoopConf) {
return new CkpMetadata(basePath, hadoopConf);
}

public static CkpMetadata getInstance(FileSystem fs, String basePath) {
return new CkpMetadata(fs, basePath);
}

// for the first-time initialization in the driver
public static CkpMetadata getInstanceAtFirstTime(HoodieTableMetaClient metaClient)
throws IOException {
return new CkpMetadata(metaClient);
}

protected static String ckpMetaPath(String basePath) {
return basePath + Path.SEPARATOR + HoodieTableMetaClient.AUXILIARYFOLDER_NAME + Path.SEPARATOR + CKP_META;
}

private static List<Path> getRequestOrInflightCommitFile(String instantTime, Path metaFolder) {
return Stream.of(
HoodieTimeline.makeRequestedCommitFileName(instantTime),
HoodieTimeline.makeInflightCommitFileName(instantTime),
HoodieTimeline.makeRequestedDeltaFileName(instantTime),
HoodieTimeline.makeInflightDeltaFileName(instantTime))
.map(fileName -> new Path(metaFolder, fileName))
.collect(Collectors.toList());
}

private static List<Path> getCompleteCommitFile(String instantTime, Path metaFolder) {
return Stream.of(
HoodieTimeline.makeCommitFileName(instantTime),
HoodieTimeline.makeDeltaFileName(instantTime))
.map(fileName -> new Path(metaFolder, fileName))
.collect(Collectors.toList());
}
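These helpers enumerate every timeline file name an instant can appear under, for both commits and delta commits. A sketch of the resulting candidate paths, where the suffix strings are assumptions for illustration (the authoritative names come from HoodieTimeline's make*FileName methods):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.hadoop.fs.Path;

// Sketch: builds the candidate meta-folder paths checked for one instant. The
// suffix strings are assumptions for illustration; the authoritative names
// come from HoodieTimeline's make*FileName helpers.
final class InstantFilesSketch {
  static List<Path> pendingCandidates(Path metaFolder, String instant) {
    return Stream.of(
            instant + ".commit.requested",
            instant + ".inflight",
            instant + ".deltacommit.requested",
            instant + ".deltacommit.inflight")
        .map(name -> new Path(metaFolder, name))
        .collect(Collectors.toList());
  }
}
```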

private Path fullPath(String fileName) {
return new Path(path, fileName);
}
@@ -224,4 +284,46 @@ private List<CkpMessage> scanCkpMetadata(Path ckpMetaPath) throws IOException {
}).get())
.sorted().collect(Collectors.toList());
}

/**
* Aligned eagerly means that the timeline instant state is validated quickly against
* the instant files directly, rather than by reloading the timeline, to avoid a full fs scan.
*/
private boolean validatePendingInstant(CkpMessage ckpMsg, boolean alignedEagerly) {
// consider 'aborted' as pending too to reuse the instant
if (!ckpMsg.isComplete() && !ckpMsg.isCancelled()) {
if (alignedEagerly) {
String pendingInstant = ckpMsg.getInstant();
Path metaPathDir = new Path(this.basePath, HoodieTableMetaClient.METAFOLDER_NAME);
boolean isCompleted =
getCompleteCommitFile(pendingInstant, metaPathDir).stream()
.anyMatch(this::checkFileExists);
boolean isRequestedOrInflight =
getRequestOrInflightCommitFile(pendingInstant, metaPathDir).stream()
.anyMatch(this::checkFileExists);
boolean isValid = !isCompleted && isRequestedOrInflight;
boolean notExist = !isCompleted && !isRequestedOrInflight;
if (notExist) {
throw new HoodieException(
String.format(
"ckMsg is: %s but timeline instant: %s doesn't exist, isRequestedOrInflight: %s, isCompleted: %s",
ckpMsg, pendingInstant, isRequestedOrInflight, isCompleted));
}

return isValid;
}

return true;
}

return false;
}
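The eager branch reduces to a three-way decision on the two existence checks; a compact restatement (not Hudi source):

```java
// Compact restatement of the eager branch above (not Hudi source):
//   completed file present          -> not pending any more
//   only requested/inflight present -> genuinely pending
//   neither present                 -> inconsistent state, fail fast
final class EagerValidationSketch {
  static boolean eagerlyPending(boolean isCompleted, boolean isRequestedOrInflight) {
    if (!isCompleted && !isRequestedOrInflight) {
      throw new IllegalStateException("instant exists in ckp_meta but not on the timeline");
    }
    return !isCompleted && isRequestedOrInflight;
  }
}
```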

private boolean checkFileExists(Path fullPath) {
try {
return this.fs.isFile(fullPath);
} catch (IOException e) {
throw new HoodieException("Exception while checking instant file existence", e);
}
}
}
@@ -65,7 +65,8 @@
import org.apache.hudi.table.action.commit.FlinkUpsertCommitActionExecutor;
import org.apache.hudi.table.action.commit.FlinkUpsertPreppedCommitActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.CopyOnWriteRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -331,7 +332,7 @@ public HoodieCleanMetadata clean(HoodieEngineContext context, String cleanInstan
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new CopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
return new FlinkCopyOnWriteRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants, skipLocking).execute();
}

@Override
@@ -40,7 +40,7 @@
import org.apache.hudi.table.action.compact.RunCompactionActionExecutor;
import org.apache.hudi.table.action.compact.ScheduleCompactionActionExecutor;
import org.apache.hudi.table.action.rollback.BaseRollbackPlanActionExecutor;
import org.apache.hudi.table.action.rollback.MergeOnReadRollbackActionExecutor;
import org.apache.hudi.table.action.rollback.FlinkMergeOnReadRollbackActionExecutor;

import java.util.List;
import java.util.Map;
@@ -128,7 +128,7 @@ public Option<HoodieRollbackPlan> scheduleRollback(HoodieEngineContext context,
@Override
public HoodieRollbackMetadata rollback(HoodieEngineContext context, String rollbackInstantTime, HoodieInstant commitInstant,
boolean deleteInstants, boolean skipLocking) {
return new MergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
return new FlinkMergeOnReadRollbackActionExecutor(context, config, this, rollbackInstantTime, commitInstant, deleteInstants,
skipLocking).execute();
}
}
@@ -0,0 +1,87 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.rollback;

import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.CkpMetadata;
import org.apache.hudi.table.HoodieTable;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class FlinkCopyOnWriteRollbackActionExecutor<T extends HoodieRecordPayload, I, K, O>
extends CopyOnWriteRollbackActionExecutor<T, I, K, O> {

private static final Logger LOG =
LogManager.getLogger(FlinkCopyOnWriteRollbackActionExecutor.class);

public FlinkCopyOnWriteRollbackActionExecutor(
HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipLocking) {

super(context, config, table, instantTime, commitInstant, deleteInstants, skipLocking);
}

public FlinkCopyOnWriteRollbackActionExecutor(
HoodieEngineContext context,
HoodieWriteConfig config,
HoodieTable<T, I, K, O> table,
String instantTime,
HoodieInstant commitInstant,
boolean deleteInstants,
boolean skipTimelinePublish,
boolean useMarkerBasedStrategy,
boolean skipLocking) {
super(
context,
config,
table,
instantTime,
commitInstant,
deleteInstants,
skipTimelinePublish,
useMarkerBasedStrategy,
skipLocking);
}

@Override
protected void deleteInflightAndRequestedInstant(
boolean deleteInstant,
HoodieActiveTimeline activeTimeline,
HoodieInstant instantToBeDeleted) {
super.deleteInflightAndRequestedInstant(deleteInstant, activeTimeline, instantToBeDeleted);
// the instant being uncompleted means it is deleted during the rollback
if (deleteInstant && !instantToBeDeleted.isCompleted()) {
CkpMetadata ckpMetadata =
CkpMetadata.getInstance(
table.getMetaClient().getFs(), table.getMetaClient().getBasePath());
ckpMetadata.deleteInstant(instantToBeDeleted.getTimestamp());
LOG.info("delete checkpoint metadata for for rollback instant: " + instantToBeDeleted);
}
}
}
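The override keeps ckp_meta consistent with the timeline: when the rollback deletes a pending instant, a CANCELLED message is published so lastPendingInstant can no longer hand the dropped instant back to a recovering writer. A hedged usage sketch built only from the APIs shown in this diff:

```java
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.metadata.CkpMetadata;

// Hedged sketch of the flow the override enables (not a test from this commit).
final class RollbackCkpSyncSketch {
  static void afterRollback(HoodieTableMetaClient metaClient, String droppedInstant) {
    CkpMetadata ckpMetadata =
        CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePath());
    // Publishing a CANCELLED message for the deleted instant ensures a
    // recovering writer will not pick the dropped instant up as pending again.
    ckpMetadata.deleteInstant(droppedInstant);
  }
}
```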