From fe86a83345c09cc5b294ff246b9990065893fe03 Mon Sep 17 00:00:00 2001 From: Danny Chan Date: Fri, 16 Dec 2022 08:34:50 +0800 Subject: [PATCH] Cherry pick for HUDI-5007 and HUDI-5228 (#7471) * [HUDI-5007] Prevent Hudi from reading the entire timeline's when performing a LATEST streaming read (#6920) (cherry picked from commit 6baf733292bcae5b83c801044ab99ac7f88d7599) * [HUDI-5228] Flink table service job fs view conf overwrites the one of writing job (#7214) (cherry picked from commit dc5cc08cccd2b727bce7b98693fc4df032b64afb) Co-authored-by: voonhous --- .../common/HoodieFlinkEngineContext.java | 4 + .../hudi/configuration/OptionsResolver.java | 7 + .../org/apache/hudi/sink/CleanFunction.java | 4 +- .../sink/StreamWriteOperatorCoordinator.java | 3 +- .../sink/bootstrap/BootstrapOperator.java | 4 +- .../sink/bulk/BulkInsertWriteFunction.java | 4 +- .../sink/clustering/ClusteringCommitSink.java | 3 +- .../sink/clustering/ClusteringOperator.java | 6 +- .../clustering/ClusteringPlanOperator.java | 4 +- .../clustering/HoodieFlinkClusteringJob.java | 3 +- .../common/AbstractStreamWriteFunction.java | 3 +- .../hudi/sink/compact/CompactFunction.java | 4 +- .../sink/compact/CompactionCommitSink.java | 4 +- .../sink/compact/HoodieFlinkCompactor.java | 3 +- .../partitioner/BucketAssignFunction.java | 4 +- .../hudi/source/IncrementalInputSplits.java | 13 +- .../apache/hudi/table/format/FormatUtils.java | 3 +- .../org/apache/hudi/util/FlinkTables.java | 5 +- .../apache/hudi/util/FlinkWriteClients.java | 238 ++++++++++++++++++ .../org/apache/hudi/util/StreamerUtil.java | 174 ------------- .../hudi/sink/TestWriteCopyOnWrite.java | 6 +- .../cluster/ITTestHoodieFlinkClustering.java | 5 +- .../compact/ITTestHoodieFlinkCompactor.java | 7 +- .../sink/partitioner/TestBucketAssigner.java | 5 +- .../source/TestIncrementalInputSplits.java | 88 +++++++ .../hudi/table/format/TestInputFormat.java | 3 +- .../apache/hudi/utils/TestCompactionUtil.java | 3 +- 
.../apache/hudi/utils/TestStreamerUtil.java | 11 - .../hudi/utils/TestViewStorageProperties.java | 10 + 29 files changed, 407 insertions(+), 224 deletions(-) create mode 100644 hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java create mode 100644 hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java index 5e38c24d3091..c9136da6bb45 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java @@ -67,6 +67,10 @@ private HoodieFlinkEngineContext() { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), new DefaultTaskContextSupplier()); } + public HoodieFlinkEngineContext(org.apache.hadoop.conf.Configuration hadoopConf) { + this(new SerializableConfiguration(hadoopConf), new DefaultTaskContextSupplier()); + } + public HoodieFlinkEngineContext(TaskContextSupplier taskContextSupplier) { this(new SerializableConfiguration(FlinkClientUtil.getHadoopConf()), taskContextSupplier); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index bf7b0f7f5709..cbd942616fea 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -189,4 +189,11 @@ public static boolean isSpecificStartCommit(Configuration conf) { return conf.getOptional(FlinkOptions.READ_START_COMMIT).isPresent() && 
!conf.get(FlinkOptions.READ_START_COMMIT).equalsIgnoreCase(FlinkOptions.START_COMMIT_EARLIEST); } + + /** + * Returns true if there are no explicit start and end commits. + */ + public static boolean hasNoSpecificReadCommits(Configuration conf) { + return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT); + } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java index a59be858bae7..638fe9fdab28 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/CleanFunction.java @@ -22,7 +22,7 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.utils.NonThrownExecutor; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.api.common.functions.AbstractRichFunction; import org.apache.flink.api.common.state.CheckpointListener; @@ -60,7 +60,7 @@ public CleanFunction(Configuration conf) { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext()); this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); String instantTime = HoodieActiveTimeline.createNewInstantTime(); LOG.info(String.format("exec clean with instant time %s...", instantTime)); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index c87d5b2443c4..17b789e2f0dc 100644 --- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -39,6 +39,7 @@ import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -179,7 +180,7 @@ public void start() throws Exception { this.metaClient = initTableIfNotExists(this.conf); this.ckpMetadata = initCkpMetadata(this.metaClient); // the write client must create after the table creation - this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeClient = FlinkWriteClients.createWriteClient(conf); initMetadataTable(this.writeClient); this.tableState = TableState.create(conf); // start the executor diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java index 09250e31328a..3eaa47e3b627 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bootstrap/BootstrapOperator.java @@ -42,7 +42,7 @@ import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.format.FormatUtils; import org.apache.hudi.util.FlinkTables; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.avro.Schema; import org.apache.flink.annotation.VisibleForTesting; @@ -125,7 +125,7 @@ public void initializeState(StateInitializationContext context) throws Exception } this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf); - this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, 
true); + this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true); this.hoodieTable = FlinkTables.createTable(writeConfig, hadoopConf, getRuntimeContext()); this.ckpMetadata = CkpMetadata.getInstance(hoodieTable.getMetaClient().getFs(), this.writeConfig.getBasePath()); this.aggregateManager = getRuntimeContext().getGlobalAggregateManager(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java index 06d9fcd851c2..9fbdbdd8e1af 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/BulkInsertWriteFunction.java @@ -27,7 +27,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent; import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.TimeWait; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.operators.coordination.OperatorEvent; @@ -112,7 +112,7 @@ public BulkInsertWriteFunction(Configuration config, RowType rowType) { @Override public void open(Configuration parameters) throws IOException { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); + this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext()); this.ckpMetadata = CkpMetadata.getInstance(config); this.initInstant = lastPendingInstant(); sendBootstrapEvent(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java index 97b0bd3bc995..5a46dcf8f336 100644 
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringCommitSink.java @@ -37,6 +37,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; @@ -87,7 +88,7 @@ public ClusteringCommitSink(Configuration conf) { public void open(Configuration parameters) throws Exception { super.open(parameters); if (writeClient == null) { - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext()); } this.commitBuffer = new HashMap<>(); this.table = writeClient.getHoodieTable(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java index 837b06a89a8b..ca1cd54c1fed 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java @@ -47,7 +47,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.AvroToRowDataConverters; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; @@ -144,8 +144,8 @@ public void open() throws Exception { super.open(); this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.writeConfig = StreamerUtil.getHoodieClientConfig(this.conf); - this.writeClient = 
StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf); + this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext()); this.table = writeClient.getHoodieTable(); this.schema = AvroSchemaConverter.convertToSchema(rowType); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java index 4cdc3653e57c..48b2a9becd43 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringPlanOperator.java @@ -29,7 +29,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.FlinkTables; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; @@ -68,7 +68,7 @@ public void open() throws Exception { // when starting up, rolls back all the inflight clustering instants if there exists, // these instants are in priority for scheduling task because the clustering instants are // scheduled from earliest(FIFO sequence). 
- ClusteringUtil.rollbackClustering(table, StreamerUtil.createWriteClient(conf, getRuntimeContext())); + ClusteringUtil.rollbackClustering(table, FlinkWriteClients.createWriteClient(conf, getRuntimeContext())); } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java index adb44164294c..1942b1ce29e7 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/HoodieFlinkClusteringJob.java @@ -35,6 +35,7 @@ import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.ClusteringUtil; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; @@ -198,7 +199,7 @@ public AsyncClusteringService(FlinkClusteringConfig cfg, Configuration conf, Str // set table schema CompactionUtil.setAvroSchema(conf, metaClient); - this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeClient = FlinkWriteClients.createWriteClientV2(conf); this.writeConfig = writeClient.getConfig(); this.table = writeClient.getHoodieTable(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java index 1f2394618464..f8438a4eb245 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java @@ -28,6 +28,7 @@ import org.apache.hudi.sink.event.WriteMetadataEvent; import 
org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.TimeWait; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.annotation.VisibleForTesting; @@ -139,7 +140,7 @@ public AbstractStreamWriteFunction(Configuration config) { public void initializeState(FunctionInitializationContext context) throws Exception { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); this.metaClient = StreamerUtil.createMetaClient(this.config); - this.writeClient = StreamerUtil.createWriteClient(this.config, getRuntimeContext()); + this.writeClient = FlinkWriteClients.createWriteClient(this.config, getRuntimeContext()); this.writeStatuses = new ArrayList<>(); this.writeMetadataState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java index f65e1f2d3b82..30c6d290b31a 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactFunction.java @@ -27,7 +27,7 @@ import org.apache.hudi.table.HoodieFlinkCopyOnWriteTable; import org.apache.hudi.table.action.compact.HoodieFlinkMergeOnReadTableCompactor; import org.apache.hudi.util.CompactionUtil; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; @@ -79,7 +79,7 @@ public CompactFunction(Configuration conf) { @Override public void open(Configuration parameters) throws Exception { this.taskID = getRuntimeContext().getIndexOfThisSubtask(); - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.writeClient = 
FlinkWriteClients.createWriteClient(conf, getRuntimeContext()); if (this.asyncCompaction) { this.executor = NonThrownExecutor.builder(LOG).build(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java index 8dadd2e2dcf6..ef182241e4f1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java @@ -30,7 +30,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.table.action.compact.CompactHelpers; import org.apache.hudi.util.CompactionUtil; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; @@ -91,7 +91,7 @@ public CompactionCommitSink(Configuration conf) { public void open(Configuration parameters) throws Exception { super.open(parameters); if (writeClient == null) { - this.writeClient = StreamerUtil.createWriteClient(conf, getRuntimeContext()); + this.writeClient = FlinkWriteClients.createWriteClient(conf, getRuntimeContext()); } this.commitBuffer = new HashMap<>(); this.compactionPlanCache = new HashMap<>(); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java index cfe53c6039b7..0fb28eb86a83 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/HoodieFlinkCompactor.java @@ -32,6 +32,7 @@ import org.apache.hudi.sink.compact.strategy.CompactionPlanStrategies; import 
org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import com.beust.jcommander.JCommander; @@ -188,7 +189,7 @@ public AsyncCompactionService(FlinkCompactionConfig cfg, Configuration conf, Str // infer changelog mode CompactionUtil.inferChangelogMode(conf, metaClient); - this.writeClient = StreamerUtil.createWriteClient(conf); + this.writeClient = FlinkWriteClients.createWriteClientV2(conf); this.writeConfig = writeClient.getConfig(); this.table = writeClient.getHoodieTable(); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java index 89f89cf5c0a9..676c03f41c97 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java @@ -35,7 +35,7 @@ import org.apache.hudi.sink.bootstrap.IndexRecord; import org.apache.hudi.sink.utils.PayloadCreation; import org.apache.hudi.table.action.commit.BucketInfo; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.flink.api.common.state.CheckpointListener; import org.apache.flink.api.common.state.StateTtlConfig; @@ -115,7 +115,7 @@ public BucketAssignFunction(Configuration conf) { @Override public void open(Configuration parameters) throws Exception { super.open(parameters); - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(this.conf, true); + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(this.conf, true); HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(HadoopConfigurations.getHadoopConf(this.conf)), new 
FlinkTaskContextSupplier(getRuntimeContext())); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java index 0be2a5300f09..2dd86d652869 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/IncrementalInputSplits.java @@ -18,6 +18,7 @@ package org.apache.hudi.source; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.BaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; @@ -404,7 +405,8 @@ private List getArchivedMetadata( * @param issuedInstant The last issued instant that has already been delivered to downstream * @return the filtered hoodie instants */ - private List filterInstantsWithRange( + @VisibleForTesting + public List filterInstantsWithRange( HoodieTimeline commitTimeline, final String issuedInstant) { HoodieTimeline completedTimeline = commitTimeline.filterCompletedInstants(); @@ -417,6 +419,15 @@ private List filterInstantsWithRange( Stream instantStream = completedTimeline.getInstants(); + if (OptionsResolver.hasNoSpecificReadCommits(this.conf)) { + // by default read from the latest commit + List instants = completedTimeline.getInstants().collect(Collectors.toList()); + if (instants.size() > 1) { + return Collections.singletonList(instants.get(instants.size() - 1)); + } + return instants; + } + if (OptionsResolver.isSpecificStartCommit(this.conf)) { final String startCommit = this.conf.get(FlinkOptions.READ_START_COMMIT); instantStream = instantStream diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java index 
8adbde355cf7..6357b898d49d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java @@ -33,6 +33,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.hadoop.config.HoodieRealtimeConfig; import org.apache.hudi.table.format.mor.MergeOnReadInputSplit; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.avro.Schema; @@ -123,7 +124,7 @@ public static HoodieMergedLogRecordScanner logScanner( Schema logSchema, org.apache.flink.configuration.Configuration flinkConf, Configuration hadoopConf) { - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(flinkConf); + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf); FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf); return HoodieMergedLogRecordScanner.newBuilder() .withFileSystem(fs) diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java index d440588b642e..ee164d3cda95 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkTables.java @@ -28,7 +28,6 @@ import org.apache.flink.configuration.Configuration; import static org.apache.hudi.configuration.HadoopConfigurations.getHadoopConf; -import static org.apache.hudi.util.StreamerUtil.getHoodieClientConfig; /** * Utilities for {@link org.apache.hudi.table.HoodieFlinkTable}. 
@@ -46,7 +45,7 @@ public static HoodieFlinkTable createTable(Configuration conf, RuntimeContext HoodieFlinkEngineContext context = new HoodieFlinkEngineContext( new SerializableConfiguration(getHadoopConf(conf)), new FlinkTaskContextSupplier(runtimeContext)); - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true); + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, true); return HoodieFlinkTable.create(writeConfig, context); } @@ -71,7 +70,7 @@ public static HoodieFlinkTable createTable( *

This expects to be used by driver. */ public static HoodieFlinkTable createTable(Configuration conf) { - HoodieWriteConfig writeConfig = StreamerUtil.getHoodieClientConfig(conf, true, false); + HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(conf, true, false); return HoodieFlinkTable.create(writeConfig, HoodieFlinkEngineContext.DEFAULT); } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java new file mode 100644 index 000000000000..41712e8fb982 --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/FlinkWriteClients.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.util; + +import org.apache.hudi.client.FlinkTaskContextSupplier; +import org.apache.hudi.client.HoodieFlinkWriteClient; +import org.apache.hudi.client.common.HoodieFlinkEngineContext; +import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.engine.EngineType; +import org.apache.hudi.common.model.HoodieCleaningPolicy; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; +import org.apache.hudi.config.HoodieArchivalConfig; +import org.apache.hudi.config.HoodieCleanConfig; +import org.apache.hudi.config.HoodieClusteringConfig; +import org.apache.hudi.config.HoodieCompactionConfig; +import org.apache.hudi.config.HoodieLockConfig; +import org.apache.hudi.config.HoodieMemoryConfig; +import org.apache.hudi.config.HoodieStorageConfig; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.configuration.HadoopConfigurations; +import org.apache.hudi.configuration.OptionsResolver; +import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; +import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; + +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.configuration.Configuration; + +import java.io.IOException; +import java.util.Locale; + +import static org.apache.hudi.util.StreamerUtil.flinkConf2TypedProperties; +import static org.apache.hudi.util.StreamerUtil.getPayloadConfig; +import static org.apache.hudi.util.StreamerUtil.getSourceSchema; + +/** + * Utilities for {@link org.apache.hudi.client.HoodieFlinkWriteClient}. + */ +public class FlinkWriteClients { + + /** + * Creates the Flink write client. + * + *

This is expected to be used by the driver; the client can then send requests for the file system view. + * + *

The task context supplier is a constant: the write token is always '0-1-0'. + */ + @SuppressWarnings("rawtypes") + public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); + // build the write client to start the embedded timeline server (the write config above is requested with enableEmbeddedTimelineService = true) + final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), writeConfig); + writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); + // fetch the view storage config held by the write config + final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); + // rebuild the view storage config keeping only the storage type, the remote server host/port and the remote timeline client timeout/retry options + FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() + .withStorageType(viewStorageConfig.getStorageType()) + .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) + .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()) + .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs()) + .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled()) + .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers()) + .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs()) + .withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs()) + .withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions()) + .build(); + // pass the rebuilt config to ViewStorageProperties#createProperties together with the table path and the Flink conf + // NOTE(review): if createProperties throws, the write client built above is not closed in this method - confirm the caller handles that + ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf); + return writeClient; + } + + /** + * Creates the Flink write client. + * + *

This is expected to be used by the driver; the client can then send requests for the file system view. + * + *

The task context supplier is a constant: the write token is always '0-1-0'. + * + *

Note: different with {@link #createWriteClient}, the fs view storage options are set into the given + * configuration {@code conf}. + */ + @SuppressWarnings("rawtypes") + public static HoodieFlinkWriteClient createWriteClientV2(Configuration conf) { + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); + // build the write client to start the embedded timeline server (the write config above is requested with enableEmbeddedTimelineService = true) + final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(HadoopConfigurations.getHadoopConf(conf)), writeConfig); + writeClient.setOperationType(WriteOperationType.fromValue(conf.getString(FlinkOptions.OPERATION))); + // copy the view storage type and the remote server host/port into the given conf; unlike createWriteClient(conf), no ViewStorageProperties are created here + final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); + conf.setString(FileSystemViewStorageConfig.VIEW_TYPE.key(), viewStorageConfig.getStorageType().name()); + conf.setString(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key(), viewStorageConfig.getRemoteViewServerHost()); + conf.setInteger(FileSystemViewStorageConfig.REMOTE_PORT_NUM.key(), viewStorageConfig.getRemoteViewServerPort()); + return writeClient; + } + + /** + * Creates the Flink write client. + * + *

This expects to be used by client, the driver should start an embedded timeline server. + */ + @SuppressWarnings("rawtypes") + public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { + return createWriteClient(conf, runtimeContext, true); + } + + /** + * Creates the Flink write client. + * + *

This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use + * remote filesystem view storage config, or an in-memory filesystem view storage is used. + */ + @SuppressWarnings("rawtypes") + public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) { + HoodieFlinkEngineContext context = + new HoodieFlinkEngineContext( + new SerializableConfiguration(HadoopConfigurations.getHadoopConf(conf)), + new FlinkTaskContextSupplier(runtimeContext)); + + HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig); + return new HoodieFlinkWriteClient<>(context, writeConfig); + } + + /** + * Mainly used for tests. + */ + public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { + return getHoodieClientConfig(conf, false, false); + } + + public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) { + return getHoodieClientConfig(conf, false, loadFsViewStorageConfig); + } + + public static HoodieWriteConfig getHoodieClientConfig( + Configuration conf, + boolean enableEmbeddedTimelineService, + boolean loadFsViewStorageConfig) { + HoodieWriteConfig.Builder builder = + HoodieWriteConfig.newBuilder() + .withEngineType(EngineType.FLINK) + .withPath(conf.getString(FlinkOptions.PATH)) + .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) + .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) + .withClusteringConfig( + HoodieClusteringConfig.newBuilder() + .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) + .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) + .withClusteringPlanPartitionFilterMode( + ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME))) + 
.withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS)) + .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS)) + .withClusteringTargetFileMaxBytes(conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES)) + .withClusteringPlanSmallFileLimit(conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L) + .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST)) + .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS)) + .build()) + .withCleanConfig(HoodieCleanConfig.newBuilder() + .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) + .retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) + .cleanerNumHoursRetained(conf.getInteger(FlinkOptions.CLEAN_RETAIN_HOURS)) + .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS)) + // override and hardcode to 20, + // actually Flink cleaning is always with parallelism 1 now + .withCleanerParallelism(20) + .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY))) + .build()) + .withArchivalConfig(HoodieArchivalConfig.newBuilder() + .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) + .build()) + .withCompactionConfig(HoodieCompactionConfig.newBuilder() + .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) + .withInlineCompactionTriggerStrategy( + CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) + .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS)) + .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) + .build()) + .withMemoryConfig( + HoodieMemoryConfig.newBuilder() + .withMaxMemoryMaxSize( + 
conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L, + conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L + ).build()) + .forTable(conf.getString(FlinkOptions.TABLE_NAME)) + .withStorageConfig(HoodieStorageConfig.newBuilder() + .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) + .logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) + .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024) + .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024) + .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L) + .build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) + .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) + .build()) + .withLockConfig(HoodieLockConfig.newBuilder() + .withLockProvider(FileSystemBasedLockProvider.class) + .withLockWaitTimeInMillis(2000L) // 2s + .withFileSystemLockExpire(1) // 1 minute + .withClientNumRetries(30) + .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf)) + .build()) + .withPayloadConfig(getPayloadConfig(conf)) + .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) + .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton + .withAutoCommit(false) + .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) + .withProps(flinkConf2TypedProperties(conf)) + .withSchema(getSourceSchema(conf).toString()); + + // do not configure cleaning strategy as LAZY until multi-writers is supported. 
+ HoodieWriteConfig writeConfig = builder.build(); + if (loadFsViewStorageConfig && !conf.containsKey(FileSystemViewStorageConfig.REMOTE_HOST_NAME.key())) { + // do not use the builder to give a chance for recovering the original fs view storage config + FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), conf); + writeConfig.setViewStorageConfig(viewStorageConfig); + } + return writeConfig; + } +} diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index 49a95027d5aa..98664c6dc3bd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -18,37 +18,20 @@ package org.apache.hudi.util; -import org.apache.hudi.client.FlinkTaskContextSupplier; -import org.apache.hudi.client.HoodieFlinkWriteClient; -import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.client.transaction.lock.FileSystemBasedLockProvider; import org.apache.hudi.common.config.DFSPropertiesConfiguration; -import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.engine.EngineType; import org.apache.hudi.common.fs.FSUtils; -import org.apache.hudi.common.model.HoodieCleaningPolicy; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.TableSchemaResolver; import org.apache.hudi.common.table.log.HoodieLogFormat; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; -import
org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ReflectionUtils; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.common.util.ValidationUtils; -import org.apache.hudi.config.HoodieArchivalConfig; -import org.apache.hudi.config.HoodieCleanConfig; -import org.apache.hudi.config.HoodieClusteringConfig; -import org.apache.hudi.config.HoodieCompactionConfig; -import org.apache.hudi.config.HoodieLockConfig; -import org.apache.hudi.config.HoodieMemoryConfig; import org.apache.hudi.config.HoodiePayloadConfig; -import org.apache.hudi.config.HoodieStorageConfig; -import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.configuration.OptionsResolver; @@ -59,11 +42,8 @@ import org.apache.hudi.sink.transform.ChainedTransformer; import org.apache.hudi.sink.transform.Transformer; import org.apache.hudi.streamer.FlinkStreamerConfig; -import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode; -import org.apache.hudi.table.action.compact.CompactionTriggerStrategy; import org.apache.avro.Schema; -import org.apache.flink.api.common.functions.RuntimeContext; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Preconditions; @@ -84,7 +64,6 @@ import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.Locale; import java.util.Properties; import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; @@ -147,102 +126,6 @@ public static DFSPropertiesConfiguration readConfig(org.apache.hadoop.conf.Confi return conf; } - /** - * Mainly used for tests. 
- */ - public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) { - return getHoodieClientConfig(conf, false, false); - } - - public static HoodieWriteConfig getHoodieClientConfig(Configuration conf, boolean loadFsViewStorageConfig) { - return getHoodieClientConfig(conf, false, loadFsViewStorageConfig); - } - - public static HoodieWriteConfig getHoodieClientConfig( - Configuration conf, - boolean enableEmbeddedTimelineService, - boolean loadFsViewStorageConfig) { - HoodieWriteConfig.Builder builder = - HoodieWriteConfig.newBuilder() - .withEngineType(EngineType.FLINK) - .withPath(conf.getString(FlinkOptions.PATH)) - .combineInput(conf.getBoolean(FlinkOptions.PRE_COMBINE), true) - .withMergeAllowDuplicateOnInserts(OptionsResolver.insertClustering(conf)) - .withClusteringConfig( - HoodieClusteringConfig.newBuilder() - .withAsyncClustering(conf.getBoolean(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED)) - .withClusteringPlanStrategyClass(conf.getString(FlinkOptions.CLUSTERING_PLAN_STRATEGY_CLASS)) - .withClusteringPlanPartitionFilterMode( - ClusteringPlanPartitionFilterMode.valueOf(conf.getString(FlinkOptions.CLUSTERING_PLAN_PARTITION_FILTER_MODE_NAME))) - .withClusteringTargetPartitions(conf.getInteger(FlinkOptions.CLUSTERING_TARGET_PARTITIONS)) - .withClusteringMaxNumGroups(conf.getInteger(FlinkOptions.CLUSTERING_MAX_NUM_GROUPS)) - .withClusteringTargetFileMaxBytes(conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_TARGET_FILE_MAX_BYTES)) - .withClusteringPlanSmallFileLimit(conf.getLong(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SMALL_FILE_LIMIT) * 1024 * 1024L) - .withClusteringSkipPartitionsFromLatest(conf.getInteger(FlinkOptions.CLUSTERING_PLAN_STRATEGY_SKIP_PARTITIONS_FROM_LATEST)) - .withAsyncClusteringMaxCommits(conf.getInteger(FlinkOptions.CLUSTERING_DELTA_COMMITS)) - .build()) - .withCleanConfig(HoodieCleanConfig.newBuilder() - .withAsyncClean(conf.getBoolean(FlinkOptions.CLEAN_ASYNC_ENABLED)) - 
.retainCommits(conf.getInteger(FlinkOptions.CLEAN_RETAIN_COMMITS)) - .retainFileVersions(conf.getInteger(FlinkOptions.CLEAN_RETAIN_FILE_VERSIONS)) - // override and hardcode to 20, - // actually Flink cleaning is always with parallelism 1 now - .withCleanerParallelism(20) - .withCleanerPolicy(HoodieCleaningPolicy.valueOf(conf.getString(FlinkOptions.CLEAN_POLICY))) - .build()) - .withArchivalConfig(HoodieArchivalConfig.newBuilder() - .archiveCommitsWith(conf.getInteger(FlinkOptions.ARCHIVE_MIN_COMMITS), conf.getInteger(FlinkOptions.ARCHIVE_MAX_COMMITS)) - .build()) - .withCompactionConfig(HoodieCompactionConfig.newBuilder() - .withTargetIOPerCompactionInMB(conf.getLong(FlinkOptions.COMPACTION_TARGET_IO)) - .withInlineCompactionTriggerStrategy( - CompactionTriggerStrategy.valueOf(conf.getString(FlinkOptions.COMPACTION_TRIGGER_STRATEGY).toUpperCase(Locale.ROOT))) - .withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_COMMITS)) - .withMaxDeltaSecondsBeforeCompaction(conf.getInteger(FlinkOptions.COMPACTION_DELTA_SECONDS)) - .build()) - .withMemoryConfig( - HoodieMemoryConfig.newBuilder() - .withMaxMemoryMaxSize( - conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) * 1024 * 1024L, - conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) * 1024 * 1024L - ).build()) - .forTable(conf.getString(FlinkOptions.TABLE_NAME)) - .withStorageConfig(HoodieStorageConfig.newBuilder() - .logFileDataBlockMaxSize(conf.getInteger(FlinkOptions.WRITE_LOG_BLOCK_SIZE) * 1024 * 1024) - .logFileMaxSize(conf.getLong(FlinkOptions.WRITE_LOG_MAX_SIZE) * 1024 * 1024) - .parquetBlockSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_BLOCK_SIZE) * 1024 * 1024) - .parquetPageSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_PAGE_SIZE) * 1024 * 1024) - .parquetMaxFileSize(conf.getInteger(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE) * 1024 * 1024L) - .build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder() - .enable(conf.getBoolean(FlinkOptions.METADATA_ENABLED)) - 
.withMaxNumDeltaCommitsBeforeCompaction(conf.getInteger(FlinkOptions.METADATA_COMPACTION_DELTA_COMMITS)) - .build()) - .withLockConfig(HoodieLockConfig.newBuilder() - .withLockProvider(FileSystemBasedLockProvider.class) - .withLockWaitTimeInMillis(2000L) // 2s - .withFileSystemLockExpire(1) // 1 minute - .withClientNumRetries(30) - .withFileSystemLockPath(StreamerUtil.getAuxiliaryPath(conf)) - .build()) - .withPayloadConfig(getPayloadConfig(conf)) - .withEmbeddedTimelineServerEnabled(enableEmbeddedTimelineService) - .withEmbeddedTimelineServerReuseEnabled(true) // make write client embedded timeline service singleton - .withAutoCommit(false) - .withAllowOperationMetadataField(conf.getBoolean(FlinkOptions.CHANGELOG_ENABLED)) - .withProps(flinkConf2TypedProperties(conf)) - .withSchema(getSourceSchema(conf).toString()); - - // do not configure cleaning strategy as LAZY until multi-writers is supported. - HoodieWriteConfig writeConfig = builder.build(); - if (loadFsViewStorageConfig) { - // do not use the builder to give a change for recovering the original fs view storage config - FileSystemViewStorageConfig viewStorageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), conf); - writeConfig.setViewStorageConfig(viewStorageConfig); - } - return writeConfig; - } - /** * Returns the payload config with given configuration. */ @@ -380,63 +263,6 @@ public static HoodieTableMetaClient createMetaClient(Configuration conf) { return createMetaClient(conf.getString(FlinkOptions.PATH), HadoopConfigurations.getHadoopConf(conf)); } - /** - * Creates the Flink write client. - * - *

This expects to be used by client, the driver should start an embedded timeline server. - */ - @SuppressWarnings("rawtypes") - public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext) { - return createWriteClient(conf, runtimeContext, true); - } - - /** - * Creates the Flink write client. - * - *

This expects to be used by client, set flag {@code loadFsViewStorageConfig} to use - * remote filesystem view storage config, or an in-memory filesystem view storage is used. - */ - @SuppressWarnings("rawtypes") - public static HoodieFlinkWriteClient createWriteClient(Configuration conf, RuntimeContext runtimeContext, boolean loadFsViewStorageConfig) { - HoodieFlinkEngineContext context = - new HoodieFlinkEngineContext( - new SerializableConfiguration(HadoopConfigurations.getHadoopConf(conf)), - new FlinkTaskContextSupplier(runtimeContext)); - - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, loadFsViewStorageConfig); - return new HoodieFlinkWriteClient<>(context, writeConfig); - } - - /** - * Creates the Flink write client. - * - *

This expects to be used by the driver, the client can then send requests for files view. - * - *

The task context supplier is a constant: the write token is always '0-1-0'. - */ - @SuppressWarnings("rawtypes") - public static HoodieFlinkWriteClient createWriteClient(Configuration conf) throws IOException { - HoodieWriteConfig writeConfig = getHoodieClientConfig(conf, true, false); - // build the write client to start the embedded timeline server - final HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>(HoodieFlinkEngineContext.DEFAULT, writeConfig); - // create the filesystem view storage properties for client - final FileSystemViewStorageConfig viewStorageConfig = writeConfig.getViewStorageConfig(); - // rebuild the view storage config with simplified options. - FileSystemViewStorageConfig rebuilt = FileSystemViewStorageConfig.newBuilder() - .withStorageType(viewStorageConfig.getStorageType()) - .withRemoteServerHost(viewStorageConfig.getRemoteViewServerHost()) - .withRemoteServerPort(viewStorageConfig.getRemoteViewServerPort()) - .withRemoteTimelineClientTimeoutSecs(viewStorageConfig.getRemoteTimelineClientTimeoutSecs()) - .withRemoteTimelineClientRetry(viewStorageConfig.isRemoteTimelineClientRetryEnabled()) - .withRemoteTimelineClientMaxRetryNumbers(viewStorageConfig.getRemoteTimelineClientMaxRetryNumbers()) - .withRemoteTimelineInitialRetryIntervalMs(viewStorageConfig.getRemoteTimelineInitialRetryIntervalMs()) - .withRemoteTimelineClientMaxRetryIntervalMs(viewStorageConfig.getRemoteTimelineClientMaxRetryIntervalMs()) - .withRemoteTimelineClientRetryExceptions(viewStorageConfig.getRemoteTimelineClientRetryExceptions()) - .build(); - ViewStorageProperties.createProperties(conf.getString(FlinkOptions.PATH), rebuilt, conf); - return writeClient; - } - /** * Returns the median instant time between the given two instant time. 
*/ diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java index 9a6aaf01e669..21dd6fd1d18a 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java @@ -25,7 +25,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.sink.utils.TestWriteBase; -import org.apache.hudi.util.StreamerUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -425,13 +425,13 @@ public void testWriteExactlyOnce() throws Exception { @Test public void testReuseEmbeddedServer() throws IOException { conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig(); assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); // get another write client - writeClient = StreamerUtil.createWriteClient(conf); + writeClient = FlinkWriteClients.createWriteClient(conf); assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST); assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort()); assertEquals(viewStorageConfig.getRemoteTimelineClientTimeoutSecs(), 500); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java index 4c0fe82e44bf..f2273e40a26d 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/cluster/ITTestHoodieFlinkClustering.java @@ -38,6 +38,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.AvroSchemaConverter; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.hudi.utils.TestConfigurations; @@ -136,7 +137,7 @@ public void testHoodieFlinkClustering() throws Exception { // To compute the clustering instant time and do clustering. String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); HoodieFlinkTable table = writeClient.getHoodieTable(); boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); @@ -274,7 +275,7 @@ public void testHoodieFlinkClusteringSchedule() throws Exception { // To compute the clustering instant time. 
String clusteringInstantTime = HoodieActiveTimeline.createNewInstantTime(); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); boolean scheduled = writeClient.scheduleClusteringAtInstant(clusteringInstantTime, Option.empty()); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java index ee9285d60a75..711b738288fe 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/compact/ITTestHoodieFlinkCompactor.java @@ -29,6 +29,7 @@ import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.FlinkMiniCluster; import org.apache.hudi.utils.TestConfigurations; @@ -137,7 +138,7 @@ public void testHoodieFlinkCompactor(boolean enableChangelog) throws Exception { // infer changelog mode CompactionUtil.inferChangelogMode(conf, metaClient); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); String compactionInstantTime = scheduleCompactionPlan(metaClient, writeClient); @@ -241,7 +242,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel List compactionInstantTimeList = new ArrayList<>(2); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient)); @@ -255,7 
+256,7 @@ public void testHoodieFlinkCompactorWithPlanSelectStrategy(boolean enableChangel // re-create the write client/fs view server // or there is low probability that connection refused occurs then // the reader metadata view is not complete - writeClient = StreamerUtil.createWriteClient(conf); + writeClient = FlinkWriteClients.createWriteClient(conf); metaClient.reloadActiveTimeline(); compactionInstantTimeList.add(scheduleCompactionPlan(metaClient, writeClient)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java index 0748739064cf..07a3b7515a04 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java @@ -29,6 +29,7 @@ import org.apache.hudi.table.action.commit.BucketInfo; import org.apache.hudi.table.action.commit.BucketType; import org.apache.hudi.table.action.commit.SmallFile; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -70,7 +71,7 @@ public void before() throws IOException { final String basePath = tempFile.getAbsolutePath(); conf = TestConfigurations.getDefaultConf(basePath); - writeConfig = StreamerUtil.getHoodieClientConfig(conf); + writeConfig = FlinkWriteClients.getHoodieClientConfig(conf); context = new HoodieFlinkEngineContext( new SerializableConfiguration(HadoopConfigurations.getHadoopConf(conf)), new FlinkTaskContextSupplier(null)); @@ -151,7 +152,7 @@ public void testAddInsert() { @Test public void testInsertOverBucketAssigned() { conf.setInteger(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), 2); - writeConfig = StreamerUtil.getHoodieClientConfig(conf); + 
writeConfig = FlinkWriteClients.getHoodieClientConfig(conf); MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, writeConfig); BucketInfo bucketInfo1 = mockBucketAssigner.addInsert("par1"); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java new file mode 100644 index 000000000000..b42fd2c04a3c --- /dev/null +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestIncrementalInputSplits.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.source; + +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.utils.TestConfigurations; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.Path; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertIterableEquals; + +/** + * Test cases for {@link IncrementalInputSplits}. + */ +public class TestIncrementalInputSplits extends HoodieCommonTestHarness { + + @BeforeEach + private void init() throws IOException { + initPath(); + initMetaClient(); + } + + @Test + void testFilterInstantsWithRange() { + HoodieActiveTimeline timeline = new HoodieActiveTimeline(metaClient, true); + Configuration conf = TestConfigurations.getDefaultConf(basePath); + IncrementalInputSplits iis = IncrementalInputSplits.builder() + .conf(conf) + .path(new Path(basePath)) + .rowType(TestConfigurations.ROW_TYPE) + .build(); + + HoodieInstant commit1 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "1"); + HoodieInstant commit2 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "2"); + HoodieInstant commit3 = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, "3"); + timeline.createNewInstant(commit1); + timeline.createNewInstant(commit2); + timeline.createNewInstant(commit3); + timeline = timeline.reload(); + + // previous read iteration read till instant time "1", next read iteration should return 
["2", "3"] + List instantRange2 = iis.filterInstantsWithRange(timeline, "1"); + assertEquals(2, instantRange2.size()); + assertIterableEquals(Arrays.asList(commit2, commit3), instantRange2); + + // simulate first iteration cycle with read from LATEST commit + List instantRange1 = iis.filterInstantsWithRange(timeline, null); + assertEquals(1, instantRange1.size()); + assertIterableEquals(Collections.singletonList(commit3), instantRange1); + + // specifying a start and end commit + conf.set(FlinkOptions.READ_START_COMMIT, "1"); + conf.set(FlinkOptions.READ_END_COMMIT, "3"); + List instantRange3 = iis.filterInstantsWithRange(timeline, null); + assertEquals(3, instantRange3.size()); + assertIterableEquals(Arrays.asList(commit1, commit2, commit3), instantRange3); + } + +} diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java index 9f821619089f..b76905ed8af0 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java @@ -30,6 +30,7 @@ import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat; import org.apache.hudi.table.format.mor.MergeOnReadInputFormat; import org.apache.hudi.util.AvroSchemaConverter; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.hudi.utils.TestConfigurations; import org.apache.hudi.utils.TestData; @@ -518,7 +519,7 @@ void testReadArchivedCommitsIncrementally() throws Exception { } // cleaning HoodieFlinkWriteClient writeClient = new HoodieFlinkWriteClient<>( - HoodieFlinkEngineContext.DEFAULT, StreamerUtil.getHoodieClientConfig(conf)); + HoodieFlinkEngineContext.DEFAULT, FlinkWriteClients.getHoodieClientConfig(conf)); writeClient.clean(); HoodieTableMetaClient metaClient = 
StreamerUtil.createMetaClient(tempFile.getAbsolutePath(), HadoopConfigurations.getHadoopConf(conf)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java index 87c8379d6ab8..e8d288585480 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestCompactionUtil.java @@ -33,6 +33,7 @@ import org.apache.hudi.table.HoodieFlinkTable; import org.apache.hudi.util.CompactionUtil; import org.apache.hudi.util.FlinkTables; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.StreamerUtil; import org.apache.flink.configuration.Configuration; @@ -139,7 +140,7 @@ void testScheduleCompaction() throws Exception { // write a commit with data first TestData.writeDataAsBatch(TestData.DATA_SET_SINGLE_INSERT, conf); - HoodieFlinkWriteClient writeClient = StreamerUtil.createWriteClient(conf); + HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf); CompactionUtil.scheduleCompaction(metaClient, writeClient, true, true); Option pendingCompactionInstant = metaClient.reloadActiveTimeline().filterPendingCompactionTimeline().lastInstant(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java index 9630d9cd4d73..a641811bb738 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestStreamerUtil.java @@ -19,13 +19,10 @@ package org.apache.hudi.utils; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; -import 
org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.keygen.SimpleAvroKeyGenerator; import org.apache.hudi.util.StreamerUtil; -import org.apache.hudi.util.ViewStorageProperties; import org.apache.flink.configuration.Configuration; import org.junit.jupiter.api.Test; @@ -104,13 +101,5 @@ void testInstantTimeDiff() { long diff = StreamerUtil.instantTimeDiffSeconds(higher, lower); assertThat(diff, is(75L)); } - - @Test - void testDumpRemoteViewStorageConfig() throws IOException { - Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); - StreamerUtil.createWriteClient(conf); - FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), new Configuration()); - assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST)); - } } diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java index 2e3bc7383f55..084f211e6609 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestViewStorageProperties.java @@ -20,6 +20,8 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.table.view.FileSystemViewStorageType; +import org.apache.hudi.configuration.FlinkOptions; +import org.apache.hudi.util.FlinkWriteClients; import org.apache.hudi.util.ViewStorageProperties; import org.apache.flink.configuration.Configuration; @@ -56,4 +58,12 @@ void testReadWriteProperties() throws IOException { assertThat(readConfig.getRemoteViewServerHost(), is("host1")); assertThat(readConfig.getRemoteViewServerPort(), is(1234)); } 
+ + @Test + void testDumpRemoteViewStorageConfig() throws IOException { + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + FlinkWriteClients.createWriteClient(conf); + FileSystemViewStorageConfig storageConfig = ViewStorageProperties.loadFromProperties(conf.getString(FlinkOptions.PATH), new Configuration()); + assertThat(storageConfig.getStorageType(), is(FileSystemViewStorageType.REMOTE_FIRST)); + } }