diff --git a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java index 31396209530..26e189ed43c 100644 --- a/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java +++ b/runner-kafka/src/main/java/io/kestra/runner/kafka/services/KafkaStreamSourceService.java @@ -73,16 +73,11 @@ public Executor joinFlow(Executor executor, Boolean withDefaults) { try { // pooling of new flow can be delayed on ExecutorStore, we maybe need to wait that the flow is updated - if (!flowExecutorInterface.isReady()) { - flow = Await.until( - () -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null), - Duration.ofMillis(100), - Duration.ofMinutes(5) - ); - } else { - flow = flowExecutorInterface.findByExecution(executor.getExecution()) - .orElseThrow(() -> new TimeoutException("Unable to find flow with flow executor ready")); - } + flow = Await.until( + () -> flowExecutorInterface.findByExecution(executor.getExecution()).orElse(null), + Duration.ofMillis(100), + Duration.ofMinutes(5) + ); } catch (TimeoutException e) { // execution is failed, can't find flow, avoid recursive exception, skipped it. if (executor.getExecution().getState().isFailed()) {