[HUDI-2537] Fix metadata table for flink #3774

Merged — 1 commit, Oct 10, 2021
@@ -22,6 +22,7 @@
import org.apache.hudi.avro.model.HoodieMetadataRecord;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
import org.apache.hudi.client.AbstractHoodieWriteClient;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -219,7 +220,7 @@ public HoodieBackedTableMetadata metadata() {
*/
protected abstract void initialize(HoodieEngineContext engineContext);

protected void initTableMetadata() {
public void initTableMetadata() {
try {
if (this.metadata != null) {
this.metadata.close();
@@ -533,4 +534,42 @@ public void close() throws Exception {
* @param instantTime The timestamp to use for the deltacommit.
*/
protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);

/**
* Perform a compaction on the Metadata Table.
*
* Cases to be handled:
* 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
* a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
*
* 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
* deltacommit.
*/
protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
.get().getTimestamp();
List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());

if (!pendingInstants.isEmpty()) {
LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
return;
}

// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
final String compactionInstantTime = latestDeltacommitTime + "001";
Contributor:
Hi @danny0405

I encountered a DateTimeParseException in this method today. Below is the error stack from running the test case TestMetadataTableWithSparkDataSource.testReadability with the metrics function enabled.

java.time.format.DateTimeParseException: Text '00000000000000001' could not be parsed: Invalid value for YearOfEra (valid values 1 - 999999999/1000000000): 0

	at java.time.format.DateTimeFormatter.createError(DateTimeFormatter.java:1920)
	at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1855)
	at java.time.LocalDateTime.parse(LocalDateTime.java:492)
	at org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.parseDateFromInstantTime(HoodieInstantTimeGenerator.java:102)
	at org.apache.hudi.common.table.timeline.HoodieActiveTimeline.parseDateFromInstantTime(HoodieActiveTimeline.java:84)
	at org.apache.hudi.client.SparkRDDWriteClient.completeCompaction(SparkRDDWriteClient.java:322)
	at org.apache.hudi.client.SparkRDDWriteClient.completeTableService(SparkRDDWriteClient.java:461)
	at org.apache.hudi.client.SparkRDDWriteClient.compact(SparkRDDWriteClient.java:346)
...

The exception points to an illegal date text. Checking the code, I found the illegal date comes from here: latestDeltacommitTime is "00000000000000", so compactionInstantTime becomes "00000000000000001", which is not a valid date for HoodieInstantTimeGenerator.parseDateFromInstantTime.

As I understand it, the "00000000000000" value of latestDeltacommitTime comes from the metadata table timeline; it is the init timestamp of the metadata table.

(Screenshot of the metadata table timeline, 2022-08-22)

Would it be a reasonable solution to disable Hudi metrics in metadataWriteConfig by default, or do you have a better idea?

I found a similar issue before in PR #6000.
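
For illustration, a minimal standalone sketch that reproduces the same YearOfEra failure. The class name and the yyyyMMddHHmmssSSS pattern are assumptions; the exact format handling lives in HoodieInstantTimeGenerator and may differ:

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class InstantParseRepro {
  public static void main(String[] args) {
    // Assumed millisecond-granularity instant pattern. The pattern letter 'y' is
    // year-of-era, which rejects the value 0, so "00000000000000" + "001" fails.
    DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS");
    // Throws DateTimeParseException: "Text '00000000000000001' could not be parsed:
    // Invalid value for YearOfEra (valid values 1 - 999999999/1000000000): 0"
    LocalDateTime.parse("00000000000000001", format);
  }
}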

danny0405 (Contributor, Author), Aug 22, 2022:
I'm afraid this is a bug. Can you file a fix here? Say, disable/skip sending the metrics when the instant time starts with 00000000000000.

Contributor:
OK, got it, let me fix it here.

What do you think about replacing the timestamp with HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS (whose value is "00000000000001") when latestDeltacommitTime.equals("00000000000000")? As in PR #6000, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS is already handled correctly.

e.g.

if (latestDeltacommitTime.equals(HoodieTimeline.INIT_INSTANT_TS)) {
  compactionInstantTime = HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS;
} else {
  compactionInstantTime = latestDeltacommitTime + "001";
}

danny0405 (Contributor, Author):
+1 for this way.

if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
writeClient.compact(compactionInstantTime);
}
}

protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime) {
// Trigger cleaning with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
writeClient.clean(instantTime + "002");
}
}
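
To make the suffix scheme in compactIfNecessary and doClean concrete, a small runnable sketch with a hypothetical timestamp (the values are illustrative only):

public class MetadataInstantSuffixExample {
  public static void main(String[] args) {
    // Hypothetical instant times, purely illustrative of the suffix scheme above.
    String instantTime = "20211010120000";           // instant being applied to the metadata table
    String latestDeltacommitTime = instantTime;      // last completed deltacommit on the metadata timeline
    String compactionInstantTime = latestDeltacommitTime + "001"; // "20211010120000001"
    String cleanInstantTime = instantTime + "002";                // "20211010120000002"
    // Instant times compare lexicographically, so both table-service instants sort
    // strictly after every instant at or before the deltacommit time; delta commits
    // synced later therefore never carry a smaller timestamp than these services.
    System.out.println(compactionInstantTime.compareTo(latestDeltacommitTime) > 0); // true
    System.out.println(cleanInstantTime.compareTo(compactionInstantTime) > 0);      // true
  }
}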
@@ -19,7 +19,6 @@
package org.apache.hudi.client;

import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
@@ -51,6 +50,8 @@
import org.apache.hudi.io.FlinkMergeHandle;
import org.apache.hudi.io.HoodieWriteHandle;
import org.apache.hudi.io.MiniBatchHandle;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
import org.apache.hudi.table.BulkInsertPartitioner;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;
@@ -86,24 +87,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
* FileID to write handle mapping in order to record the write handles for each file group,
* so that we can append the mini-batch data buffer incrementally.
*/
private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;

public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
this(context, clientConfig, false);
}
/**
* Cached metadata writer for coordinator to reuse for each commit.
*/
private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();

@Deprecated
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
super(context, writeConfig);
this.bucketToHandles = new HashMap<>();
}

@Deprecated
public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
Option<EmbeddedTimelineService> timelineService) {
super(context, writeConfig, timelineService);
}

/**
* Complete changes performed at the given instantTime marker with specified action.
*/
@@ -260,6 +255,24 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
// remove the async cleaning
}

@Override
protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
this.metadataWriterOption.ifPresent(w -> {
w.initTableMetadata(); // refresh the timeline
w.update(metadata, instantTime);
});
}

/**
* Initialize the table metadata writer, for e.g, bootstrap the metadata table
* from the filesystem if it does not exist.
*/
public void initMetadataWriter() {
HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
this.metadataWriterOption = Option.of(metadataWriter);
}
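
A hypothetical wiring sketch of how a coordinator might use the new hooks; the helper class and method below are illustrative and not part of this PR, while the constructor, isMetadataTableEnabled() and initMetadataWriter() calls come from the diff itself. Once the writer is cached this way, preCommit(...) refreshes its timeline and applies each commit's metadata before the data-table commit completes.

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.config.HoodieWriteConfig;

// Hypothetical helper, not part of this PR: shows the intended call order only.
final class MetadataWriterBootstrapSketch {

  static <T extends HoodieRecordPayload> HoodieFlinkWriteClient<T> createWriteClient(
      HoodieEngineContext context, HoodieWriteConfig config) {
    HoodieFlinkWriteClient<T> client = new HoodieFlinkWriteClient<>(context, config);
    if (config.isMetadataTableEnabled()) {
      // Bootstrap the metadata table if it does not exist yet, and cache the
      // metadata writer on the coordinator for use by the following commits.
      client.initMetadataWriter();
    }
    return client;
  }

  private MetadataWriterBootstrapSketch() {
  }
}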

/**
* Starts async cleaning service for finished commits.
*
@@ -347,6 +360,8 @@ public void completeCompaction(
String compactionCommitTime) {
this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
// commit to data table after committing to metadata table.
finalizeWrite(table, compactionCommitTime, writeStats);
LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -381,6 +396,19 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns
throw new HoodieNotSupportedException("Clustering is not supported yet");
}

private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
try {
this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
} finally {
this.txnManager.endTransaction();
}
}

@Override
protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
HoodieTableMetaClient metaClient = createMetaClient(true);
@@ -478,6 +506,7 @@ private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>
} else {
writeTimer = metrics.getDeltaCommitCtx();
}
table.getHoodieView().sync();
return table;
}

@@ -20,24 +20,17 @@

import org.apache.hudi.client.HoodieFlinkWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.table.HoodieFlinkTable;
import org.apache.hudi.table.HoodieTable;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.LogManager;
@@ -86,82 +79,61 @@ protected void initialize(HoodieEngineContext engineContext) {
@Override
protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
List<HoodieRecord> recordRDD = prepRecords(records, partitionName);
List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);

try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) {
writeClient.startCommitWithTime(instantTime);
writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
// if this is a new commit being applied to metadata for the first time
writeClient.startCommitWithTime(instantTime);
writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
} else {
// this code path refers to a re-attempted commit that got committed to metadata table, but failed in datatable.
// for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
// when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
// are upserts to metadata table and so only a new delta commit will be created.
// once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
// already part of completed commit. So, we have to manually remove the completed instant and proceed.
// and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
metadataMetaClient.reloadActiveTimeline();
}

List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime);
List<WriteStatus> statuses = records.size() > 0
? writeClient.upsertPreppedRecords(recordList, instantTime)
: Collections.emptyList();
statuses.forEach(writeStatus -> {
if (writeStatus.hasErrors()) {
throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
}
});
// flink does not support auto-commit yet, also the auto commit logic is not complete as AbstractHoodieWriteClient now.
writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
// trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
writeClient.compact(instantTime + "001");
}
writeClient.clean(instantTime + "002");

// reload timeline
metadataMetaClient.reloadActiveTimeline();
compactIfNecessary(writeClient, instantTime);
doClean(writeClient, instantTime);
}

// Update total size of the metadata and count of base/log files
metrics.ifPresent(m -> {
try {
m.updateSizeMetrics(metadataMetaClient, metadata);
} catch (HoodieIOException e) {
LOG.error("Could not publish metadata size metrics", e);
}
});
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
}

/**
* Tag each record with the location.
* <p>
* Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
* base file.
* Tag each record with the location in the given partition.
*
* The record is tagged with respective file slice's location based on its record key.
*/
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext);
TableFileSystemView.SliceView fsView = table.getSliceView();
List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
.map(FileSlice::getBaseFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(Collectors.toList());

// All the metadata fits within a single base file
if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
if (baseFiles.size() > 1) {
throw new HoodieMetadataException("Multiple base files found in metadata partition");
}
}

String fileId;
String instantTime;
if (!baseFiles.isEmpty()) {
fileId = baseFiles.get(0).getFileId();
instantTime = "U";
} else {
// If there is a log file then we can assume that it has the data
List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
.map(FileSlice::getLatestLogFile)
.filter(Option::isPresent)
.map(Option::get)
.collect(Collectors.toList());
if (logFiles.isEmpty()) {
// No base and log files. All are new inserts
fileId = FSUtils.createNewFileIdPfx();
instantTime = "I";
} else {
fileId = logFiles.get(0).getFileId();
instantTime = "U";
}
}

return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList());
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

return records.stream().map(r -> {
FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
final String instantTime = slice.isEmpty() ? "I" : "U";
r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
return r;
}).collect(Collectors.toList());
}
}
@@ -29,14 +29,25 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.index.FlinkHoodieIndex;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;

import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.util.List;

public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
implements ExplicitWriteHandleTable<T> {

private boolean isMetadataAvailabilityUpdated = false;
private boolean isMetadataTableAvailable;

protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
super(config, context, metaClient);
}
@@ -66,4 +77,31 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
}

/**
* Fetch instance of {@link HoodieTableMetadataWriter}.
*
* @return instance of {@link HoodieTableMetadataWriter}
*/
@Override
public Option<HoodieTableMetadataWriter> getMetadataWriter() {
synchronized (this) {
if (!isMetadataAvailabilityUpdated) {
// this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
// this is done to avoid repeated calls to fs.exists().
try {
isMetadataTableAvailable = config.isMetadataTableEnabled()
&& metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
} catch (IOException e) {
throw new HoodieMetadataException("Checking existence of metadata table failed", e);
}
isMetadataAvailabilityUpdated = true;
}
}
if (isMetadataTableAvailable) {
return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
} else {
return Option.empty();
}
}
}