From 2c3b8bc63895fade94aaaed8d88d0b75788d64c3 Mon Sep 17 00:00:00 2001 From: Florian Hussonnois Date: Wed, 15 Jan 2025 16:51:44 +0100 Subject: [PATCH] fix(jdbc): ensure JdbcIndexer is only closed once --- .../io/kestra/jdbc/runner/JdbcIndexer.java | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java index 494d5151b67..5590b66e1a6 100644 --- a/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java +++ b/jdbc/src/main/java/io/kestra/jdbc/runner/JdbcIndexer.java @@ -17,6 +17,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import io.micronaut.context.event.ApplicationEventPublisher; @@ -47,6 +48,8 @@ public class JdbcIndexer implements IndexerInterface { private final AtomicReference state = new AtomicReference<>(); private final ApplicationEventPublisher eventPublisher; + private final AtomicBoolean closed = new AtomicBoolean(false); + @Inject public JdbcIndexer( LogRepositoryInterface logRepository, @@ -122,14 +125,19 @@ public ServiceState getState() { @PreDestroy @Override public void close() { - setState(ServiceState.TERMINATING); - this.receiveCancellations.forEach(Runnable::run); - try { - stopQueue(); - setState(ServiceState.TERMINATED_GRACEFULLY); - } catch (IOException e) { - log.error("Failed to close the queue", e); - setState(ServiceState.TERMINATED_FORCED); + if (closed.compareAndSet(false, true)) { + setState(ServiceState.TERMINATING); + if (log.isDebugEnabled()) { + log.debug("Terminating"); + } + this.receiveCancellations.forEach(Runnable::run); + try { + stopQueue(); + setState(ServiceState.TERMINATED_GRACEFULLY); + } catch (IOException e) { + log.error("Failed to close the queue", e); + setState(ServiceState.TERMINATED_FORCED); + } } }