Skip to content

Commit

Permalink
Postgres source: handle terminate connection exception with test. (#1…
Browse files Browse the repository at this point in the history
…9887)

* Source postgres: catch termination connection PSQLException

* Source postgres: move common code to util method

* Source postgres: clean code

* Source postgres: review fixes and added unit tests

* Source postgres: clean code

* Source postgres: bump version

* Source postgres: format

* Source postgres: format

* Source postgres: bump version

* Source postgres: fix failing test and bump version

* Source mysql: make message visible for tests

* Source postgres: reformat import

Co-authored-by: VitaliiMaltsev <39538064+VitaliiMaltsev@users.noreply.github.com>
Co-authored-by: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
  • Loading branch information
4 people authored Dec 6, 2022
1 parent 4429968 commit 293075e
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,12 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions.Procedure;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
Expand Down Expand Up @@ -157,11 +155,11 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
// to
// find the root exception that corresponds to a configuration error. If that does not exist, we
// just return the original exception.
final Throwable rootThrowable = getRootConfigError(e);
final String displayMessage = getDisplayMessage(rootThrowable);
final Throwable rootThrowable = ConnectorExceptionUtil.getRootConfigError(e);
final String displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (isConfigError(rootThrowable)) {
if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
Expand All @@ -184,37 +182,6 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
private Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (isConfigError(current)) {
return current;
} else {
current = current.getCause();
}
}
return e;
}

private boolean isConfigError(final Throwable e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : "";
}
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator) throws Exception {
watchForOrphanThreads(
() -> messageIterator.forEachRemaining(outputRecordCollector),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import java.sql.SQLException;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;

/**
* Utility class defining methods for handling configuration exceptions in connectors.
*/
public class ConnectorExceptionUtil {

public static final String COMMON_EXCEPTION_MESSAGE_TEMPLATE = "Could not connect with provided configuration. Error: %s";
static final String RECOVERY_CONNECTION_ERROR_MESSAGE =
"We're having issues syncing from a Postgres replica that is configured as a hot standby server. " +
"Please see https://docs.airbyte.com/integrations/sources/postgres/#sync-data-from-postgres-hot-standby-server for options and workarounds";
private static final List<Predicate<Throwable>> configErrorPredicates =
List.of(getConfigErrorPredicate(), getConnectionErrorPredicate(), isRecoveryConnectionExceptionPredicate());

public static boolean isConfigError(final Throwable e) {
return configErrorPredicates.stream().anyMatch(predicate -> predicate.test(e));
}

public static String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else if (isRecoveryConnectionExceptionPredicate().test(e)) {
return RECOVERY_CONNECTION_ERROR_MESSAGE;
} else {
return String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, e.getMessage() != null ? e.getMessage() : "");
}
}

/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
public static Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (ConnectorExceptionUtil.isConfigError(current)) {
return current;
} else {
current = current.getCause();
}
}
return e;
}

private static Predicate<Throwable> getConfigErrorPredicate() {
return e -> e instanceof ConfigErrorException;
}

private static Predicate<Throwable> getConnectionErrorPredicate() {
return e -> e instanceof ConnectionErrorException;
}

