Skip to content

Commit

Permalink
Revert "Source postgres: mark termination connection PSQLException as…
Browse files Browse the repository at this point in the history
… config error (#19623)" (#19881)

This reverts commit 92282de.
  • Loading branch information
sashaNeshcheret authored Nov 29, 2022
1 parent a05d541 commit 494349f
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1221,7 +1221,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.29
dockerImageTag: 1.0.28
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11153,7 +11153,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.29"
- dockerImage: "airbyte/source-postgres:1.0.28"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.integrations.base.errors.messages.ErrorMessage;
import io.airbyte.integrations.util.ConnectorExceptionUtil;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
import java.util.List;
import java.util.Optional;
import java.util.Scanner;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -156,11 +157,11 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
// to
// find the root exception that corresponds to a configuration error. If that does not exist, we
// just return the original exception.
final Throwable rootThrowable = ConnectorExceptionUtil.getRootConfigError(e);
final String displayMessage = ConnectorExceptionUtil.getDisplayMessage(rootThrowable);
final Throwable rootThrowable = getRootConfigError(e);
final String displayMessage = getDisplayMessage(rootThrowable);
// If the source connector throws a config error, a trace message with the relevant message should
// be surfaced.
if (ConnectorExceptionUtil.isConfigError(rootThrowable)) {
if (isConfigError(rootThrowable)) {
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, displayMessage);
}
if (parsed.getCommand().equals(Command.CHECK)) {
Expand All @@ -183,6 +184,37 @@ private void runInternal(final IntegrationConfig parsed) throws Exception {
LOGGER.info("Completed integration: {}", integration.getClass().getName());
}

/**
* Returns the first instance of an exception associated with a configuration error (if it exists).
* Otherwise, the original exception is returned.
*/
private Throwable getRootConfigError(final Exception e) {
Throwable current = e;
while (current != null) {
if (isConfigError(current)) {
return current;
} else {
current = current.getCause();
}
}
return e;
}

private boolean isConfigError(final Throwable e) {
return e instanceof ConfigErrorException || e instanceof ConnectionErrorException;
}

private String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof ConnectionErrorException) {
final ConnectionErrorException connEx = (ConnectionErrorException) e;
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else {
return "Could not connect with provided configuration. Error: " + e.getMessage() != null ? e.getMessage() : "";
}
}

private void produceMessages(final AutoCloseableIterator<AirbyteMessage> messageIterator) throws Exception {
watchForOrphanThreads(
() -> messageIterator.forEachRemaining(outputRecordCollector),
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.29
LABEL io.airbyte.version=1.0.28
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.29
LABEL io.airbyte.version=1.0.28
LABEL io.airbyte.name=airbyte/source-postgres
3 changes: 1 addition & 2 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,7 @@ The root causes is that the WALs needed for the incremental sync has been remove

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.29 | 2022-11-29 | [19623](https://github.com/airbytehq/airbyte/pull/19623) | Mark PSQLException related to using replica that is configured as a hot standby server as config error. |
| 1.0.28 | 2022-11-28 | [19514](https://github.com/airbytehq/airbyte/pull/19514) | Adjust batch selection memory limits databases. |
| 1.0.28 | 2022-11-22 | [19514](https://github.com/airbytehq/airbyte/pull/19514) | Adjust batch selection memory limits databases. |
| 1.0.27 | 2022-11-28 | [16990](https://github.com/airbytehq/airbyte/pull/16990) | Handle arrays data types |
| 1.0.26 | 2022-11-18 | [19551](https://github.com/airbytehq/airbyte/pull/19551) | Fixes bug with ssl modes |
| 1.0.25 | 2022-11-16 | [19004](https://github.com/airbytehq/airbyte/pull/19004) | Use Debezium heartbeats to improve CDC replication of large databases. |
Expand Down

0 comments on commit 494349f

Please sign in to comment.