From fc8081004781833feb63cf68e3f302bc968c4819 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 13 Oct 2020 11:24:49 -0700 Subject: [PATCH] move graceful thread shutdown to a helper --- .../concurrency/GracefulShutdownHandler.java | 17 ++++++++++------- .../GracefulShutdownHandlerTest.java | 12 ++++++------ .../java/io/airbyte/scheduler/SchedulerApp.java | 4 +++- 3 files changed, 19 insertions(+), 14 deletions(-) rename airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java => airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java (70%) rename airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java => airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java (82%) diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java similarity index 70% rename from airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java rename to airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java index 8c78341b83f9..c4fefee78b9f 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerShutdownHandler.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/concurrency/GracefulShutdownHandler.java @@ -22,19 +22,22 @@ * SOFTWARE. */ -package io.airbyte.scheduler; +package io.airbyte.commons.concurrency; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SchedulerShutdownHandler extends Thread { +public class GracefulShutdownHandler extends Thread { - private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerShutdownHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(GracefulShutdownHandler.class); + private final Duration terminateWaitDuration; private final ExecutorService[] threadPools; - public SchedulerShutdownHandler(final ExecutorService... threadPools) { + public GracefulShutdownHandler(Duration terminateWaitDuration, final ExecutorService... threadPools) { + this.terminateWaitDuration = terminateWaitDuration; this.threadPools = threadPools; } @@ -44,11 +47,11 @@ public void run() { threadPool.shutdown(); try { - if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) { - LOGGER.error("Unable to kill worker threads by shutdown timeout."); + if (!threadPool.awaitTermination(terminateWaitDuration.getSeconds(), TimeUnit.SECONDS)) { + LOGGER.error("Unable to kill threads by shutdown timeout."); } } catch (InterruptedException e) { - LOGGER.error("Wait for graceful worker thread shutdown interrupted.", e); + LOGGER.error("Wait for graceful thread shutdown interrupted.", e); } } } diff --git a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java similarity index 82% rename from airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java rename to airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java index 43e937fec610..0130143c68bc 100644 --- a/airbyte-scheduler/src/test/java/io/airbyte/scheduler/SchedulerShutdownHandlerTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/concurrency/GracefulShutdownHandlerTest.java @@ -22,23 +22,23 @@ * SOFTWARE. */ -package io.airbyte.scheduler; +package io.airbyte.commons.concurrency; -import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import java.time.Duration; import java.util.concurrent.ExecutorService; import org.junit.jupiter.api.Test; -class SchedulerShutdownHandlerTest { +class GracefulShutdownHandlerTest { @Test public void testRun() throws InterruptedException { final ExecutorService executorService = mock(ExecutorService.class); - final SchedulerShutdownHandler schedulerShutdownHandler = new SchedulerShutdownHandler(executorService); - schedulerShutdownHandler.start(); - schedulerShutdownHandler.join(); + final GracefulShutdownHandler gracefulShutdownHandler = new GracefulShutdownHandler(Duration.ofSeconds(30), executorService); + gracefulShutdownHandler.start(); + gracefulShutdownHandler.join(); verify(executorService).shutdown(); } diff --git a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java index 129949dee8f2..3683b6f23df5 100644 --- a/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java +++ b/airbyte-scheduler/src/main/java/io/airbyte/scheduler/SchedulerApp.java @@ -25,6 +25,7 @@ package io.airbyte.scheduler; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.airbyte.commons.concurrency.GracefulShutdownHandler; import io.airbyte.config.Configs; import io.airbyte.config.EnvConfigs; import io.airbyte.config.persistence.ConfigPersistence; @@ -36,6 +37,7 @@ import io.airbyte.workers.process.DockerProcessBuilderFactory; import io.airbyte.workers.process.ProcessBuilderFactory; import java.nio.file.Path; +import java.time.Duration; import java.time.Instant; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -98,7 +100,7 @@ public void start() { JOB_SUBMITTER_DELAY_MILLIS, TimeUnit.MILLISECONDS); - Runtime.getRuntime().addShutdownHook(new SchedulerShutdownHandler(workerThreadPool, scheduledPool)); + Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(30), workerThreadPool, scheduledPool)); } public static void main(String[] args) {