From 592391050ad63709e92d3c89353ab04a938d52c2 Mon Sep 17 00:00:00 2001
From: "yuzhao.cyz"
Date: Fri, 15 Jul 2022 12:15:07 +0800
Subject: [PATCH] [HUDI-4403] Fix the end input metadata for bounded source

---
 .../apache/hudi/sink/StreamWriteFunction.java | 12 ++++++++-
 .../sink/StreamWriteOperatorCoordinator.java | 23 ++++++++++-------
 .../common/AbstractStreamWriteFunction.java | 9 ++++++-
 .../org/apache/hudi/sink/utils/Pipelines.java | 22 +++++++++-------
 .../hudi/sink/ITTestDataStreamWrite.java | 25 +++----------------
 .../TestStreamWriteOperatorCoordinator.java | 11 ++++----
 6 files changed, 55 insertions(+), 47 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
index 2748af5290646..bbaba0414446d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteFunction.java
@@ -408,6 +408,16 @@ private boolean hasData() {
         && this.buckets.values().stream().anyMatch(bucket -> bucket.records.size() > 0);
   }
 
+  private void cleanWriteHandles() {
+    if (freshInstant(currentInstant)) {
+      // In rare cases, when a checkpoint was aborted and the instant time
+      // is reused, the merge handle generates a new file name
+      // with the reused instant time of the last checkpoint; the write handles
+      // should then be kept and reused to avoid data loss.
+      this.writeClient.cleanHandles();
+    }
+  }
+
   @SuppressWarnings("unchecked, rawtypes")
   private boolean flushBucket(DataBucket bucket) {
     String instant = instantToWrite(true);
@@ -479,7 +489,7 @@ private void flushRemaining(boolean endInput) {
     this.eventGateway.sendEventToCoordinator(event);
     this.buckets.clear();
     this.tracer.reset();
-    this.writeClient.cleanHandles();
+    cleanWriteHandles();
     this.writeStatuses.addAll(writeStatus);
     // blocks flushing until the coordinator starts a new instant
     this.confirming = true;
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 6aa4c0b1f8d6a..4b833897b6c74 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -365,7 +365,10 @@ private void reset() {
    */
   private boolean allEventsReceived() {
     return Arrays.stream(eventBuffer)
-        .allMatch(event -> event != null && event.isReady(this.instant));
+        // we do not use event.isReady to check the instant
+        // because the write task may send an event eagerly for an empty
+        // data set; such an event may carry the timestamp of the last committed instant.
+        .allMatch(event -> event != null && event.isLastBatch());
   }
 
   private void addEventToBuffer(WriteMetadataEvent event) {
@@ -425,12 +428,14 @@ private void handleEndInputEvent(WriteMetadataEvent event) {
     addEventToBuffer(event);
     if (allEventsReceived()) {
       // start to commit the instant.
-      commitInstant(this.instant);
-      // The executor thread inherits the classloader of the #handleEventFromOperator
-      // caller, which is a AppClassLoader.
-      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
-      // sync Hive synchronously if it is enabled in batch mode.
-      syncHive();
+      boolean committed = commitInstant(this.instant);
+      if (committed) {
+        // The executor thread inherits the classloader of the #handleEventFromOperator
+        // caller, which is an AppClassLoader.
+        Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+        // sync Hive synchronously if it is enabled in batch mode.
+        syncHive();
+      }
     }
   }
 
@@ -474,8 +479,8 @@ private static boolean sendToFinishedTasks(Throwable throwable) {
   /**
    * Commits the instant.
    */
-  private void commitInstant(String instant) {
-    commitInstant(instant, -1);
+  private boolean commitInstant(String instant) {
+    return commitInstant(instant, -1);
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index 6cf3d10fc2ef1..04b7f43547920 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -243,6 +243,13 @@ protected String lastPendingInstant() {
     return this.ckpMetadata.lastPendingInstant();
   }
 
+  /**
+   * Returns whether the instant is fresh (i.e. not aborted).
+   */
+  protected boolean freshInstant(String instant) {
+    return !this.ckpMetadata.isAborted(instant);
+  }
+
   /**
    * Prepares the instant time to write with for next checkpoint.
    *
@@ -279,6 +286,6 @@ protected String instantToWrite(boolean hasData) {
    * Returns whether the pending instant is invalid to write with.
    */
   private boolean invalidInstant(String instant, boolean hasData) {
-    return instant.equals(this.currentInstant) && hasData && !this.ckpMetadata.isAborted(instant);
+    return instant.equals(this.currentInstant) && hasData && freshInstant(instant);
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 87a6551986d48..31355255f905b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -358,9 +358,9 @@ public static DataStream hoodieStreamWrite(Configuration conf, int defau
    * The whole pipeline looks like the following:
    *
    * <pre>
-   *                                           /=== | task1 | ===\
-   *      | plan generation | ===> hash                           | commit |
-   *                                           \=== | task2 | ===/
+   *                                     /=== | task1 | ===\
+   *      | plan generation | ===> hash                      | commit |
+   *                                     \=== | task2 | ===/
    *
    *      Note: both the compaction plan generation task and commission task are singleton.
    * </pre>
@@ -374,6 +374,8 @@ public static DataStreamSink compact(Configuration conf,
             TypeInformation.of(CompactionPlanEvent.class),
             new CompactionPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
+        // make the distribution strategy deterministic to avoid concurrent modifications
+        // on the same bucket files
         .keyBy(plan -> plan.getOperation().getFileGroupId().getFileId())
         .transform("compact_task",
             TypeInformation.of(CompactionCommitEvent.class),
@@ -393,9 +395,9 @@ public static DataStreamSink compact(Configuration conf,
    * The whole pipeline looks like the following:
    *
    * <pre>
-   *                                           /=== | task1 | ===\
-   *      | plan generation | ===> hash                           | commit |
-   *                                           \=== | task2 | ===/
+   *                                     /=== | task1 | ===\
+   *      | plan generation | ===> hash                      | commit |
+   *                                     \=== | task2 | ===/
    *
    *      Note: both the clustering plan generation task and commission task are singleton.
    * </pre>
@@ -410,9 +412,11 @@ public static DataStreamSink cluster(Configuration conf,
             TypeInformation.of(ClusteringPlanEvent.class),
             new ClusteringPlanOperator(conf))
         .setParallelism(1) // plan generate must be singleton
-        .keyBy(plan -> plan.getClusteringGroupInfo().getOperations()
-            .stream().map(ClusteringOperation::getFileId)
-            .collect(Collectors.joining()))
+        .keyBy(plan ->
+            // make the distribution strategy deterministic to avoid concurrent modifications
+            // on the same bucket files
+            plan.getClusteringGroupInfo().getOperations()
+                .stream().map(ClusteringOperation::getFileId).collect(Collectors.joining()))
         .transform("clustering_task",
             TypeInformation.of(ClusteringCommitEvent.class),
             new ClusteringOperator(conf, rowType))
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
index 3d96c1cafad13..1589cf31e7405 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/ITTestDataStreamWrite.java
@@ -248,21 +248,8 @@ private void testWriteToHoodie(
       Pipelines.clean(conf, pipeline);
       Pipelines.compact(conf, pipeline);
     }
-    JobClient client = execEnv.executeAsync(jobName);
-    if (isMor) {
-      if (client.getJobStatus().get() != JobStatus.FAILED) {
-        try {
-          TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
-          client.cancel();
-        } catch (Throwable var1) {
-          // ignored
-        }
-      }
-    } else {
-      // wait for the streaming job to finish
-      client.getJobExecutionResult().get();
-    }
+    execute(execEnv, isMor, jobName);
 
     TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
@@ -322,17 +309,14 @@ private void testWriteToHoodieWithCluster(
 
     execEnv.addOperator(pipeline.getTransformation());
     Pipelines.cluster(conf, rowType, pipeline);
-    JobClient client = execEnv.executeAsync(jobName);
-
-    // wait for the streaming job to finish
-    client.getJobExecutionResult().get();
+    execEnv.execute(jobName);
 
     TestData.checkWrittenDataCOW(tempFile, expected);
   }
 
   public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
-    JobClient client = execEnv.executeAsync(jobName);
     if (isMor) {
+      JobClient client = execEnv.executeAsync(jobName);
       if (client.getJobStatus().get() != JobStatus.FAILED) {
         try {
           TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
@@ -343,7 +327,7 @@ public void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jo
         }
       }
     } else {
       // wait for the streaming job to finish
-      client.getJobExecutionResult().get();
+      execEnv.execute(jobName);
     }
   }
 
@@ -451,5 +435,4 @@ public void testHoodiePipelineBuilderSink() throws Exception {
     execute(execEnv, true, "Api_Sink_Test");
     TestData.checkWrittenDataCOW(tempFile, EXPECTED);
   }
-
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index d5d35f7494f48..277a6ab8cb9f9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -315,18 +315,17 @@ public void testEndInputIsTheLastEvent() throws Exception {
     coordinator.handleEventFromOperator(0,
         WriteMetadataEvent.emptyBootstrap(0));
     TimeUnit.SECONDS.sleep(5); // wait for handled bootstrap event
 
-    int eventCount = 20_000; // big enough to fill executor's queue
-    for (int i = 0; i < eventCount; i++) {
-      coordinator.handleEventFromOperator(0, createOperatorEvent(0, coordinator.getInstant(), "par1", true, 0.1));
-    }
-
     WriteMetadataEvent endInput = WriteMetadataEvent.builder()
         .taskID(0)
         .instantTime(coordinator.getInstant())
         .writeStatus(Collections.emptyList())
         .endInput(true)
         .build();
-    coordinator.handleEventFromOperator(0, endInput);
+
+    int eventCount = 20_000; // big enough to fill executor's queue
+    for (int i = 0; i < eventCount; i++) {
+      coordinator.handleEventFromOperator(0, endInput);
+    }
 
     // wait for submitted events completed
     executor.close();
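
For readers skimming the patch, below is a minimal, illustrative sketch (not part of the change itself) of how a bounded Hudi Flink write job is typically driven so that the end-input path fixed above is exercised. It mirrors the refactored execute(...) helper in ITTestDataStreamWrite shown in this diff; the wrapper class name BoundedWriteRunner is hypothetical and only the Flink APIs already used in the patch appear here.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class BoundedWriteRunner {

  private BoundedWriteRunner() {
  }

  /**
   * Runs the given job so that the bounded source ends its input and the
   * coordinator gets a chance to commit the end-input instant.
   */
  public static void execute(StreamExecutionEnvironment execEnv, boolean isMor, String jobName) throws Exception {
    if (isMor) {
      // MOR: compaction runs asynchronously, so submit the job detached,
      // give the compaction time to finish, then cancel.
      JobClient client = execEnv.executeAsync(jobName);
      if (client.getJobStatus().get() != JobStatus.FAILED) {
        try {
          TimeUnit.SECONDS.sleep(20); // wait long enough for the compaction to finish
          client.cancel();
        } catch (Throwable throwable) {
          // ignored
        }
      }
    } else {
      // COW: a blocking execute() returns only after the bounded pipeline finishes,
      // i.e. after the coordinator has handled the end-input event.
      execEnv.execute(jobName);
    }
  }
}

The branch that matters for this patch is the bounded (non-MOR) one: since the job only finishes after the end-input event is handled, commitInstant(...) now reports whether a commit actually happened, and Hive sync is attempted only in that case.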