diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 19e9d313e3e9a..ceac9eb2cd10f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.AbstractHoodieWriteClient;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -219,7 +220,7 @@ public HoodieBackedTableMetadata metadata() {
    */
   protected abstract void initialize(HoodieEngineContext engineContext);
 
-  protected void initTableMetadata() {
+  public void initTableMetadata() {
     try {
       if (this.metadata != null) {
         this.metadata.close();
@@ -533,4 +534,42 @@ public void close() throws Exception {
    * @param instantTime The timestamp to use for the deltacommit.
    */
   protected abstract void commit(List<HoodieRecord> records, String partitionName, String instantTime);
+
+  /**
+   * Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
+   *
+   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+   * deltacommit.
+   */
+  protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+      return;
+    }
+
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      writeClient.compact(compactionInstantTime);
+    }
+  }
+
+  protected void doClean(AbstractHoodieWriteClient writeClient, String instantTime) {
+    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    writeClient.clean(instantTime + "002");
+  }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 669be164b4d81..e95b0f823d181 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
@@ -51,6 +50,8 @@
 import org.apache.hudi.io.FlinkMergeHandle;
 import org.apache.hudi.io.HoodieWriteHandle;
 import org.apache.hudi.io.MiniBatchHandle;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieBackedTableMetadataWriter;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.HoodieTable;
@@ -86,24 +87,18 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
    * FileID to write handle mapping in order to record the write handles for each file group,
    * so that we can append the mini-batch data buffer incrementally.
    */
-  private Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
+  private final Map<String, HoodieWriteHandle<?, ?, ?, ?>> bucketToHandles;
 
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
-    this(context, clientConfig, false);
-  }
+  /**
+   * Cached metadata writer for the coordinator to reuse for each commit.
+   */
+  private Option<HoodieBackedTableMetadataWriter> metadataWriterOption = Option.empty();
 
-  @Deprecated
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig) {
     super(context, writeConfig);
     this.bucketToHandles = new HashMap<>();
   }
 
-  @Deprecated
-  public HoodieFlinkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending,
-                                Option<EmbeddedTimelineService> timelineService) {
-    super(context, writeConfig, timelineService);
-  }
-
   /**
    * Complete changes performed at the given instantTime marker with specified action.
    */
@@ -260,6 +255,24 @@ protected void preWrite(String instantTime, WriteOperationType writeOperationTyp
     // remove the async cleaning
   }
 
+  @Override
+  protected void preCommit(String instantTime, HoodieCommitMetadata metadata) {
+    this.metadataWriterOption.ifPresent(w -> {
+      w.initTableMetadata(); // refresh the timeline
+      w.update(metadata, instantTime);
+    });
+  }
+
+  /**
+   * Initialize the table metadata writer, e.g., bootstrap the metadata table
+   * from the filesystem if it does not exist.
+   */
+  public void initMetadataWriter() {
+    HoodieBackedTableMetadataWriter metadataWriter = (HoodieBackedTableMetadataWriter) FlinkHoodieBackedTableMetadataWriter.create(
+        FlinkClientUtil.getHadoopConf(), this.config, HoodieFlinkEngineContext.DEFAULT);
+    this.metadataWriterOption = Option.of(metadataWriter);
+  }
+
   /**
    * Starts async cleaning service for finished commits.
    *
@@ -347,6 +360,8 @@ public void completeCompaction(
       String compactionCommitTime) {
     this.context.setJobStatus(this.getClass().getSimpleName(), "Collect compaction write status and commit compaction");
     List<HoodieWriteStat> writeStats = writeStatuses.stream().map(WriteStatus::getStat).collect(Collectors.toList());
+    writeTableMetadata(table, metadata, new HoodieInstant(HoodieInstant.State.INFLIGHT, HoodieTimeline.COMPACTION_ACTION, compactionCommitTime));
+    // commit to the data table after committing to the metadata table.
     finalizeWrite(table, compactionCommitTime, writeStats);
     LOG.info("Committing Compaction {} finished with result {}.", compactionCommitTime, metadata);
     FlinkCompactHelpers.newInstance().completeInflightCompaction(table, compactionCommitTime, metadata);
@@ -381,6 +396,19 @@ public HoodieWriteMetadata<List<WriteStatus>> cluster(final String clusteringIns
     throw new HoodieNotSupportedException("Clustering is not supported yet");
   }
 
+  private void writeTableMetadata(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
+                                  HoodieCommitMetadata commitMetadata,
+                                  HoodieInstant hoodieInstant) {
+    try {
+      this.txnManager.beginTransaction(Option.of(hoodieInstant), Option.empty());
+      // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes
+      // to the metadata table happen within a single lock (single writer), because more than one write to the metadata
+      // table would result in conflicts since all of them update the same partition.
+      table.getMetadataWriter().ifPresent(w -> w.update(commitMetadata, hoodieInstant.getTimestamp()));
+    } finally {
+      this.txnManager.endTransaction();
+    }
+  }
+
   @Override
   protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getTableAndInitCtx(WriteOperationType operationType, String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
@@ -478,6 +506,7 @@ private HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List
     } else {
       writeTimer = metrics.getDeltaCommitCtx();
     }
+    table.getHoodieView().sync();
     return table;
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 18a19603bb3d0..634eabaf9765c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -20,24 +20,17 @@
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.metrics.Registry;
 import org.apache.hudi.common.model.FileSlice;
-import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.view.TableFileSystemView;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieMetadataException;
-import org.apache.hudi.table.HoodieFlinkTable;
-import org.apache.hudi.table.HoodieTable;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
@@ -86,82 +79,61 @@ protected void initialize(HoodieEngineContext engineContext) {
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to as it is not enabled");
-    List<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    List<HoodieRecord> recordList = prepRecords(records, partitionName, 1);
 
-    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
-      writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+    try (HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient(engineContext, metadataWriteConfig)) {
+      if (!metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // if this is a new commit being applied to the metadata table for the first time
+        writeClient.startCommitWithTime(instantTime);
+        writeClient.transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that got committed to the metadata table but failed in the data table.
+        // e.g., let's say compaction c1 on the 1st attempt succeeded in the metadata table and failed before committing to the data table.
+        // when retried, the data table will first roll back the pending compaction. these rollbacks will be applied to the metadata table,
+        // but all changes are upserts to the metadata table and so only a new delta commit will be created.
+        // once the rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit
+        // is already part of the completed commits. So, we have to manually remove the completed instant and proceed.
+        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for the metadata table.
+        HoodieInstant alreadyCompletedInstant = metadataMetaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        HoodieActiveTimeline.deleteInstantFile(metadataMetaClient.getFs(), metadataMetaClient.getMetaPath(), alreadyCompletedInstant);
+        metadataMetaClient.reloadActiveTimeline();
+      }
 
-      List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime);
+      List<WriteStatus> statuses = records.size() > 0
+          ? writeClient.upsertPreppedRecords(recordList, instantTime)
+          : Collections.emptyList();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
         }
       });
+      // Flink does not support auto-commit yet; also, the auto-commit logic is not complete in AbstractHoodieWriteClient.
       writeClient.commit(instantTime, statuses, Option.empty(), HoodieActiveTimeline.DELTA_COMMIT_ACTION, Collections.emptyMap());
-      // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metadataMetaClient.reloadActiveTimeline();
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        m.updateSizeMetrics(metadataMetaClient, metadata);
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
-   * <p>
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   * Tag each record with the location in the given partition.
+   *
+   * The record is tagged with the respective file slice's location based on its record key.
    */
-  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieFlinkTable.create(metadataWriteConfig, (HoodieFlinkEngineContext) engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in metadata partition");
-      }
-    }
-
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = "U";
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        fileId = FSUtils.createNewFileIdPfx();
-        instantTime = "I";
-      } else {
-        fileId = logFiles.get(0).getFileId();
-        instantTime = "U";
-      }
-    }
-
-    return records.stream().map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId))).collect(Collectors.toList());
+  private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName);
+    ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));
+
+    return records.stream().map(r -> {
+      FileSlice slice = fileSlices.get(HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex(r.getRecordKey(), numFileGroups));
+      final String instantTime = slice.isEmpty() ? "I" : "U";
+      r.setCurrentLocation(new HoodieRecordLocation(instantTime, slice.getFileId()));
+      return r;
+    }).collect(Collectors.toList());
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 3e26025c258bb..ce63a2dd009ad 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -29,14 +29,25 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.index.FlinkHoodieIndex;
 import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.metadata.FlinkHoodieBackedTableMetadataWriter;
+import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
 import java.util.List;
 
 public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>>
     implements ExplicitWriteHandleTable<T> {
+
+  private boolean isMetadataAvailabilityUpdated = false;
+  private boolean isMetadataTableAvailable;
+
   protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
   }
@@ -66,4 +77,31 @@ public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieW
   protected HoodieIndex<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext context) {
     return FlinkHoodieIndex.createIndex((HoodieFlinkEngineContext) context, config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataAvailabilityUpdated) {
+        // this code assumes that if metadata availability is updated once it will not change. please revisit this logic if that's not the case.
+        // this is done to avoid repeated calls to fs.exists().
+        try {
+          isMetadataTableAvailable = config.isMetadataTableEnabled()
+              && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Checking existence of metadata table failed", e);
+        }
+        isMetadataAvailabilityUpdated = true;
+      }
+    }
+    if (isMetadataTableAvailable) {
+      return Option.of(FlinkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config, context));
+    } else {
+      return Option.empty();
+    }
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index f512b8f98dcc0..3324455a09807 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -41,9 +41,7 @@
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.List;
-import java.util.stream.Collectors;
 
 public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetadataWriter {
 
@@ -129,44 +127,6 @@ protected void commit(List<HoodieRecord> records, String partitionName, String i
     metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata));
   }
 
-  /**
-   * Perform a compaction on the Metadata Table.
-   *
-   * Cases to be handled:
-   * 1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
-   * a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
-   *
-   * 2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
-   * deltacommit.
-   */
-  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
-    String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
-        .get().getTimestamp();
-    List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
-        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
-
-    if (!pendingInstants.isEmpty()) {
-      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
-          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
-      return;
-    }
-
-    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
-    // delta commits synced over will not have an instant time lesser than the last completed instant on the
-    // metadata table.
-    final String compactionInstantTime = latestDeltacommitTime + "001";
-    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
-      writeClient.compact(compactionInstantTime);
-    }
-  }
-
-  private void doClean(SparkRDDWriteClient writeClient, String instantTime) {
-    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
-    // delta commits synced over will not have an instant time lesser than the last completed instant on the
-    // metadata table.
-    writeClient.clean(instantTime + "002");
-  }
-
   /**
    * Tag each record with the location in the given partition.
    *
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 26c159533bad7..f2844a608bf0e 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -126,11 +126,6 @@ public class StreamWriteOperatorCoordinator
    */
   private HiveSyncContext hiveSyncContext;
 
-  /**
-   * A single-thread executor to handle metadata table sync.
-   */
-  private NonThrownExecutor metadataSyncExecutor;
-
   /**
    * The table state.
    */
@@ -294,7 +289,7 @@ public void syncHive() {
   }
 
   private void initMetadataSync() {
-    this.metadataSyncExecutor = new NonThrownExecutor(LOG, true);
+    this.writeClient.initMetadataWriter();
   }
 
   private void reset() {
@@ -498,14 +493,6 @@ public void setExecutor(CoordinatorExecutor executor) throws Exception {
     this.executor = executor;
   }
 
-  @VisibleForTesting
-  public void setMetadataSyncExecutor(NonThrownExecutor executor) throws Exception {
-    if (this.metadataSyncExecutor != null) {
-      this.metadataSyncExecutor.close();
-    }
-    this.metadataSyncExecutor = executor;
-  }
-
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
index 128c03010e6d7..0279313ff39f9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/append/AppendWriteFunction.java
@@ -72,8 +72,6 @@ public void snapshotState() {
     // it would check the validity.
     // wait for the buffer data flush out and request a new instant
     flushData(false);
-    // nullify the write helper for next ckp
-    this.writerHelper = null;
   }
 
   @Override
@@ -133,5 +131,10 @@ private void flushData(boolean endInput) {
         .endInput(endInput)
         .build();
     this.eventGateway.sendEventToCoordinator(event);
+    // nullify the write helper for next ckp
+    this.writerHelper = null;
+    this.writeStatuses.addAll(writeStatus);
+    // blocks flushing until the coordinator starts a new instant
+    this.confirming = true;
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 54085ebc128ef..07383ef7fea5f 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -22,6 +22,7 @@
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.configuration.FlinkOptions;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
@@ -48,10 +49,12 @@ public class FileIndex {
   private final Path path;
   private final HoodieMetadataConfig metadataConfig;
   private List<String> partitionPaths; // cache of partition paths
+  private final boolean tableExists;
 
   private FileIndex(Path path, Configuration conf) {
     this.path = path;
     this.metadataConfig = metadataConfig(conf);
+    this.tableExists = StreamerUtil.tableExists(path.toString(), StreamerUtil.getHadoopConf());
   }
 
   public static FileIndex instance(Path path, Configuration conf) {
@@ -111,6 +114,9 @@ public List<Map<String, String>> getPartitions(
    * Returns all the file statuses under the table base path.
    */
   public FileStatus[] getFilesInPartitions() {
+    if (!tableExists) {
+      return new FileStatus[0];
+    }
     String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
     return FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
         partitions, "/tmp/")
@@ -165,8 +171,9 @@ public List<String> getOrBuildPartitionPaths() {
     if (this.partitionPaths != null) {
       return this.partitionPaths;
     }
-    this.partitionPaths = FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT,
-        metadataConfig, path.toString());
+    this.partitionPaths = this.tableExists
+        ? FSUtils.getAllPartitionPaths(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString())
+        : Collections.emptyList();
     return this.partitionPaths;
   }
 
@@ -174,7 +181,7 @@ private static HoodieMetadataConfig metadataConfig(org.apache.flink.configuratio
     Properties properties = new Properties();
 
     // set up metadata.enabled=true in table DDL to enable metadata listing
-    properties.put(HoodieMetadataConfig.ENABLE, conf.getBoolean(FlinkOptions.METADATA_ENABLED));
+    properties.put(HoodieMetadataConfig.ENABLE.key(), conf.getBoolean(FlinkOptions.METADATA_ENABLED));
 
     return HoodieMetadataConfig.newBuilder().fromProperties(properties).build();
   }
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 05387141630ee..1fdb5ca0acb30 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -41,7 +41,6 @@
 import org.apache.hadoop.fs.Path;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -181,7 +180,7 @@ public void testHiveSyncInvoked() throws Exception {
     assertDoesNotThrow(() -> coordinator.notifyCheckpointComplete(1));
   }
 
-  @Disabled
+  @Test
  void testSyncMetadataTable() throws Exception {
     // reset
     reset();
@@ -193,7 +192,6 @@ void testSyncMetadataTable() throws Exception {
     coordinator = new StreamWriteOperatorCoordinator(conf, context);
     coordinator.start();
     coordinator.setExecutor(new MockCoordinatorExecutor(context));
-    coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(context));
 
     final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
 
@@ -209,7 +207,7 @@ void testSyncMetadataTable() throws Exception {
     assertThat(completedTimeline.lastInstant().get().getTimestamp(), is("0000000000000"));
 
     // test metadata table compaction
-    // write another 4 commits
+    // write another 3 commits
     for (int i = 1; i < 4; i++) {
       instant = mockWriteWithMetadata();
       metadataTableMetaClient.reloadActiveTimeline();
@@ -247,7 +245,13 @@ private static WriteMetadataEvent createOperatorEvent(
       double failureFraction) {
     final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
     writeStatus.setPartitionPath(partitionPath);
-    writeStatus.setStat(new HoodieWriteStat());
+
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId("fileId123");
+    writeStat.setPath("path123");
+
+    writeStatus.setStat(writeStat);
 
     return WriteMetadataEvent.builder()
         .taskID(taskId)
diff --git a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
index c5d3ec5a21221..6b6bedea58420 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/sink/utils/StreamWriteFunctionWrapper.java
@@ -142,9 +142,6 @@ public StreamWriteFunctionWrapper(String tablePath, Configuration conf) throws E
   public void openFunction() throws Exception {
     this.coordinator.start();
     this.coordinator.setExecutor(new MockCoordinatorExecutor(coordinatorContext));
-    if (conf.getBoolean(FlinkOptions.METADATA_ENABLED)) {
-      this.coordinator.setMetadataSyncExecutor(new MockCoordinatorExecutor(coordinatorContext));
-    }
 
     toHoodieFunction = new RowDataToHoodieFunction<>(TestConfigurations.ROW_TYPE, conf);
     toHoodieFunction.setRuntimeContext(runtimeContext);
     toHoodieFunction.open(conf);
diff --git a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
index 060974df73327..334df5961314d 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/source/TestFileIndex.java
@@ -27,7 +27,6 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -90,7 +89,8 @@ void testFileListingUsingMetadataNonPartitionedTable() throws Exception {
     assertTrue(fileStatuses[0].getPath().toString().endsWith(HoodieFileFormat.PARQUET.getFileExtension()));
   }
 
-  @Disabled
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
   void testFileListingEmptyTable(boolean enableMetadata) {
     Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
     conf.setBoolean(FlinkOptions.METADATA_ENABLED, enableMetadata);
diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml
index 380199e6b26f8..120cba3a20b9c 100644
--- a/packaging/hudi-flink-bundle/pom.xml
+++ b/packaging/hudi-flink-bundle/pom.xml
@@ -144,6 +144,10 @@
                   <include>org.apache.flink:flink-sql-connector-hive-2.3.6_${scala.binary.version}</include>
                   <include>org.apache.hbase:hbase-common</include>
+                  <include>org.apache.hbase:hbase-client</include>
+                  <include>org.apache.hbase:hbase-server</include>
+                  <include>org.apache.hbase:hbase-protocol</include>
+                  <include>org.apache.htrace:htrace-core</include>
                   <include>commons-codec:commons-codec</include>
@@ -594,6 +598,45 @@
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-server</artifactId>
+      <version>${hbase.version}</version>
+      <scope>compile</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>javax.servlet</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.codehaus.jackson</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>tomcat</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-client</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-protocol</artifactId>
+      <version>${hbase.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.htrace</groupId>
+      <artifactId>htrace-core</artifactId>
+      <version>${htrace.version}</version>
+    </dependency>
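
Note for reviewers (not part of the patch): the change above leans on two conventions that are easy to miss in the diff. First, table services on the metadata table reuse the data-table instant time with a fixed suffix ("001" for compaction in compactIfNecessary, "002" for clean in doClean), so the derived instants sort after the triggering deltacommit but before any later instant synced over. Second, prepRecords now routes each record to a metadata file group by hashing its record key via HoodieTableMetadataUtil.mapRecordKeyToFileGroupIndex. The snippet below is a minimal, self-contained sketch of both ideas; the class name, helper names, and the hash function are illustrative assumptions, not the actual Hudi implementation.

    // Illustrative sketch only; not the Hudi API.
    public class MetadataTableConventionsSketch {

      // Suffix scheme: Hudi instant times are fixed-width digit strings, so the
      // suffixed instants compare lexicographically between the triggering
      // deltacommit and the next data-table instant.
      static String compactionInstant(String deltacommitTime) {
        return deltacommitTime + "001";
      }

      static String cleanInstant(String deltacommitTime) {
        return deltacommitTime + "002";
      }

      // Key-to-file-group routing: a stable hash of the record key, modulo the
      // number of file groups (analogous to mapRecordKeyToFileGroupIndex; the
      // actual hash used by Hudi may differ).
      static int fileGroupIndex(String recordKey, int numFileGroups) {
        int h = 0;
        for (int i = 0; i < recordKey.length(); i++) {
          h = 31 * h + recordKey.charAt(i);
        }
        // mask the sign bit so the index is non-negative
        return (h & Integer.MAX_VALUE) % numFileGroups;
      }

      public static void main(String[] args) {
        String deltacommit = "20211015093000";
        System.out.println(compactionInstant(deltacommit)); // 20211015093000001
        System.out.println(cleanInstant(deltacommit));      // 20211015093000002
        // With a single file group (numFileGroups = 1, as in the Flink writer's
        // prepRecords call above), every record maps to index 0.
        System.out.println(fileGroupIndex("2021/10/15", 1));
      }
    }

This also explains why commit() in FlinkHoodieBackedTableMetadataWriter reloads the metadata timeline before compacting and cleaning: both helpers derive their instant from the latest completed deltacommit, so the timeline must be current for the suffix ordering to hold.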