Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Surface any active child thread of dying connectors #10660

Merged
merged 10 commits into from
Mar 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
Expand All @@ -18,13 +20,21 @@
import io.airbyte.validation.json.JsonSchemaValidator;
import io.sentry.ITransaction;
import io.sentry.Sentry;
import io.sentry.SentryLevel;
import io.sentry.SpanStatus;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,6 +47,11 @@ public class IntegrationRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunner.class);

public static final int INTERRUPT_THREAD_DELAY_MINUTES = 60;
public static final int EXIT_THREAD_DELAY_MINUTES = 70;

public static final int FORCED_EXIT_CODE = 2;

private final IntegrationCliParser cliParser;
private final Consumer<AirbyteMessage> outputRecordCollector;
private final Integration integration;
Expand Down Expand Up @@ -74,7 +89,7 @@ public IntegrationRunner(final Source source) {
final Source source,
final JsonSchemaValidator jsonSchemaValidator) {
this(cliParser, outputRecordCollector, destination, source);
this.validator = jsonSchemaValidator;
validator = jsonSchemaValidator;
}

public void run(final String[] args) throws Exception {
Expand Down Expand Up @@ -145,7 +160,7 @@ public void runInternal(final ITransaction transaction, final IntegrationConfig
validateConfig(integration.spec().getConnectionSpecification(), config, "WRITE");
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
try (final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, outputRecordCollector)) {
AirbyteSentry.executeWithTracing("WriteDestination", () -> consumeWriteStream(consumer));
AirbyteSentry.executeWithTracing("WriteDestination", () -> runConsumer(consumer));
}
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
Expand All @@ -171,6 +186,88 @@ static void consumeWriteStream(final AirbyteMessageConsumer consumer) throws Exc
}
}

private static void runConsumer(final AirbyteMessageConsumer consumer) throws Exception {
watchForOrphanThreads(
() -> consumeWriteStream(consumer),
() -> System.exit(FORCED_EXIT_CODE),
true,
INTERRUPT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES,
EXIT_THREAD_DELAY_MINUTES,
TimeUnit.MINUTES);
}

/**
* This method calls a runMethod and make sure that it won't produce orphan non-daemon active
* threads once it is done. Active non-daemon threads blocks JVM from exiting when the main thread
* is done, whereas daemon ones don't.
*
* If any active non-daemon threads would be left as orphans, this method will schedule some
* interrupt/exit hooks after giving it some time delay to close up properly. It is generally
* preferred to have a proper closing sequence from children threads instead of interrupting or
* force exiting the process, so this mechanism serve as a fallback while surfacing warnings in logs
* and sentry for maintainers to fix the code behavior instead.
*/
@VisibleForTesting
static void watchForOrphanThreads(final Procedure runMethod,
final Runnable exitHook,
final boolean sentryEnabled,
final int interruptTimeDelay,
final TimeUnit interruptTimeUnit,
final int exitTimeDelay,
final TimeUnit exitTimeUnit)
throws Exception {
final Thread currentThread = Thread.currentThread();
try {
runMethod.call();
} finally {
final List<Thread> runningThreads = ThreadUtils.getAllThreads()
.stream()
// daemon threads don't block the JVM if the main `currentThread` exits, so they are not problematic
.filter(runningThread -> !runningThread.getName().equals(currentThread.getName()) && !runningThread.isDaemon())
ChristopheDuong marked this conversation as resolved.
Show resolved Hide resolved
.collect(Collectors.toList());
if (!runningThreads.isEmpty()) {
final StringBuilder sentryMessageBuilder = new StringBuilder();
LOGGER.warn("""
The main thread is exiting while children non-daemon threads from a connector are still active.
Ideally, this situation should not happen...
Please check with maintainers if the connector or library code should safely clean up its threads before quitting instead.
The main thread is: {}""", dumpThread(currentThread));
sentryMessageBuilder.append("The main thread is exiting while children non-daemon threads are still active.\nMain Thread:")
.append(dumpThread(currentThread));
final ScheduledExecutorService scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor(new BasicThreadFactory.Builder()
// this thread executor will create daemon threads, so it does not block exiting if all other active
// threads are already stopped.
.daemon(true).build());
for (final Thread runningThread : runningThreads) {
final String str = "Active non-daemon thread: " + dumpThread(runningThread);
LOGGER.warn(str);
sentryMessageBuilder.append(str);
// even though the main thread is already shutting down, we still leave some chances to the children
// threads to close properly on their own.
// So, we schedule an interrupt hook after a fixed time delay instead...
scheduledExecutorService.schedule(runningThread::interrupt, interruptTimeDelay, interruptTimeUnit);
}
if (!sentryEnabled) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ChristopheDuong, this check should be reversed, right?

Actually it is always safe to call Sentry. When it is not initialized, it will just do nothing. I created #11224 to update this.

Sentry.captureMessage(sentryMessageBuilder.toString(), SentryLevel.WARNING);
}
scheduledExecutorService.schedule(() -> {
if (ThreadUtils.getAllThreads().stream()
.anyMatch(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(currentThread.getName()))) {
LOGGER.error("Failed to interrupt children non-daemon threads, forcefully exiting NOW...\n");
exitHook.run();
}
}, exitTimeDelay, exitTimeUnit);
}
}
}

