diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index 4d406140c2df..0f4532925865 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -9,6 +9,8 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.lang.Exceptions.Procedure; +import io.airbyte.commons.string.Strings; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.integrations.base.sentry.AirbyteSentry; import io.airbyte.protocol.models.AirbyteConnectionStatus; @@ -18,13 +20,21 @@ import io.airbyte.validation.json.JsonSchemaValidator; import io.sentry.ITransaction; import io.sentry.Sentry; +import io.sentry.SentryLevel; import io.sentry.SpanStatus; import java.nio.file.Path; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Scanner; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ThreadUtils; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,6 +47,11 @@ public class IntegrationRunner { private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunner.class); + public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60; + public static final int EXIT_THREAD_DELAY_MINUTES = 70; + + public static final int FORCED_EXIT_CODE = 2; + private final IntegrationCliParser cliParser; private final Consumer outputRecordCollector; 
private final Integration integration; @@ -74,7 +89,7 @@ public IntegrationRunner(final Source source) { final Source source, final JsonSchemaValidator jsonSchemaValidator) { this(cliParser, outputRecordCollector, destination, source); - this.validator = jsonSchemaValidator; + validator = jsonSchemaValidator; } public void run(final String[] args) throws Exception { @@ -145,7 +160,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE"); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) { - AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer)); + AirbyteSentry.executeWithTracing("WriteDestination", () -> runConsumer(consumer)); } } default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand()); @@ -171,6 +186,88 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc } } + private static void runConsumer(final AirbyteMessageConsumer consumer) throws Exception { + watchForOrphanThreads( + () -> consumeWriteStream(consumer), + () -> System.exit(FORCED_EXIT_CODE), + true, + INTERRUPT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES, + EXIT_THREAD_DELAY_MINUTES, + TimeUnit.MINUTES); + } + + /** + * Runs {@code runMethod} and then checks that no orphan non-daemon thread is left active. Active + * non-daemon threads block the JVM from exiting once the main thread is done, whereas daemon + * ones don't. + * + * If any are found they are logged (and reported to Sentry when {@code sentryEnabled} is true), + * interrupted after the interrupt delay, and {@code exitHook} is run after the exit delay if some
+ * still remain. A proper closing sequence in the children threads is preferred over interrupting + * or force exiting the process, so this mechanism only serves as a fallback while surfacing + * warnings in logs and sentry for maintainers to fix the code behavior instead. + */ + @VisibleForTesting + static void watchForOrphanThreads(final Procedure runMethod, + final Runnable exitHook, + final boolean sentryEnabled, + final int interruptTimeDelay, + final TimeUnit interruptTimeUnit, + final int exitTimeDelay, + final TimeUnit exitTimeUnit) + throws Exception { + final Thread currentThread = Thread.currentThread(); + try { + runMethod.call(); + } finally { + final List<Thread> runningThreads = ThreadUtils.getAllThreads() + .stream() + // daemon threads don't block the JVM if the main `currentThread` exits, so they are not problematic + .filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) && !runningThread.isDaemon()) + .collect(Collectors.toList()); + if (!runningThreads.isEmpty()) { + final StringBuilder sentryMessageBuilder = new StringBuilder(); + LOGGER.warn(""" + The main thread is exiting while children non-daemon threads from a connector are still active. + Ideally, this situation should not happen... + Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead. + The main thread is: {}""", dumpThread(currentThread)); + sentryMessageBuilder.append("The main thread is exiting while children non-daemon threads are still active.\nMain Thread:") + .append(dumpThread(currentThread)); + final ScheduledExecutorService scheduledExecutorService = Executors + .newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder() + // this thread executor will create daemon threads, so it does not block exiting if all other active + // threads are already stopped.
+ .daemon(true).build()); + for (final Thread runningThread : runningThreads) { + final String str = "Active non-daemon thread: " + dumpThread(runningThread); + LOGGER.warn(str); + sentryMessageBuilder.append(str); + // even though the main thread is already shutting down, we still leave some chances to the children + // threads to close properly on their own. + // So, we schedule an interrupt hook after a fixed time delay instead... + scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit); + } + if (sentryEnabled) { + Sentry.captureMessage(sentryMessageBuilder.toString(), SentryLevel.WARNING); + } + scheduledExecutorService.schedule(() -> { + if (ThreadUtils.getAllThreads().stream() + .anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) { + LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n"); + exitHook.run(); + } + }, exitTimeDelay, exitTimeUnit); + } + } + } + + private static String dumpThread(final Thread thread) { + return String.format("%s (%s)\n Thread stacktrace: %s", thread.getName(), thread.getState(), + Strings.join(List.of(thread.getStackTrace()), "\n at ")); + } + private static void validateConfig(final JsonNode schemaJson, final JsonNode objectJson, final String operationType) throws Exception { final Set validationResult = validator.validate(schemaJson, objectJson); if (!validationResult.isEmpty()) { diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index c2670ed5b6e9..39a96aecfcc9 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -4,7 +4,9 @@
package io.airbyte.integrations.base; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; @@ -38,14 +40,27 @@ import java.nio.file.Files; import java.nio.file.Path; import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.commons.lang3.ThreadUtils; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.InOrder; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; class IntegrationRunnerTest { + private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunnerTest.class); + private static final String CONFIG_FILE_NAME = "config.json"; private static final String CONFIGURED_CATALOG_FILE_NAME = "configured_catalog.json"; private static final String STATE_FILE_NAME = "state.json"; @@ -82,6 +97,12 @@ void setup() throws IOException { configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING); configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG)); statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE)); + + final String testName = Thread.currentThread().getName(); + ThreadUtils.getAllThreads() + .stream() + .filter(runningThread -> !runningThread.isDaemon()) + .forEach(runningThread -> runningThread.setName(testName)); } @Test @@ -275,4 +296,76 @@ void testDestinationConsumerLifecycleFailure() throws 
Exception { } } + @Test + void testInterruptOrphanThreadFailure() { + final String testName = Thread.currentThread().getName(); + final List<Exception> caughtExceptions = new ArrayList<>(); + startSleepingThread(caughtExceptions, false); + assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads( + () -> { + throw new IOException("random error"); + }, + Assertions::fail, + false, + 3, TimeUnit.SECONDS, + 10, TimeUnit.SECONDS)); + try { + TimeUnit.SECONDS.sleep(10); + } catch (Exception e) { + throw new RuntimeException(e); + } + final List<Thread> runningThreads = ThreadUtils.getAllThreads().stream() + .filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName)) + .collect(Collectors.toList()); + // all threads should be interrupted + assertEquals(List.of(), runningThreads); + assertEquals(1, caughtExceptions.size()); + } + + @Test + void testNoInterruptOrphanThreadFailure() { + final String testName = Thread.currentThread().getName(); + final List<Exception> caughtExceptions = new ArrayList<>(); + final AtomicBoolean exitCalled = new AtomicBoolean(false); + startSleepingThread(caughtExceptions, true); + assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads( + () -> { + throw new IOException("random error"); + }, + () -> exitCalled.set(true), + false, + 3, TimeUnit.SECONDS, + 10, TimeUnit.SECONDS)); + try { + TimeUnit.SECONDS.sleep(15); // must outlast the 10-second exit delay so the exit hook has already run + } catch (Exception e) { + throw new RuntimeException(e); + } + final List<Thread> runningThreads = ThreadUtils.getAllThreads().stream() + .filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName)) + .collect(Collectors.toList()); + // a thread that refuses to be interrupted should remain + assertEquals(1, runningThreads.size()); + assertEquals(1, caughtExceptions.size()); + assertTrue(exitCalled.get()); + } + + private void startSleepingThread(final List<Exception> caughtExceptions, final boolean ignoreInterrupt) { + final ExecutorService executorService =
Executors.newFixedThreadPool(1); + executorService.submit(() -> { + for (int tries = 0; tries < 3; tries++) { + try { + TimeUnit.MINUTES.sleep(5); + } catch (Exception e) { + LOGGER.info("Caught Exception", e); + caughtExceptions.add(e); + if (!ignoreInterrupt) { + executorService.shutdownNow(); + break; + } + } + } + }); + } + }