From d00a8f549f54057a53fedce3e07a050c63ac2efa Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Mon, 27 Jan 2025 14:48:25 +0100 Subject: [PATCH] fix(webserver): ensure queues are not closed in nioEventLoop --- .../controllers/api/ExecutionController.java | 8 +++++--- .../webserver/controllers/api/LogController.java | 16 +++++++--------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java index f13b5bae3cf..ee25fee98d5 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/ExecutionController.java @@ -1527,9 +1527,11 @@ public Flux> follow( cancel.set(receive); }, FluxSink.OverflowStrategy.BUFFER) .doFinally(ignored -> { - if (cancel.get() != null) { - cancel.get().run(); - } + Schedulers.boundedElastic().schedule(() -> { + if (cancel.get() != null) { + cancel.get().run(); + } + }); }); } diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java index 8303c940ea9..d16ce60d58e 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java @@ -25,6 +25,7 @@ import org.slf4j.event.Level; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; +import reactor.core.scheduler.Schedulers; import java.io.ByteArrayInputStream; import java.io.InputStream; @@ -156,15 +157,12 @@ public Flux> follow( cancel.set(receive); }, FluxSink.OverflowStrategy.BUFFER) - .doOnCancel(() -> { - if (cancel.get() != null) { - cancel.get().run(); - } - }) - .doOnComplete(() -> { - if (cancel.get() != null) { - cancel.get().run(); - } + .doFinally(ignored -> { + Schedulers.boundedElastic().schedule(() -> { + if (cancel.get() != null) { + cancel.get().run(); + } + }); }); }