private static String dumpThread(final Thread thread) {
return String.format("%s (%s)\n Thread stacktrace: %s", thread.getName(), thread.getState(),
Strings.join(List.of(thread.getStackTrace()), "\n at "));
}

private static void validateConfig(final JsonNode schemaJson, final JsonNode objectJson, final String operationType) throws Exception {
final Set<String> validationResult = validator.validate(schemaJson, objectJson);
if (!validationResult.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

package io.airbyte.integrations.base;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
Expand Down Expand Up @@ -38,14 +40,27 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ThreadUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IntegrationRunnerTest {

private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationRunnerTest.class);

private static final String CONFIG_FILE_NAME = "config.json";
private static final String CONFIGURED_CATALOG_FILE_NAME = "configured_catalog.json";
private static final String STATE_FILE_NAME = "state.json";
Expand Down Expand Up @@ -82,6 +97,12 @@ void setup() throws IOException {
configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG));
statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE));

final String testName = Thread.currentThread().getName();
ThreadUtils.getAllThreads()
.stream()
.filter(runningThread -> !runningThread.isDaemon())
.forEach(runningThread -> runningThread.setName(testName));
}

@Test
Expand Down Expand Up @@ -275,4 +296,76 @@ void testDestinationConsumerLifecycleFailure() throws Exception {
}
}

@Test
void testInterruptOrphanThreadFailure() {
final String testName = Thread.currentThread().getName();
final List<Exception> caughtExceptions = new ArrayList<>();
startSleepingThread(caughtExceptions, false);
assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads(
() -> {
throw new IOException("random error");
},
Assertions::fail,
false,
3, TimeUnit.SECONDS,
10, TimeUnit.SECONDS));
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
final List<Thread> runningThreads = ThreadUtils.getAllThreads().stream()
.filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName))
.collect(Collectors.toList());
// all threads should be interrupted
assertEquals(List.of(), runningThreads);
assertEquals(1, caughtExceptions.size());
}

@Test
void testNoInterruptOrphanThreadFailure() {
davinchia marked this conversation as resolved.
Show resolved Hide resolved
final String testName = Thread.currentThread().getName();
final List<Exception> caughtExceptions = new ArrayList<>();
final AtomicBoolean exitCalled = new AtomicBoolean(false);
startSleepingThread(caughtExceptions, true);
assertThrows(IOException.class, () -> IntegrationRunner.watchForOrphanThreads(
() -> {
throw new IOException("random error");
},
() -> exitCalled.set(true),
false,
3, TimeUnit.SECONDS,
10, TimeUnit.SECONDS));
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e) {
throw new RuntimeException(e);
}
final List<Thread> runningThreads = ThreadUtils.getAllThreads().stream()
.filter(runningThread -> !runningThread.isDaemon() && !runningThread.getName().equals(testName))
.collect(Collectors.toList());
// a thread that refuses to be interrupted should remain
assertEquals(1, runningThreads.size());
assertEquals(1, caughtExceptions.size());
assertTrue(exitCalled.get());
}

private void startSleepingThread(final List<Exception> caughtExceptions, final boolean ignoreInterrupt) {
final ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.submit(() -> {
for (int tries = 0; tries < 3; tries++) {
try {
TimeUnit.MINUTES.sleep(5);
} catch (Exception e) {
LOGGER.info("Caught Exception", e);
caughtExceptions.add(e);
if (!ignoreInterrupt) {
executorService.shutdownNow();
break;
}
}
}
});
}

}