From 744b86e379c72d03a630788086b300b2b512a557 Mon Sep 17 00:00:00 2001 From: Ludovic DEHON Date: Wed, 12 Oct 2022 20:42:16 +0200 Subject: [PATCH] fix(kafka-runner): we always need to wait for the flow for new created flow --- .../kafka/services/KafkaStreamSourceService.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java index 31396209530..26e189ed43c 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java @@ -73,16 +73,11 @@ public Executor joinFlow(Executor executor, Boolean withDefaults) { try { // pooling of new flow can be delayed on ExecutorStore, we maybe need to wait that the flow is updated - if (!flowExecutorInterface.isReady()) { - flow = Await.until( - () -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null), - Duration.ofMillis(100), - Duration.ofMinutes(5) - ); - } else { - flow = flowExecutorInterface.findByExecution(executor.getExecution()) - .orElseThrow(() -> new TimeoutException("Unable to find flow with flow executor ready")); - } + flow = Await.until( + () -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null), + Duration.ofMillis(100), + Duration.ofMinutes(5) + ); } catch (TimeoutException e) { // execution is failed, can't find flow, avoid recursive exception, skipped it. if (executor.getExecution().getState().isFailed()) {