Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ignore sigterm when closing source/destination #20519

Merged
merged 11 commits into from
Jan 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.file.Path;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
Expand All @@ -39,6 +40,10 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("destination")
.setPrefixColor(Color.YELLOW_BACKGROUND);
private static final Set<Integer> IGNORED_EXIT_CODES = Set.of(
0, // Normal exit
143 // SIGTERM
);

private final IntegrationLauncher integrationLauncher;
private final AirbyteStreamFactory streamFactory;
Expand Down Expand Up @@ -118,7 +123,7 @@ public void close() throws Exception {

LOGGER.debug("Closing destination process");
WorkerUtils.gentleClose(destinationProcess, 1, TimeUnit.MINUTES);
if (destinationProcess.isAlive() || getExitValue() != 0) {
if (destinationProcess.isAlive() || !IGNORED_EXIT_CODES.contains(getExitValue())) {
final String message =
destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + getExitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.time.temporal.ChronoUnit;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +39,10 @@ public class DefaultAirbyteSource implements AirbyteSource {

private static final Duration HEARTBEAT_FRESH_DURATION = Duration.of(5, ChronoUnit.MINUTES);
private static final Duration GRACEFUL_SHUTDOWN_DURATION = Duration.of(1, ChronoUnit.MINUTES);
private static final Set<Integer> IGNORED_EXIT_CODES = Set.of(
0, // Normal exit
143 // SIGTERM
);

public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("source")
Expand Down Expand Up @@ -139,7 +144,7 @@ public void close() throws Exception {
GRACEFUL_SHUTDOWN_DURATION.toMillis(),
TimeUnit.MILLISECONDS);

if (sourceProcess.isAlive() || getExitValue() != 0) {
if (sourceProcess.isAlive() || !IGNORED_EXIT_CODES.contains(getExitValue())) {
final String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + getExitValue();
throw new WorkerException(message + ". This warning is normal if the job was cancelled.");
}
Expand Down