From 50a8d03c4ca9100cb92e4d764356034b22ce9be5 Mon Sep 17 00:00:00 2001 From: Ryan Fu Date: Mon, 12 Sep 2022 16:08:11 -0700 Subject: [PATCH] Cleans and Rebase Error Message Factory PR (#16202) * Cleaned error messages factory PR * Bumped MySQL and Postgres version * Fixed messages and typos in test * Fixes the changelog conflict with per-stream state * Added note for flaky test * Bumps mysql version to match changelog * Added exception objects to all LOGGER.error for more visibility * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../exception/ConnectionErrorException.java | 53 ++++++++ .../airbyte/db/jdbc/DefaultJdbcDatabase.java | 10 ++ .../io/airbyte/db/mongodb/MongoDatabase.java | 7 +- .../base/errors/messages/ErrorMessage.java | 27 ++++ .../unit_tests/private_key_path.txt | 1 + .../destination/dynamodb/DynamodbChecker.java | 2 +- .../destination/dynamodb/DynamodbWriter.java | 6 +- .../DynamodbDestinationAcceptanceTest.java | 4 +- .../destination/gcs/GcsDestination.java | 16 ++- .../gcs/GcsDestinationAcceptanceTest.java | 55 ++++++++- .../jdbc/AbstractJdbcDestination.java | 35 +++++- .../jdbc/copy/CopyDestination.java | 11 ++ .../mongodb/MongodbDestination.java | 29 ++++- .../MongodbDestinationAcceptanceTest.java | 67 +++++++++- .../destination/mysql/MySQLDestination.java | 10 ++ .../mysql/MySQLDestinationAcceptanceTest.java | 62 ++++++++++ .../SslMySQLDestinationAcceptanceTest.java | 14 +++ .../connectors/destination-postgres/encrypt | 1 + .../postgres/PostgresDestinationTest.java | 81 +++++++++++- .../RedshiftStagingS3Destination.java | 9 ++ ...iftStagingS3DestinationAcceptanceTest.java | 51 +++++++- .../jdbc/test/JdbcSourceAcceptanceTest.java | 4 +- .../MongoDbSource.java | 38 +++--- .../MongoDbSourceAtlasAcceptanceTest.java | 39 ++++++ ...MongoDbSourceStandaloneAcceptanceTest.java | 42 +++++++ .../mssql/MssqlJdbcSourceAcceptanceTest.java | 59 +++++++++ .../source-mysql-strict-encrypt/Dockerfile | 2 +- .../connectors/source-mysql/Dockerfile | 2 +- .../mysql/MySqlJdbcSourceAcceptanceTest.java | 68 ++++++++++- .../OracleJdbcSourceAcceptanceTest.java | 51 ++++++++ .../source-postgres-strict-encrypt/Dockerfile | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../PostgresJdbcSourceAcceptanceTest.java | 71 +++++++++++ .../source/relationaldb/AbstractDbSource.java | 10 ++ .../SnowflakeJdbcSourceAcceptanceTest.java | 33 ++++- docs/integrations/sources/mysql.md | 115 +++++++++--------- docs/integrations/sources/postgres.md | 1 + 39 files changed, 988 insertions(+), 106 deletions(-) create mode 100644 airbyte-db/db-lib/src/main/java/io/airbyte/db/exception/ConnectionErrorException.java create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/errors/messages/ErrorMessage.java create mode 100644 airbyte-integrations/bases/base-normalization/unit_tests/private_key_path.txt create mode 100644 airbyte-integrations/connectors/destination-postgres/encrypt diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index f18117b05234..142693502738 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -655,7 +655,7 @@ - name: MySQL sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad dockerRepository: airbyte/source-mysql - dockerImageTag: 0.6.10 + dockerImageTag: 0.6.11 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql icon: mysql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index a0d2dd00caac..64cbcd074ac6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6480,7 +6480,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mysql:0.6.10" +- dockerImage: "airbyte/source-mysql:0.6.11" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql" connectionSpecification: diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/exception/ConnectionErrorException.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/exception/ConnectionErrorException.java new file mode 100644 index 000000000000..8687ae228e98 --- /dev/null +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/exception/ConnectionErrorException.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.db.exception; + +public class ConnectionErrorException extends RuntimeException { + + private String stateCode; + private int errorCode; + private String exceptionMessage; + + public ConnectionErrorException(final String exceptionMessage) { + super(exceptionMessage); + } + + public ConnectionErrorException(final String stateCode, final Throwable exception) { + super(exception); + this.stateCode = stateCode; + this.exceptionMessage = exception.getMessage(); + } + + public ConnectionErrorException(final String stateCode, + final String exceptionMessage, + final Throwable exception) { + super(exception); + this.stateCode = stateCode; + this.exceptionMessage = exceptionMessage; + } + + public ConnectionErrorException(final String stateCode, + final int errorCode, + final String exceptionMessage, + final Throwable exception) { + super(exception); + this.stateCode = stateCode; + this.errorCode = errorCode; + this.exceptionMessage = exceptionMessage; + } + + public String getStateCode() { + return this.stateCode; + } + + public int getErrorCode() { + return errorCode; + } + + public String getExceptionMessage() { + return exceptionMessage; + } + +} diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java index 17ca59843741..b8376f4415cd 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java @@ -8,12 +8,14 @@ import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.db.JdbcCompatibleSourceOperations; +import io.airbyte.db.exception.ConnectionErrorException; import java.sql.Connection; import java.sql.DatabaseMetaData; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.List; +import java.util.Objects; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.sql.DataSource; @@ -76,6 +78,14 @@ public DatabaseMetaData getMetaData() throws SQLException { try (final Connection connection = dataSource.getConnection()) { final DatabaseMetaData metaData = connection.getMetaData(); return metaData; + } catch (final SQLException e) { + // Some databases like Redshift will have null cause + if (Objects.isNull(e.getCause()) || !(e.getCause() instanceof SQLException)) { + throw new ConnectionErrorException(e.getSQLState(), e.getErrorCode(), e.getMessage(), e); + } else { + final SQLException cause = (SQLException) e.getCause(); + throw new ConnectionErrorException(e.getSQLState(), cause.getErrorCode(), cause.getMessage(), e); + } } } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoDatabase.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoDatabase.java index 6a0eccef589b..23482ba79c09 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoDatabase.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/mongodb/MongoDatabase.java @@ -7,6 +7,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mongodb.ConnectionString; +import com.mongodb.MongoConfigurationException; import com.mongodb.ReadConcern; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; @@ -16,6 +17,7 @@ import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.AbstractDatabase; +import io.airbyte.db.exception.ConnectionErrorException; import java.util.Collections; import java.util.List; import java.util.Optional; @@ -47,8 +49,11 @@ public MongoDatabase(final String connectionString, final String databaseName) { this.connectionString = new ConnectionString(connectionString); mongoClient = MongoClients.create(this.connectionString); database = mongoClient.getDatabase(databaseName); + } catch (final MongoConfigurationException e) { + LOGGER.error(e.getMessage(), e); + throw new ConnectionErrorException(String.valueOf(e.getCode()), e.getMessage(), e); } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); throw new RuntimeException(e); } } diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/errors/messages/ErrorMessage.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/errors/messages/ErrorMessage.java new file mode 100644 index 000000000000..75c8f5c55eff --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/errors/messages/ErrorMessage.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.errors.messages; + +import java.util.Objects; + +public class ErrorMessage { + + // TODO: this could be built using a Builder design pattern instead of passing in 0 to indicate no + // errorCode exists + public static String getErrorMessage(final String stateCode, final int errorCode, final String message, final Exception exception) { + if (Objects.isNull(message)) { + return configMessage(stateCode, 0, exception.getMessage()); + } else { + return configMessage(stateCode, errorCode, message); + } + } + + private static String configMessage(final String stateCode, final int errorCode, final String message) { + final String stateCodePart = Objects.isNull(stateCode) ? "" : String.format("State code: %s; ", stateCode); + final String errorCodePart = errorCode == 0 ? "" : String.format("Error code: %s; ", errorCode); + return String.format("%s%sMessage: %s", stateCodePart, errorCodePart, message); + } + +} diff --git a/airbyte-integrations/bases/base-normalization/unit_tests/private_key_path.txt b/airbyte-integrations/bases/base-normalization/unit_tests/private_key_path.txt new file mode 100644 index 000000000000..8b98a34afc48 --- /dev/null +++ b/airbyte-integrations/bases/base-normalization/unit_tests/private_key_path.txt @@ -0,0 +1 @@ +AIRBYTE_PRIVATE_KEY \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java index 7b16c9ed03df..0224476842fa 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbChecker.java @@ -43,7 +43,7 @@ private static void attemptWriteAndDeleteDynamodbItem(final DynamodbDestinationC .putItem( new Item().withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", System.currentTimeMillis())); } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); } table.delete(); // delete table diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java index 9527a1fdbbcc..6c23842e2437 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/main/java/io/airbyte/integrations/destination/dynamodb/DynamodbWriter.java @@ -69,7 +69,7 @@ public DynamodbWriter(final DynamodbDestinationConfig config, final var table = createTableIfNotExists(amazonDynamodb, outputTableName); table.waitForActive(); } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); } this.tableWriteItems = new TableWriteItems(outputTableName); @@ -134,7 +134,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) { LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size()))); } } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); } } } @@ -156,7 +156,7 @@ public void close(final boolean hasFailed) throws IOException { } } } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); } LOGGER.info("Data writing completed for DynamoDB."); } diff --git a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java index 427e18d5d21c..2c30db98ec95 100644 --- a/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-dynamodb/src/test-integration/java/io/airbyte/integrations/destination/dynamodb/DynamodbDestinationAcceptanceTest.java @@ -79,7 +79,7 @@ protected List getAllSyncedObjects(final String streamName, final String n maxSyncTime = Math.max(maxSyncTime, ((BigDecimal) item.get("sync_time")).longValue()); } } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); } items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue())); @@ -155,7 +155,7 @@ protected void tearDown(final TestDestinationEnv testEnv) { LOGGER.info(String.format("Delete table %s", tableName)); } } catch (final Exception e) { - LOGGER.error(e.getMessage()); + LOGGER.error(e.getMessage(), e); } } diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java index 18bd7cd8053a..7829ca7a2458 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java +++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsDestination.java @@ -4,10 +4,14 @@ package io.airbyte.integrations.destination.gcs; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; + import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.model.AmazonS3Exception; import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.NamingConventionTransformer; @@ -52,10 +56,16 @@ public AirbyteConnectionStatus check(final JsonNode config) { S3Destination.testMultipartUpload(s3Client, destinationConfig.getBucketName(), destinationConfig.getBucketPath()); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (final AmazonS3Exception e) { + LOGGER.error("Exception attempting to access the AWS bucket: {}", e.getMessage()); + final String message = getErrorMessage(e.getErrorCode(), 0, e.getMessage(), e); + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage(message); } catch (final Exception e) { - LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage()); - LOGGER.error("Please make sure you account has all of these roles: " + EXPECTED_ROLES); - + LOGGER.error("Exception attempting to access the AWS bucket: {}. Please make sure you account has all of these roles: {}", e.getMessage(), EXPECTED_ROLES); + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, e.getMessage()); return new AirbyteConnectionStatus() .withStatus(AirbyteConnectionStatus.Status.FAILED) .withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e diff --git a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java index 38878e47c52b..e9d928d7cee9 100644 --- a/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsDestinationAcceptanceTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.destination.gcs; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.amazonaws.services.s3.AmazonS3; import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion; @@ -12,6 +13,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.ImmutableMap; import io.airbyte.commons.io.IOs; import io.airbyte.commons.jackson.MoreMappers; import io.airbyte.commons.json.Jsons; @@ -23,6 +25,7 @@ import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.nio.file.Path; import java.util.Comparator; import java.util.LinkedList; @@ -112,8 +115,8 @@ protected JsonNode getFailCheckConfig() { final JsonNode baseJson = getBaseConfigJson(); final JsonNode failCheckJson = Jsons.clone(baseJson); // invalid credential - ((ObjectNode) failCheckJson).put("access_key_id", "fake-key"); - ((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret"); + ((ObjectNode) failCheckJson).put("hmac_key_access_id", "fake-key"); + ((ObjectNode) failCheckJson).put("hmac_key_secret", "fake-secret"); return failCheckJson; } @@ -218,4 +221,52 @@ public void testCheckConnectionInsufficientRoles() throws Exception { assertEquals(Status.FAILED, runCheck(configJson).getStatus()); } + @Test + public void testCheckIncorrectHmacKeyAccessIdCredential() { + final JsonNode baseJson = getBaseConfigJson(); + final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() + .put("credential_type", "HMAC_KEY") + .put("hmac_key_access_id", "fake-key") + .put("hmac_key_secret", baseJson.get("credential").get("hmac_key_secret").asText()) + .build()); + + ((ObjectNode) baseJson).put("credential", credential); + ((ObjectNode) baseJson).set("format", getFormatConfig()); + + final GcsDestination destination = new GcsDestination(); + final AirbyteConnectionStatus status = destination.check(baseJson); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: SignatureDoesNotMatch;")); + } + + @Test + public void testCheckIncorrectHmacKeySecretCredential() { + final JsonNode baseJson = getBaseConfigJson(); + final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder() + .put("credential_type", "HMAC_KEY") + .put("hmac_key_access_id", baseJson.get("credential").get("hmac_key_access_id").asText()) + .put("hmac_key_secret", "fake-secret") + .build()); + + ((ObjectNode) baseJson).put("credential", credential); + ((ObjectNode) baseJson).set("format", getFormatConfig()); + + final GcsDestination destination = new GcsDestination(); + final AirbyteConnectionStatus status = destination.check(baseJson); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: SignatureDoesNotMatch;")); + } + + @Test + public void testCheckIncorrectBucketCredential() { + final JsonNode baseJson = getBaseConfigJson(); + ((ObjectNode) baseJson).put("gcs_bucket_name", "fake_bucket"); + ((ObjectNode) baseJson).set("format", getFormatConfig()); + + final GcsDestination destination = new GcsDestination(); + final AirbyteConnectionStatus status = destination.check(baseJson); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: NoSuchKey;")); + } + } diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java index 7e09675ff1dc..e1adaf9c51f9 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/AbstractJdbcDestination.java @@ -4,20 +4,25 @@ package io.airbyte.integrations.destination.jdbc; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; + import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.map.MoreMaps; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.sql.SQLException; import java.util.Map; import java.util.Objects; import java.util.UUID; @@ -59,6 +64,12 @@ public AirbyteConnectionStatus check(final JsonNode config) { final String outputSchema = namingResolver.getIdentifier(config.get(JdbcUtils.SCHEMA_KEY).asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, namingResolver, sqlOperations); return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (final ConnectionErrorException ex) { + final String message = getErrorMessage(ex.getStateCode(), ex.getErrorCode(), ex.getExceptionMessage(), ex); + AirbyteTraceMessageUtility.emitConfigErrorTrace(ex, message); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage(message); } catch (final Exception e) { LOGGER.error("Exception while checking connection: ", e); return new AirbyteConnectionStatus() @@ -80,10 +91,26 @@ public static void attemptSQLCreateAndDropTableOperations(final String outputSch throws Exception { // verify we have write permissions on the target schema by creating a table with a random name, // then dropping that table - final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "")); - sqlOps.createSchemaIfNotExists(database, outputSchema); - sqlOps.createTableIfNotExists(database, outputSchema, outputTableName); - sqlOps.dropTableIfExists(database, outputSchema, outputTableName); + try { + // Get metadata from the database to see whether connection is possible + database.bufferedResultSetQuery(conn -> conn.getMetaData().getCatalogs(), JdbcUtils.getDefaultSourceOperations()::rowToJson); + + // verify we have write permissions on the target schema by creating a table with a random name, + // then dropping that table + final String outputTableName = namingResolver.getIdentifier("_airbyte_connection_test_" + UUID.randomUUID().toString().replaceAll("-", "")); + sqlOps.createSchemaIfNotExists(database, outputSchema); + sqlOps.createTableIfNotExists(database, outputSchema, outputTableName); + sqlOps.dropTableIfExists(database, outputSchema, outputTableName); + } catch (final SQLException e) { + if (Objects.isNull(e.getCause()) || !(e.getCause() instanceof SQLException)) { + throw new ConnectionErrorException(e.getSQLState(), e.getErrorCode(), e.getMessage(), e); + } else { + final SQLException cause = (SQLException) e.getCause(); + throw new ConnectionErrorException(e.getSQLState(), cause.getErrorCode(), cause.getMessage(), e); + } + } catch (final Exception e) { + throw new Exception(e); + } } protected DataSource getDataSource(final JsonNode config) { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java index 3454d58081ae..3a2a8d94f57c 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java @@ -4,10 +4,14 @@ package io.airbyte.integrations.destination.jdbc.copy; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; + import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.BaseConnector; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.destination.ExtendedNameTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; @@ -67,6 +71,13 @@ public AirbyteConnectionStatus check(final JsonNode config) { AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations()); return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (final ConnectionErrorException ex) { + LOGGER.info("Exception while checking connection: ", ex); + final String message = getErrorMessage(ex.getStateCode(), ex.getErrorCode(), ex.getExceptionMessage(), ex); + AirbyteTraceMessageUtility.emitConfigErrorTrace(ex, message); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(message); } catch (final Exception e) { LOGGER.error("Exception attempting to connect to the warehouse: ", e); return new AirbyteConnectionStatus() diff --git a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java index 202cafc99c11..21ee4c5c0ed0 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/main/java/io/airbyte/integrations/destination/mongodb/MongodbDestination.java @@ -5,18 +5,24 @@ package io.airbyte.integrations.destination.mongodb; import static com.mongodb.client.model.Projections.excludeId; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.mongodb.client.MongoCollection; +import com.mongodb.MongoCommandException; +import com.mongodb.MongoException; +import com.mongodb.MongoSecurityException; import com.mongodb.client.MongoCursor; import io.airbyte.commons.util.MoreIterators; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.db.mongodb.MongoUtils.MongoInstanceType; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.destination.mongodb.exception.MongodbDatabaseException; @@ -69,13 +75,19 @@ public static void main(final String[] args) throws Exception { @Override public AirbyteConnectionStatus check(final JsonNode config) { try { - final var database = getDatabase(config); + final MongoDatabase database = getDatabase(config); final var databaseName = config.get(JdbcUtils.DATABASE_KEY).asText(); - final Set databaseNames = MoreIterators.toSet(database.getDatabaseNames().iterator()); + final Set databaseNames = getDatabaseNames(database); if (!databaseNames.contains(databaseName) && !databaseName.equals(database.getName())) { throw new MongodbDatabaseException(databaseName); } return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (final ConnectionErrorException e) { + final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e); + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(message); } catch (final RuntimeException e) { LOGGER.error("Check failed.", e); return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED) @@ -83,11 +95,22 @@ public AirbyteConnectionStatus check(final JsonNode config) { } } + private Set getDatabaseNames(final MongoDatabase mongoDatabase) { + try { + return MoreIterators.toSet(mongoDatabase.getDatabaseNames().iterator()); + } catch (final MongoSecurityException e) { + final MongoCommandException exception = (MongoCommandException) e.getCause(); + throw new ConnectionErrorException(String.valueOf(exception.getCode()), e); + } catch (final MongoException e) { + throw new ConnectionErrorException(String.valueOf(e.getCode()), e); + } + } + @Override public AirbyteMessageConsumer getConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final Consumer outputRecordCollector) { - final var database = getDatabase(config); + final MongoDatabase database = getDatabase(config); final Map writeConfigs = new HashMap<>(); for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) { diff --git a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java index ee7bd582e5f8..e5642a090ed8 100644 --- a/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mongodb/src/test-integration/java/io/airbyte/integrations/destination/mongodb/MongodbDestinationAcceptanceTest.java @@ -5,8 +5,11 @@ package io.airbyte.integrations.destination.mongodb; import static com.mongodb.client.model.Projections.excludeId; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.mongodb.client.MongoCursor; import io.airbyte.commons.json.Jsons; @@ -15,9 +18,11 @@ import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.util.ArrayList; import java.util.List; import org.bson.Document; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.MongoDBContainer; public class MongodbDestinationAcceptanceTest extends DestinationAcceptanceTest { @@ -85,7 +90,7 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, final String streamName, final String namespace, final JsonNode streamSchema) { - final var database = getMongoDatabase(container.getHost(), + final MongoDatabase database = getMongoDatabase(container.getHost(), container.getFirstMappedPort(), DATABASE_NAME); final var collection = database.getOrCreateNewCollection(namingResolver.getRawTableName(streamName)); final List result = new ArrayList<>(); @@ -97,6 +102,64 @@ protected List retrieveRecords(final TestDestinationEnv testEnv, return result; } + /** + * For each of the state codes reference MongoDb's base error code yaml + *

+ * https://github.com/mongodb/mongo/blob/master/src/mongo/base/error_codes.yml + *

+ */ + @Test + void testCheckIncorrectPasswordFailure() { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); + ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.PASSWORD_KEY, "fake"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } + + @Test + public void testCheckIncorrectUsernameFailure() { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_NAME); + ((ObjectNode) invalidConfig.get(AUTH_TYPE)).put(JdbcUtils.USERNAME_KEY, "fakeusername"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() { + final JsonNode invalidConfig = getFailCheckConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.DATABASE_KEY, DATABASE_FAIL_NAME); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } + + @Test + public void testCheckIncorrectHost() { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.HOST_KEY, "localhost2"); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -3")); + } + + @Test + public void testCheckIncorrectPort() { + final JsonNode invalidConfig = getConfig(); + ((ObjectNode) invalidConfig).put(JdbcUtils.PORT_KEY, 1234); + final MongodbDestination destination = new MongodbDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -3")); + } + @Override protected void setup(final TestDestinationEnv testEnv) { container = new MongoDBContainer(DOCKER_IMAGE_NAME); @@ -119,7 +182,7 @@ private JsonNode getAuthTypeConfig() { private MongoDatabase getMongoDatabase(final String host, final int port, final String databaseName) { try { - final var connectionString = String.format("mongodb://%s:%s/", host, port); + final String connectionString = String.format("mongodb://%s:%s/", host, port); return new MongoDatabase(connectionString, databaseName); } catch (final RuntimeException e) { throw new RuntimeException(e); diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java index 3bacd6738112..19009ba7e494 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java @@ -4,14 +4,18 @@ package io.airbyte.integrations.destination.mysql; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.map.MoreMaps; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.ssh.SshWrappedDestination; @@ -69,6 +73,12 @@ public AirbyteConnectionStatus check(final JsonNode config) { } return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (final ConnectionErrorException e) { + final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e); + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage(message); } catch (final Exception e) { LOGGER.error("Exception while checking connection: ", e); return new AirbyteConnectionStatus() diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java index 21558c120fe8..b720ea006c3c 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLDestinationAcceptanceTest.java @@ -5,8 +5,10 @@ package io.airbyte.integrations.destination.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; @@ -19,6 +21,7 @@ import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -36,6 +39,9 @@ public class MySQLDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest { + protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; + protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password"; + private MySQLContainer db; private final ExtendedNameTransformer namingResolver = new MySQLNameTransformer(); @@ -261,4 +267,60 @@ protected void assertSameValue(final JsonNode expectedValue, final JsonNode actu } } + @Test + void testCheckIncorrectPasswordFailure() { + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.PASSWORD_KEY, "fake"); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28000; Error code: 1045;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() { + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.USERNAME_KEY, "fake"); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28000; Error code: 1045;")); + } + + @Test + public void testCheckIncorrectHostFailure() { + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.HOST_KEY, "localhost2"); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08S01;")); + } + + @Test + public void testCheckIncorrectPortFailure() { + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.PORT_KEY, "0000"); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08S01;")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() { + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 42000; Error code: 1049;")); + } + + @Test + public void testUserHasNoPermissionToDataBase() { + executeQuery("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n"); + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 42000; Error code: 1044;")); + } + } diff --git a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java index 2efccebac845..90f4dc32bb6a 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mysql/src/test-integration/java/io/airbyte/integrations/destination/mysql/SslMySQLDestinationAcceptanceTest.java @@ -4,7 +4,10 @@ package io.airbyte.integrations.destination.mysql; +import static org.junit.jupiter.api.Assertions.assertEquals; + import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; @@ -12,6 +15,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.JavaBaseConstants; import io.airbyte.integrations.destination.ExtendedNameTransformer; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.sql.SQLException; import java.util.List; import java.util.stream.Collectors; @@ -146,4 +150,14 @@ private void executeQuery(final String query) { } } + @Test + public void testUserHasNoPermissionToDataBase() { + executeQuery("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n"); + final JsonNode config = ((ObjectNode) getConfig()).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); + ((ObjectNode) config).put("password", PASSWORD_WITHOUT_PERMISSION); + final MySQLDestination destination = new MySQLDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } + } diff --git a/airbyte-integrations/connectors/destination-postgres/encrypt b/airbyte-integrations/connectors/destination-postgres/encrypt new file mode 100644 index 000000000000..3dc91e6dcc27 --- /dev/null +++ b/airbyte-integrations/connectors/destination-postgres/encrypt @@ -0,0 +1 @@ +CRwphdRpRk \ No newline at end of file diff --git a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java index ad718969059a..a68cbe21b3b8 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java @@ -5,14 +5,17 @@ package io.airbyte.integrations.destination.postgres; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.AirbyteMessageConsumer; import io.airbyte.integrations.base.Destination; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -29,6 +32,7 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -38,6 +42,10 @@ public class PostgresDestinationTest { private static PostgreSQLContainer PSQL_DB; + private static final String USERNAME = "new_user"; + private static final String DATABASE = "new_db"; + private static final String PASSWORD = "new_password"; + private static final String SCHEMA_NAME = "public"; private static final String STREAM_NAME = "id_and_name"; @@ -61,7 +69,7 @@ private JsonNode buildConfigNoJdbcParameters() { JdbcUtils.PORT_KEY, 1337, JdbcUtils.USERNAME_KEY, "user", JdbcUtils.DATABASE_KEY, "db", - "ssl", true, + JdbcUtils.SSL_KEY, true, "ssl_mode", ImmutableMap.of("mode", "require"))); } @@ -147,6 +155,77 @@ void testDefaultParamsWithSSL() { assertEquals(SSL_JDBC_PARAMETERS, defaultProperties); } + @Test + void testCheckIncorrectPasswordFailure() { + final var config = buildConfigNoJdbcParameters(); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, "public"); + final PostgresDestination destination = new PostgresDestination(); + final var actual = destination.check(config); + assertTrue(actual.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() { + final var config = buildConfigNoJdbcParameters(); + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, ""); + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, "public"); + final PostgresDestination destination = new PostgresDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectHostFailure() { + final var config = buildConfigNoJdbcParameters(); + ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, "public"); + final PostgresDestination destination = new PostgresDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectPortFailure() { + final var config = buildConfigNoJdbcParameters(); + ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "30000"); + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, "public"); + final PostgresDestination destination = new PostgresDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() { + final var config = buildConfigNoJdbcParameters(); + ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); + ((ObjectNode) config).put(JdbcUtils.SCHEMA_KEY, "public"); + final PostgresDestination destination = new PostgresDestination(); + final AirbyteConnectionStatus status = destination.check(config); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testUserHasNoPermissionToDataBase() throws Exception { + final JdbcDatabase database = PostgreSQLContainerHelper.getJdbcDatabaseFromConfig(PostgreSQLContainerHelper.getDataSourceFromConfig(config)); + + database.execute(connection -> connection.createStatement() + .execute(String.format("create user %s with password '%s';", USERNAME, PASSWORD))); + database.execute(connection -> connection.createStatement() + .execute(String.format("create database %s;", DATABASE))); + // deny access for database for all users from group public + database.execute(connection -> connection.createStatement() + .execute(String.format("revoke all on database %s from public;", DATABASE))); + + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD); + ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, DATABASE); + + final Destination destination = new PostgresDestination(); + final AirbyteConnectionStatus status = destination.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + } + // This test is a bit redundant with PostgresIntegrationTest. It makes it easy to run the // destination in the same process as the test allowing us to put breakpoint in, which is handy for // debugging (especially since we use postgres as a guinea pig for most features). diff --git a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java index eb38a6515717..d5938004f4b2 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java +++ b/airbyte-integrations/connectors/destination-redshift/src/main/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3Destination.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.destination.redshift; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.SSL_JDBC_PARAMETERS; import static io.airbyte.integrations.destination.redshift.RedshiftInsertDestination.getJdbcConfig; import static io.airbyte.integrations.destination.redshift.util.RedshiftUtil.findS3Options; @@ -11,11 +12,13 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.json.Jsons; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.factory.DataSourceFactory; import io.airbyte.db.jdbc.DefaultJdbcDatabase; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.base.AirbyteMessageConsumer; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.destination.NamingConventionTransformer; import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination; @@ -76,6 +79,12 @@ public AirbyteConnectionStatus check(final JsonNode config) { final String outputSchema = super.getNamingResolver().getIdentifier(config.get("schema").asText()); attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, redshiftS3StagingSqlOperations); return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); + } catch (final ConnectionErrorException e) { + final String message = getErrorMessage(e.getStateCode(), e.getErrorCode(), e.getExceptionMessage(), e); + AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message); + return new AirbyteConnectionStatus() + .withStatus(AirbyteConnectionStatus.Status.FAILED) + .withMessage(message); } catch (final Exception e) { LOGGER.error("Exception while checking connection: ", e); return new AirbyteConnectionStatus() diff --git a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java index 4e66dbb3f518..779fbede3c08 100644 --- a/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-redshift/src/test-integration/java/io/airbyte/integrations/destination/redshift/RedshiftStagingS3DestinationAcceptanceTest.java @@ -4,6 +4,9 @@ package io.airbyte.integrations.destination.redshift; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -18,11 +21,13 @@ import io.airbyte.integrations.destination.redshift.operations.RedshiftSqlOperations; import io.airbyte.integrations.standardtest.destination.JdbcDestinationAcceptanceTest; import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.io.IOException; import java.nio.file.Path; import java.sql.SQLException; import java.util.List; import java.util.stream.Collectors; +import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,6 +44,7 @@ public class RedshiftStagingS3DestinationAcceptanceTest extends JdbcDestinationA // config which refers to the schema that the test is being run in. protected JsonNode config; private final RedshiftSQLNameTransformer namingResolver = new RedshiftSQLNameTransformer(); + private final String USER_WITHOUT_CREDS = Strings.addRandomSuffix("test_user", "_", 5); protected TestDestinationEnv testDestinationEnv; @@ -65,6 +71,46 @@ protected JsonNode getFailCheckConfig() { return invalidConfig; } + @Test + void testCheckIncorrectPasswordFailure() throws Exception { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("password", "fake"); + final RedshiftDestination destination = new RedshiftDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28000; Error code: 500310;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() throws Exception { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("username", ""); + final RedshiftDestination destination = new RedshiftDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28000; Error code: 500310;")); + } + + @Test + public void testCheckIncorrectHostFailure() throws Exception { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("host", "localhost2"); + final RedshiftDestination destination = new RedshiftDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: HY000; Error code: 500150;")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() throws Exception { + final JsonNode invalidConfig = Jsons.clone(config); + ((ObjectNode) invalidConfig).put("database", "wrongdatabase"); + final RedshiftDestination destination = new RedshiftDestination(); + final AirbyteConnectionStatus status = destination.check(invalidConfig); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 3D000; Error code: 500310;")); + } + @Override protected TestDataComparator getTestDataComparator() { return new RedshiftTestDataComparator(); @@ -139,6 +185,9 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName); baseConfig = getStaticConfig(); getDatabase().query(ctx -> ctx.execute(createSchemaQuery)); + final String createUser = String.format("create user %s with password '%s';", + USER_WITHOUT_CREDS, baseConfig.get("password").asText()); + getDatabase().query(ctx -> ctx.execute(createUser)); final JsonNode configForSchema = Jsons.clone(baseConfig); ((ObjectNode) configForSchema).put("schema", schemaName); config = configForSchema; @@ -148,7 +197,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception { @Override protected void tearDown(final TestDestinationEnv testEnv) throws Exception { final String dropSchemaQuery = String.format("DROP SCHEMA IF EXISTS %s CASCADE", config.get("schema").asText()); - getDatabase().query(ctx -> ctx.execute(dropSchemaQuery)); + getDatabase().query(ctx -> ctx.execute(String.format("drop user if exists %s;", USER_WITHOUT_CREDS))); } protected Database getDatabase() { diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index f27619b15f32..fb4afdf0678c 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -333,7 +333,7 @@ protected void testDiscoverWithNonCursorFields() throws Exception { getFullyQualifiedTableName(TABLE_NAME_WITHOUT_CURSOR_TYPE))); }); final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config)); - AirbyteStream stream = + final AirbyteStream stream = actual.getStreams().stream().filter(s -> s.getName().equalsIgnoreCase(TABLE_NAME_WITHOUT_CURSOR_TYPE)).findFirst().orElse(null); assertNotNull(stream); assertEquals(TABLE_NAME_WITHOUT_CURSOR_TYPE.toLowerCase(), stream.getName().toLowerCase()); @@ -351,7 +351,7 @@ protected void testDiscoverWithNullableCursorFields() throws Exception { getFullyQualifiedTableName(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE))); }); final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config)); - AirbyteStream stream = + final AirbyteStream stream = actual.getStreams().stream().filter(s -> s.getName().equalsIgnoreCase(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE)).findFirst().orElse(null); assertNotNull(stream); assertEquals(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE.toLowerCase(), stream.getName().toLowerCase()); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index e8bbaeb9f93b..be0c0b5634e6 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -17,11 +17,15 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.mongodb.MongoCommandException; +import com.mongodb.MongoException; +import com.mongodb.MongoSecurityException; import com.mongodb.client.MongoCollection; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.commons.util.AutoCloseableIterators; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.mongodb.MongoDatabase; import io.airbyte.db.mongodb.MongoUtils; @@ -92,12 +96,11 @@ protected MongoDatabase createDatabase(final JsonNode config) throws Exception { } @Override - public List> getCheckOperations(final JsonNode config) - throws Exception { + public List> getCheckOperations(final JsonNode config) { final List> checkList = new ArrayList<>(); checkList.add(database -> { if (getAuthorizedCollections(database).isEmpty()) { - throw new Exception("Unable to execute any operation on the source!"); + throw new ConnectionErrorException("Unable to execute any operation on the source!"); } else { LOGGER.info("The source passed the basic operation test!"); } @@ -145,17 +148,24 @@ private Set getAuthorizedCollections(final MongoDatabase database) { * find or any other action, on the database resource, the command lists all collections in the * database. */ - final Document document = database.getDatabase().runCommand(new Document("listCollections", 1) - .append("authorizedCollections", true) - .append("nameOnly", true)) - .append("filter", "{ 'type': 'collection' }"); - return document.toBsonDocument() - .get("cursor").asDocument() - .getArray("firstBatch") - .stream() - .map(bsonValue -> bsonValue.asDocument().getString("name").getValue()) - .collect(Collectors.toSet()); + try { + final Document document = database.getDatabase().runCommand(new Document("listCollections", 1) + .append("authorizedCollections", true) + .append("nameOnly", true)) + .append("filter", "{ 'type': 'collection' }"); + return document.toBsonDocument() + .get("cursor").asDocument() + .getArray("firstBatch") + .stream() + .map(bsonValue -> bsonValue.asDocument().getString("name").getValue()) + .collect(Collectors.toSet()); + } catch (final MongoSecurityException e) { + final MongoCommandException exception = (MongoCommandException) e.getCause(); + throw new ConnectionErrorException(String.valueOf(exception.getCode()), e); + } catch (final MongoException e) { + throw new ConnectionErrorException(String.valueOf(e.getCode()), e); + } } @Override @@ -199,7 +209,7 @@ public AutoCloseableIterator queryTableIncremental(final MongoDatabase } @Override - public boolean isCursorType(BsonType bsonType) { + public boolean isCursorType(final BsonType bsonType) { // while reading from mongo primary key "id" is always added, so there will be no situation // when we have no cursor field here, at least id could be used as cursor here. // This logic will be used feather when we will implement part which will show only list of possible diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java index 329a7231ff06..03f5b6dd1cea 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceAtlasAcceptanceTest.java @@ -6,15 +6,19 @@ import static io.airbyte.db.mongodb.MongoUtils.MongoInstanceType.ATLAS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.mongodb.client.MongoCollection; import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.mongodb.MongoDatabase; +import io.airbyte.integrations.source.mongodb.MongoDbSource; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.Field; @@ -25,6 +29,7 @@ import org.bson.BsonArray; import org.bson.BsonString; import org.bson.Document; +import org.junit.jupiter.api.Test; public class MongoDbSourceAtlasAcceptanceTest extends MongoDbSourceAbstractAcceptanceTest { @@ -111,4 +116,38 @@ protected void verifyCatalog(final AirbyteCatalog catalog) { assertEquals(CatalogHelpers.fieldsToJsonSchema(FIELDS), actualStream.getJsonSchema()); } + @Test + public void testCheckIncorrectUsername() throws Exception { + ((ObjectNode) config).put("user", "fake"); + final AirbyteConnectionStatus status = new MongoDbSource().check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } + + @Test + public void testCheckIncorrectPassword() throws Exception { + ((ObjectNode) config).put("password", "fake"); + final AirbyteConnectionStatus status = new MongoDbSource().check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 18")); + } + + @Test + public void testCheckIncorrectCluster() throws Exception { + ((ObjectNode) config).with("instance_type") + .put("cluster_url", "cluster0.iqgf8.mongodb.netfail"); + final AirbyteConnectionStatus status = new MongoDbSource().check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -4")); + } + + @Test + public void testCheckIncorrectAccessToDataBase() throws Exception { + ((ObjectNode) config).put("user", "test_user_without_access") + .put("password", "test12321"); + final AirbyteConnectionStatus status = new MongoDbSource().check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 13")); + } + } diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java index 546d306293be..9213dbaf744a 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/MongoDbSourceStandaloneAcceptanceTest.java @@ -6,6 +6,7 @@ import static io.airbyte.db.mongodb.MongoUtils.MongoInstanceType.STANDALONE; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; @@ -13,9 +14,11 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.db.mongodb.MongoDatabase; +import io.airbyte.integrations.source.mongodb.MongoDbSource; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; @@ -23,6 +26,7 @@ import org.bson.BsonArray; import org.bson.BsonString; import org.bson.Document; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.MongoDBContainer; import org.testcontainers.utility.DockerImageName; @@ -103,4 +107,42 @@ protected void verifyCatalog(final AirbyteCatalog catalog) { assertEquals(CatalogHelpers.fieldsToJsonSchema(FIELDS), actualStream.getJsonSchema()); } + @Test + public void testCheckIncorrectHost() throws Exception { + final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("instance", STANDALONE.getType()) + .put("host", "localhost2") + .put("port", mongoDBContainer.getFirstMappedPort()) + .put("tls", false) + .build()); + + final JsonNode conf = Jsons.jsonNode(ImmutableMap.builder() + .put("instance_type", instanceConfig) + .put("database", DATABASE_NAME) + .put("auth_source", "admin") + .build()); + final AirbyteConnectionStatus status = new MongoDbSource().check(conf); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -3")); + } + + @Test + public void testCheckIncorrectPort() throws Exception { + final JsonNode instanceConfig = Jsons.jsonNode(ImmutableMap.builder() + .put("instance", STANDALONE.getType()) + .put("host", mongoDBContainer.getHost()) + .put("port", 1234) + .put("tls", false) + .build()); + + final JsonNode conf = Jsons.jsonNode(ImmutableMap.builder() + .put("instance_type", instanceConfig) + .put("database", DATABASE_NAME) + .put("auth_source", "admin") + .build()); + final AirbyteConnectionStatus status = new MongoDbSource().check(conf); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: -3")); + } + } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java index 0b8fa9c15c07..a72a5991e7b5 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java @@ -4,6 +4,9 @@ package io.airbyte.integrations.source.mssql; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; @@ -16,15 +19,20 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import java.sql.JDBCType; import javax.sql.DataSource; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.testcontainers.containers.MSSQLServerContainer; public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { + protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; + protected static final String PASSWORD_WITHOUT_PERMISSION = "password_3435!"; private static MSSQLServerContainer dbContainer; private JsonNode config; @@ -92,4 +100,55 @@ public String getDriverClass() { return MssqlSource.DRIVER_CLASS; } + @Test + void testCheckIncorrectPasswordFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: S0001; Error code: 18456;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: S0001; Error code: 18456;")); + } + + @Test + public void testCheckIncorrectHostFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08S01;")); + } + + @Test + public void testCheckIncorrectPortFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "0000"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08S01;")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: S0001; Error code: 4060;")); + } + + @Test + public void testUserHasNoPermissionToDataBase() throws Exception { + database.execute(ctx -> ctx.createStatement() + .execute(String.format("CREATE LOGIN %s WITH PASSWORD = '%s'; ", USERNAME_WITHOUT_PERMISSION, PASSWORD_WITHOUT_PERMISSION))); + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: S0001; Error code: 4060;")); + } + } diff --git a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile index fc46f47819dc..033cad6c5028 100644 --- a/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.10 +LABEL io.airbyte.version=0.6.11 LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 976852a20f59..6b71315fda68 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mysql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.6.10 +LABEL io.airbyte.version=0.6.11 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index 6f9a5f94ab74..d819f9c6027f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -5,8 +5,10 @@ package io.airbyte.integrations.source.mysql; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.mysql.cj.MysqlType; import io.airbyte.commons.features.EnvVariableFeatureFlags; @@ -21,6 +23,7 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -47,6 +50,8 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { + protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; + protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password"; protected static final String TEST_USER = "test"; protected static final Callable TEST_PASSWORD = () -> "test"; protected static MySQLContainer container; @@ -135,6 +140,67 @@ void testSpec() throws Exception { assertEquals(expected, actual); } + /** + * MySQL Error Codes: + *

+ * https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-error-sqlstates.html + *

+ * + * @throws Exception + */ + @Test + void testCheckIncorrectPasswordFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + // do not test for message since there seems to be flakiness where sometimes the test will get the message with + // State code: 08001 or State code: 28000 + } + + @Test + public void testCheckIncorrectHostFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08S01;")); + } + + @Test + public void testCheckIncorrectPortFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "0000"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08S01;")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 42000; Error code: 1049;")); + } + + @Test + public void testUserHasNoPermissionToDataBase() throws Exception { + final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD.call()); + connection.createStatement() + .execute("create user '" + USERNAME_WITHOUT_PERMISSION + "'@'%' IDENTIFIED BY '" + PASSWORD_WITHOUT_PERMISSION + "';\n"); + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + @Override protected AirbyteCatalog getCatalog(final String defaultNamespace) { return new AirbyteCatalog().withStreams(List.of( @@ -199,7 +265,7 @@ protected List getTestMessages() { } @Override - protected List getExpectedAirbyteMessagesSecondSync(String namespace) { + protected List getExpectedAirbyteMessagesSecondSync(final String namespace) { final List expectedMessages = new ArrayList<>(); expectedMessages.add(new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(streamName).withNamespace(namespace) diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index 50c77c1bf75f..714ca43d55eb 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -16,11 +17,13 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreIterators; +import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; @@ -57,6 +60,8 @@ class OracleJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static final Logger LOGGER = LoggerFactory.getLogger(OracleJdbcSourceAcceptanceTest.class); + protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; + protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password"; private static OracleContainer ORACLE_DB; @BeforeAll @@ -385,4 +390,50 @@ void testSpec() throws Exception { assertEquals(expected, actual); } + @Test + void testCheckIncorrectPasswordFailure() throws Exception { + // by using a fake password oracle can block user account so we will create separate account for + // this test + executeOracleStatement(String.format("CREATE USER locked_user IDENTIFIED BY password DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS")); + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "locked_user"); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 99999; Error code: 28000;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 72000; Error code: 1017;")); + } + + @Test + public void testCheckIncorrectHostFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08006; Error code: 17002;")); + } + + @Test + public void testCheckIncorrectPortFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "0000"); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08006; Error code: 17002;")); + } + + @Test + public void testUserHasNoPermissionToDataBase() throws Exception { + executeOracleStatement(String.format("CREATE USER %s IDENTIFIED BY %s", USERNAME_WITHOUT_PERMISSION, PASSWORD_WITHOUT_PERMISSION)); + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, USERNAME_WITHOUT_PERMISSION); + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, PASSWORD_WITHOUT_PERMISSION); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 72000; Error code: 1045;")); + } + } diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 7720f0274156..9c6f68d03ffc 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.7 +LABEL io.airbyte.version=1.0.8 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index f708a7f03f73..4f870e35df62 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.7 +LABEL io.airbyte.version=1.0.8 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 19d69ccc6ef3..e09667d7dcdf 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.postgres; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -25,6 +26,7 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.CatalogHelpers; @@ -41,6 +43,7 @@ import java.util.List; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -49,6 +52,9 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { + private static final String DATABASE = "new_db"; + protected static final String USERNAME_WITHOUT_PERMISSION = "new_user"; + protected static final String PASSWORD_WITHOUT_PERMISSION = "new_password"; private static PostgreSQLContainer PSQL_DB; public static String COL_WAKEUP_AT = "wakeup_at"; public static String COL_LAST_VISITED_AT = "last_visited_at"; @@ -450,4 +456,69 @@ protected boolean supportsPerStream() { return true; } + /** + * Postgres Source Error Codes: + *

+ * https://www.postgresql.org/docs/current/errcodes-appendix.html + *

+ * + * @throws Exception + */ + @Test + void testCheckIncorrectPasswordFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PASSWORD_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28P01;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.USERNAME_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28P01;")); + } + + @Test + public void testCheckIncorrectHostFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectPortFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.PORT_KEY, "30000"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08001;")); + } + + @Test + public void testCheckIncorrectDataBaseFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.DATABASE_KEY, "wrongdatabase"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 3D000;")); + } + + @Test + public void testUserHasNoPermissionToDataBase() throws Exception { + database.execute(connection -> connection.createStatement() + .execute(String.format("create user %s with password '%s';", USERNAME_WITHOUT_PERMISSION, PASSWORD_WITHOUT_PERMISSION))); + database.execute(connection -> connection.createStatement() + .execute(String.format("create database %s;", DATABASE))); + // deny access for database for all users from group public + database.execute(connection -> connection.createStatement() + .execute(String.format("revoke all on database %s from public;", DATABASE))); + ((ObjectNode) config).put("username", USERNAME_WITHOUT_PERMISSION); + ((ObjectNode) config).put("password", PASSWORD_WITHOUT_PERMISSION); + ((ObjectNode) config).put("database", DATABASE); + final AirbyteConnectionStatus status = source.check(config); + Assertions.assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 42501;")); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index e4c5d7724147..78a1ba766cc1 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.source.relationaldb; +import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -19,9 +21,11 @@ import io.airbyte.config.helpers.StateMessageHelper; import io.airbyte.db.AbstractDatabase; import io.airbyte.db.IncrementalUtils; +import io.airbyte.db.exception.ConnectionErrorException; import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.base.AirbyteTraceMessageUtility; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.state.StateManager; @@ -80,6 +84,12 @@ public AirbyteConnectionStatus check(final JsonNode config) throws Exception { } return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED); + } catch (final ConnectionErrorException ex) { + final String message = getErrorMessage(ex.getStateCode(), ex.getErrorCode(), ex.getExceptionMessage(), ex); + AirbyteTraceMessageUtility.emitConfigErrorTrace(ex, message); + return new AirbyteConnectionStatus() + .withStatus(Status.FAILED) + .withMessage(message); } catch (final Exception e) { LOGGER.info("Exception while checking connection: ", e); return new AirbyteConnectionStatus() diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java index b4da8e83ca6e..8720817cabc1 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.io.airbyte.integration_tests.sources; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; @@ -13,6 +14,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.commons.string.Strings; import io.airbyte.db.factory.DataSourceFactory; +import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.snowflake.SnowflakeSource; @@ -97,9 +99,34 @@ public AbstractJdbcSource getJdbcSource() { @Test void testCheckFailure() throws Exception { - ((ObjectNode) config.get("credentials")).put("password", "fake"); - final AirbyteConnectionStatus actual = source.check(config); - assertEquals(Status.FAILED, actual.getStatus()); + ((ObjectNode) config).with("credentials").put(JdbcUtils.PASSWORD_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08001; Error code: 390100;")); + } + + @Test + public void testCheckIncorrectUsernameFailure() throws Exception { + ((ObjectNode) config).with("credentials").put(JdbcUtils.USERNAME_KEY, "fake"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 08001; Error code: 390100;")); + } + + @Test + public void testCheckEmptyUsernameFailure() throws Exception { + ((ObjectNode) config).with("credentials").put(JdbcUtils.USERNAME_KEY, ""); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28000; Error code: 200011;")); + } + + @Test + public void testCheckIncorrectHostFailure() throws Exception { + ((ObjectNode) config).put(JdbcUtils.HOST_KEY, "localhost2"); + final AirbyteConnectionStatus status = source.check(config); + assertEquals(Status.FAILED, status.getStatus()); + assertTrue(status.getMessage().contains("State code: 28000; Error code: 200028;")); } @Override diff --git a/docs/integrations/sources/mysql.md b/docs/integrations/sources/mysql.md index 5b8a6c291857..611d7304d439 100644 --- a/docs/integrations/sources/mysql.md +++ b/docs/integrations/sources/mysql.md @@ -186,61 +186,62 @@ If you do not see a type in this list, assume that it is coerced into a string. ## Changelog -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------| -| 0.6.10 | 2022-09-08 | [16007](https://github.com/airbytehq/airbyte/pull/16007) | Implement per stream state support. | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------| +| 0.6.11 | 2022-09-08 | [16202](https://github.com/airbytehq/airbyte/pull/16202) | Adds error messaging factory to UI | +| 0.6.10 | 2022-09-08 | [16007](https://github.com/airbytehq/airbyte/pull/16007) | Implement per stream state support. | | 0.6.9 | 2022-09-03 | [16216](https://github.com/airbytehq/airbyte/pull/16216) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. | -| 0.6.8 | 2022-09-01 | [16259](https://github.com/airbytehq/airbyte/pull/16259) | Emit state messages more frequently | -| 0.6.7 | 2022-08-30 | [16114](https://github.com/airbytehq/airbyte/pull/16114) | Prevent traffic going on an unsecured channel in strict-encryption version of source mysql | -| 0.6.6 | 2022-08-25 | [15993](https://github.com/airbytehq/airbyte/pull/15993) | Improved support for connecting over SSL | -| 0.6.5 | 2022-08-25 | [15917](https://github.com/airbytehq/airbyte/pull/15917) | Fix temporal data type default value bug | -| 0.6.4 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | -| 0.6.3 | 2022-08-12 | [15044](https://github.com/airbytehq/airbyte/pull/15044) | Added the ability to connect using different SSL modes and SSL certificates | -| 0.6.2 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state | -| 0.6.1 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | -| 0.6.0 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | -| 0.5.17 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | -| 0.5.16 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors | -| 0.5.15 | 2022-06-23 | [14077](https://github.com/airbytehq/airbyte/pull/14077) | Use the new state management | -| 0.5.13 | 2022-06-21 | [13945](https://github.com/airbytehq/airbyte/pull/13945) | Aligned datatype test | -| 0.5.12 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors | -| 0.5.11 | 2022-05-03 | [12544](https://github.com/airbytehq/airbyte/pull/12544) | Prevent source from hanging under certain circumstances by adding a watcher for orphaned threads. | -| 0.5.10 | 2022-04-29 | [12480](https://github.com/airbytehq/airbyte/pull/12480) | Query tables with adaptive fetch size to optimize JDBC memory consumption | -| 0.5.9 | 2022-04-06 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Bump mina-sshd from 2.7.0 to 2.8.0 | -| 0.5.6 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats | -| 0.5.5 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds | -| 0.5.4 | 2022-02-11 | [10251](https://github.com/airbytehq/airbyte/issues/10251) | bug Source MySQL CDC: sync failed when has Zero-date value in mandatory column | -| 0.5.2 | 2021-12-14 | [6425](https://github.com/airbytehq/airbyte/issues/6425) | MySQL CDC sync fails because starting binlog position not found in DB | -| 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description | -| 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types | -| 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication | -| 0.4.12 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | -| 0.4.11 | 2021-11-19 | [8047](https://github.com/airbytehq/airbyte/pull/8047) | Source MySQL: transform binary data base64 format | -| 0.4.10 | 2021-11-15 | [7820](https://github.com/airbytehq/airbyte/pull/7820) | Added basic performance test | -| 0.4.9 | 2021-11-02 | [7559](https://github.com/airbytehq/airbyte/pull/7559) | Correctly process large unsigned short integer values which may fall outside java's `Short` data type capability | -| 0.4.8 | 2021-09-16 | [6093](https://github.com/airbytehq/airbyte/pull/6093) | Improve reliability of processing various data types like decimals, dates, datetime, binary, and text | -| 0.4.7 | 2021-09-30 | [6585](https://github.com/airbytehq/airbyte/pull/6585) | Improved SSH Tunnel key generation steps | -| 0.4.6 | 2021-09-29 | [6510](https://github.com/airbytehq/airbyte/pull/6510) | Support SSL connection | -| 0.4.5 | 2021-09-17 | [6146](https://github.com/airbytehq/airbyte/pull/6146) | Added option to connect to DB via SSH | -| 0.4.1 | 2021-07-23 | [4956](https://github.com/airbytehq/airbyte/pull/4956) | Fix log link | -| 0.3.7 | 2021-06-09 | [3179](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE\_ENTRYPOINT for Kubernetes support | -| 0.3.6 | 2021-06-09 | [3966](https://github.com/airbytehq/airbyte/pull/3966) | Fix excessive logging for CDC method | -| 0.3.5 | 2021-06-07 | [3890](https://github.com/airbytehq/airbyte/pull/3890) | Fix CDC handle tinyint\(1\) and boolean types | -| 0.3.4 | 2021-06-04 | [3846](https://github.com/airbytehq/airbyte/pull/3846) | Fix max integer value failure | -| 0.3.3 | 2021-06-02 | [3789](https://github.com/airbytehq/airbyte/pull/3789) | MySQL CDC poll wait 5 minutes when not received a single record | -| 0.3.2 | 2021-06-01 | [3757](https://github.com/airbytehq/airbyte/pull/3757) | MySQL CDC poll 5s to 5 min | -| 0.3.1 | 2021-06-01 | [3505](https://github.com/airbytehq/airbyte/pull/3505) | Implemented MySQL CDC | -| 0.3.0 | 2021-04-21 | [2990](https://github.com/airbytehq/airbyte/pull/2990) | Support namespaces | -| 0.2.5 | 2021-04-15 | [2899](https://github.com/airbytehq/airbyte/pull/2899) | Fix bug in tests | -| 0.2.4 | 2021-03-28 | [2600](https://github.com/airbytehq/airbyte/pull/2600) | Add NCHAR and NVCHAR support to DB and cursor type casting | -| 0.2.3 | 2021-03-26 | [2611](https://github.com/airbytehq/airbyte/pull/2611) | Add an optional `jdbc_url_params` in parameters | -| 0.2.2 | 2021-03-26 | [2460](https://github.com/airbytehq/airbyte/pull/2460) | Destination supports destination sync mode | -| 0.2.1 | 2021-03-18 | [2488](https://github.com/airbytehq/airbyte/pull/2488) | Sources support primary keys | -| 0.2.0 | 2021-03-09 | [2238](https://github.com/airbytehq/airbyte/pull/2238) | Protocol allows future/unknown properties | -| 0.1.10 | 2021-02-02 | [1887](https://github.com/airbytehq/airbyte/pull/1887) | Migrate AbstractJdbcSource to use iterators | -| 0.1.9 | 2021-01-25 | [1746](https://github.com/airbytehq/airbyte/pull/1746) | Fix NPE in State Decorator | -| 0.1.8 | 2021-01-19 | [1724](https://github.com/airbytehq/airbyte/pull/1724) | Fix JdbcSource handling of tables with same names in different schemas | -| 0.1.7 | 2021-01-14 | [1655](https://github.com/airbytehq/airbyte/pull/1655) | Fix JdbcSource OOM | -| 0.1.6 | 2021-01-08 | [1307](https://github.com/airbytehq/airbyte/pull/1307) | Migrate Postgres and MySQL to use new JdbcSource | -| 0.1.5 | 2020-12-11 | [1267](https://github.com/airbytehq/airbyte/pull/1267) | Support incremental sync | -| 0.1.4 | 2020-11-30 | [1046](https://github.com/airbytehq/airbyte/pull/1046) | Add connectors using an index YAML file | +| 0.6.8 | 2022-09-01 | [16259](https://github.com/airbytehq/airbyte/pull/16259) | Emit state messages more frequently | +| 0.6.7 | 2022-08-30 | [16114](https://github.com/airbytehq/airbyte/pull/16114) | Prevent traffic going on an unsecured channel in strict-encryption version of source mysql | +| 0.6.6 | 2022-08-25 | [15993](https://github.com/airbytehq/airbyte/pull/15993) | Improved support for connecting over SSL | +| 0.6.5 | 2022-08-25 | [15917](https://github.com/airbytehq/airbyte/pull/15917) | Fix temporal data type default value bug | +| 0.6.4 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | +| 0.6.3 | 2022-08-12 | [15044](https://github.com/airbytehq/airbyte/pull/15044) | Added the ability to connect using different SSL modes and SSL certificates | +| 0.6.2 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state | +| 0.6.1 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings | +| 0.6.0 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. | +| 0.5.17 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected | +| 0.5.16 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors | +| 0.5.15 | 2022-06-23 | [14077](https://github.com/airbytehq/airbyte/pull/14077) | Use the new state management | +| 0.5.13 | 2022-06-21 | [13945](https://github.com/airbytehq/airbyte/pull/13945) | Aligned datatype test | +| 0.5.12 | 2022-06-17 | [13864](https://github.com/airbytehq/airbyte/pull/13864) | Updated stacktrace format for any trace message errors | +| 0.5.11 | 2022-05-03 | [12544](https://github.com/airbytehq/airbyte/pull/12544) | Prevent source from hanging under certain circumstances by adding a watcher for orphaned threads. | +| 0.5.10 | 2022-04-29 | [12480](https://github.com/airbytehq/airbyte/pull/12480) | Query tables with adaptive fetch size to optimize JDBC memory consumption | +| 0.5.9 | 2022-04-06 | [11729](https://github.com/airbytehq/airbyte/pull/11729) | Bump mina-sshd from 2.7.0 to 2.8.0 | +| 0.5.6 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats | +| 0.5.5 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds | +| 0.5.4 | 2022-02-11 | [10251](https://github.com/airbytehq/airbyte/issues/10251) | bug Source MySQL CDC: sync failed when has Zero-date value in mandatory column | +| 0.5.2 | 2021-12-14 | [6425](https://github.com/airbytehq/airbyte/issues/6425) | MySQL CDC sync fails because starting binlog position not found in DB | +| 0.5.1 | 2021-12-13 | [8582](https://github.com/airbytehq/airbyte/pull/8582) | Update connector fields title/description | +| 0.5.0 | 2021-12-11 | [7970](https://github.com/airbytehq/airbyte/pull/7970) | Support all MySQL types | +| 0.4.13 | 2021-12-03 | [8335](https://github.com/airbytehq/airbyte/pull/8335) | Source-MySql: do not check cdc required param binlog_row_image for standard replication | +| 0.4.12 | 2021-12-01 | [8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key | +| 0.4.11 | 2021-11-19 | [8047](https://github.com/airbytehq/airbyte/pull/8047) | Source MySQL: transform binary data base64 format | +| 0.4.10 | 2021-11-15 | [7820](https://github.com/airbytehq/airbyte/pull/7820) | Added basic performance test | +| 0.4.9 | 2021-11-02 | [7559](https://github.com/airbytehq/airbyte/pull/7559) | Correctly process large unsigned short integer values which may fall outside java's `Short` data type capability | +| 0.4.8 | 2021-09-16 | [6093](https://github.com/airbytehq/airbyte/pull/6093) | Improve reliability of processing various data types like decimals, dates, datetime, binary, and text | +| 0.4.7 | 2021-09-30 | [6585](https://github.com/airbytehq/airbyte/pull/6585) | Improved SSH Tunnel key generation steps | +| 0.4.6 | 2021-09-29 | [6510](https://github.com/airbytehq/airbyte/pull/6510) | Support SSL connection | +| 0.4.5 | 2021-09-17 | [6146](https://github.com/airbytehq/airbyte/pull/6146) | Added option to connect to DB via SSH | +| 0.4.1 | 2021-07-23 | [4956](https://github.com/airbytehq/airbyte/pull/4956) | Fix log link | +| 0.3.7 | 2021-06-09 | [3179](https://github.com/airbytehq/airbyte/pull/3973) | Add AIRBYTE\_ENTRYPOINT for Kubernetes support | +| 0.3.6 | 2021-06-09 | [3966](https://github.com/airbytehq/airbyte/pull/3966) | Fix excessive logging for CDC method | +| 0.3.5 | 2021-06-07 | [3890](https://github.com/airbytehq/airbyte/pull/3890) | Fix CDC handle tinyint\(1\) and boolean types | +| 0.3.4 | 2021-06-04 | [3846](https://github.com/airbytehq/airbyte/pull/3846) | Fix max integer value failure | +| 0.3.3 | 2021-06-02 | [3789](https://github.com/airbytehq/airbyte/pull/3789) | MySQL CDC poll wait 5 minutes when not received a single record | +| 0.3.2 | 2021-06-01 | [3757](https://github.com/airbytehq/airbyte/pull/3757) | MySQL CDC poll 5s to 5 min | +| 0.3.1 | 2021-06-01 | [3505](https://github.com/airbytehq/airbyte/pull/3505) | Implemented MySQL CDC | +| 0.3.0 | 2021-04-21 | [2990](https://github.com/airbytehq/airbyte/pull/2990) | Support namespaces | +| 0.2.5 | 2021-04-15 | [2899](https://github.com/airbytehq/airbyte/pull/2899) | Fix bug in tests | +| 0.2.4 | 2021-03-28 | [2600](https://github.com/airbytehq/airbyte/pull/2600) | Add NCHAR and NVCHAR support to DB and cursor type casting | +| 0.2.3 | 2021-03-26 | [2611](https://github.com/airbytehq/airbyte/pull/2611) | Add an optional `jdbc_url_params` in parameters | +| 0.2.2 | 2021-03-26 | [2460](https://github.com/airbytehq/airbyte/pull/2460) | Destination supports destination sync mode | +| 0.2.1 | 2021-03-18 | [2488](https://github.com/airbytehq/airbyte/pull/2488) | Sources support primary keys | +| 0.2.0 | 2021-03-09 | [2238](https://github.com/airbytehq/airbyte/pull/2238) | Protocol allows future/unknown properties | +| 0.1.10 | 2021-02-02 | [1887](https://github.com/airbytehq/airbyte/pull/1887) | Migrate AbstractJdbcSource to use iterators | +| 0.1.9 | 2021-01-25 | [1746](https://github.com/airbytehq/airbyte/pull/1746) | Fix NPE in State Decorator | +| 0.1.8 | 2021-01-19 | [1724](https://github.com/airbytehq/airbyte/pull/1724) | Fix JdbcSource handling of tables with same names in different schemas | +| 0.1.7 | 2021-01-14 | [1655](https://github.com/airbytehq/airbyte/pull/1655) | Fix JdbcSource OOM | +| 0.1.6 | 2021-01-08 | [1307](https://github.com/airbytehq/airbyte/pull/1307) | Migrate Postgres and MySQL to use new JdbcSource | +| 0.1.5 | 2020-12-11 | [1267](https://github.com/airbytehq/airbyte/pull/1267) | Support incremental sync | +| 0.1.4 | 2020-11-30 | [1046](https://github.com/airbytehq/airbyte/pull/1046) | Add connectors using an index YAML file | diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index b29799b2bfb3..3aa0dcf7429b 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -372,6 +372,7 @@ Possible solutions include: | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1.0.8 | 2022-09-08 | [16202](https://github.com/airbytehq/airbyte/pull/16202) | Adds error messaging factory to UI | | 1.0.7 | 2022-08-30 | [16114](https://github.com/airbytehq/airbyte/pull/16114) | Prevent traffic going on an unsecured channel in strict-encryption version of source postgres | | 1.0.6 | 2022-08-30 | [16138](https://github.com/airbytehq/airbyte/pull/16138) | Remove unnecessary logging | | 1.0.5 | 2022-08-25 | [15993](https://github.com/airbytehq/airbyte/pull/15993) | Add support for connection over SSL in CDC mode |