private static Predicate<Throwable> isRecoveryConnectionExceptionPredicate() {
return e -> e instanceof SQLException && e.getMessage()
.toLowerCase(Locale.ROOT)
.contains("due to conflict with recovery");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.base;

import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -269,7 +270,8 @@ void testCheckNestedException() throws Exception {
@Test
void testCheckRuntimeException() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.check(configPath);
final AirbyteConnectionStatus output = new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage("Runtime Error");
final AirbyteConnectionStatus output =
new AirbyteConnectionStatus().withStatus(Status.FAILED).withMessage(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, "Runtime Error"));
final RuntimeException runtimeException = new RuntimeException("Runtime Error");

when(cliParser.parse(ARGS)).thenReturn(intConfig);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.util;

import static io.airbyte.integrations.util.ConnectorExceptionUtil.COMMON_EXCEPTION_MESSAGE_TEMPLATE;
import static io.airbyte.integrations.util.ConnectorExceptionUtil.RECOVERY_CONNECTION_ERROR_MESSAGE;
import static org.junit.jupiter.api.Assertions.*;

import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.exceptions.ConnectionErrorException;
import java.sql.SQLException;
import org.junit.jupiter.api.Test;

class ConnectorExceptionUtilTest {

public static final String CONFIG_EXCEPTION_MESSAGE = "test message";
public static final String RECOVERY_EXCEPTION_MESSAGE = "FATAL: terminating connection due to conflict with recovery";
public static final String COMMON_EXCEPTION_MESSAGE = "something happens with connection";
public static final String CONNECTION_ERROR_MESSAGE_TEMPLATE = "State code: %s; Error code: %s; Message: %s";

@Test()
void isConfigErrorForConfigException() {
ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE);
assertTrue(ConnectorExceptionUtil.isConfigError(configErrorException));

}

@Test
void isConfigErrorForConnectionException() {
ConnectionErrorException connectionErrorException = new ConnectionErrorException(CONFIG_EXCEPTION_MESSAGE);
assertTrue(ConnectorExceptionUtil.isConfigError(connectionErrorException));
}

@Test
void isConfigErrorForRecoveryPSQLException() {
SQLException recoveryPSQLException = new SQLException(RECOVERY_EXCEPTION_MESSAGE);
assertTrue(ConnectorExceptionUtil.isConfigError(recoveryPSQLException));
}

@Test
void isConfigErrorForCommonSQLException() {
SQLException recoveryPSQLException = new SQLException(COMMON_EXCEPTION_MESSAGE);
assertFalse(ConnectorExceptionUtil.isConfigError(recoveryPSQLException));
}

@Test
void isConfigErrorForCommonException() {
assertFalse(ConnectorExceptionUtil.isConfigError(new Exception()));
}

@Test
void getDisplayMessageForConfigException() {
ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE);
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(configErrorException);
assertEquals(CONFIG_EXCEPTION_MESSAGE, actualDisplayMessage);
}

@Test
void getDisplayMessageForConnectionError() {
String testCode = "test code";
int errorCode = -1;
ConnectionErrorException connectionErrorException = new ConnectionErrorException(testCode, errorCode, CONFIG_EXCEPTION_MESSAGE, new Exception());
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(connectionErrorException);
assertEquals(String.format(CONNECTION_ERROR_MESSAGE_TEMPLATE, testCode, errorCode, CONFIG_EXCEPTION_MESSAGE), actualDisplayMessage);
}

@Test
void getDisplayMessageForRecoveryException() {
SQLException recoveryException = new SQLException(RECOVERY_EXCEPTION_MESSAGE);
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(recoveryException);
assertEquals(RECOVERY_CONNECTION_ERROR_MESSAGE, actualDisplayMessage);
}

@Test
void getDisplayMessageForCommonException() {
Exception exception = new SQLException(COMMON_EXCEPTION_MESSAGE);
String actualDisplayMessage = ConnectorExceptionUtil.getDisplayMessage(exception);
assertEquals(String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, COMMON_EXCEPTION_MESSAGE), actualDisplayMessage);
}

@Test
void getRootConfigErrorFromConfigException() {
ConfigErrorException configErrorException = new ConfigErrorException(CONFIG_EXCEPTION_MESSAGE);
Exception exception = new Exception(COMMON_EXCEPTION_MESSAGE, configErrorException);

Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception);
assertEquals(configErrorException, actualRootConfigError);
}

@Test
void getRootConfigErrorFromRecoverySQLException() {
SQLException recoveryException = new SQLException(RECOVERY_EXCEPTION_MESSAGE);
RuntimeException runtimeException = new RuntimeException(COMMON_EXCEPTION_MESSAGE, recoveryException);
Exception exception = new Exception(runtimeException);

Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception);
assertEquals(recoveryException, actualRootConfigError);
}

@Test
void getRootConfigErrorFromNonConfigException() {
SQLException configErrorException = new SQLException(CONFIG_EXCEPTION_MESSAGE);
Exception exception = new Exception(COMMON_EXCEPTION_MESSAGE, configErrorException);

Throwable actualRootConfigError = ConnectorExceptionUtil.getRootConfigError(exception);
assertEquals(exception, actualRootConfigError);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ ENV APPLICATION source-postgres-strict-encrypt
COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ ENV APPLICATION source-postgres
COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.name=airbyte/source-postgres

0 comments on commit 293075e

Please sign in to comment.