Skip to content

Commit

Permalink
Cleans and Rebase Error Message Factory PR (#16202)
Browse files Browse the repository at this point in the history
* Cleaned error messages factory PR

* Bumped MySQL and Postgres version

* Fixed messages and typos in test

* Fixes the changelog conflict with per-stream state

* Added note for flaky test

* Bumps mysql version to match changelog

* Added exception objects to all LOGGER.error for more visibility

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
ryankfu and octavia-squidington-iii authored Sep 12, 2022
1 parent dcfcb75 commit 50a8d03
Show file tree
Hide file tree
Showing 39 changed files with 988 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.6.10
dockerImageTag: 0.6.11
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6480,7 +6480,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:0.6.10"
- dockerImage: "airbyte/source-mysql:0.6.11"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.exception;

public class ConnectionErrorException extends RuntimeException {

private String stateCode;
private int errorCode;
private String exceptionMessage;

public ConnectionErrorException(final String exceptionMessage) {
super(exceptionMessage);
}

public ConnectionErrorException(final String stateCode, final Throwable exception) {
super(exception);
this.stateCode = stateCode;
this.exceptionMessage = exception.getMessage();
}

public ConnectionErrorException(final String stateCode,
final String exceptionMessage,
final Throwable exception) {
super(exception);
this.stateCode = stateCode;
this.exceptionMessage = exceptionMessage;
}

public ConnectionErrorException(final String stateCode,
final int errorCode,
final String exceptionMessage,
final Throwable exception) {
super(exception);
this.stateCode = stateCode;
this.errorCode = errorCode;
this.exceptionMessage = exceptionMessage;
}

public String getStateCode() {
return this.stateCode;
}

public int getErrorCode() {
return errorCode;
}

public String getExceptionMessage() {
return exceptionMessage;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import io.airbyte.db.exception.ConnectionErrorException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.sql.DataSource;
Expand Down Expand Up @@ -76,6 +78,14 @@ public DatabaseMetaData getMetaData() throws SQLException {
try (final Connection connection = dataSource.getConnection()) {
final DatabaseMetaData metaData = connection.getMetaData();
return metaData;
} catch (final SQLException e) {
// Some databases like Redshift will have null cause
if (Objects.isNull(e.getCause()) || !(e.getCause() instanceof SQLException)) {
throw new ConnectionErrorException(e.getSQLState(), e.getErrorCode(), e.getMessage(), e);
} else {
final SQLException cause = (SQLException) e.getCause();
throw new ConnectionErrorException(e.getSQLState(), cause.getErrorCode(), cause.getMessage(), e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.mongodb.ConnectionString;
import com.mongodb.MongoConfigurationException;
import com.mongodb.ReadConcern;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
Expand All @@ -16,6 +17,7 @@
import io.airbyte.commons.functional.CheckedFunction;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.AbstractDatabase;
import io.airbyte.db.exception.ConnectionErrorException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -47,8 +49,11 @@ public MongoDatabase(final String connectionString, final String databaseName) {
this.connectionString = new ConnectionString(connectionString);
mongoClient = MongoClients.create(this.connectionString);
database = mongoClient.getDatabase(databaseName);
} catch (final MongoConfigurationException e) {
LOGGER.error(e.getMessage(), e);
throw new ConnectionErrorException(String.valueOf(e.getCode()), e.getMessage(), e);
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.base.errors.messages;

import java.util.Objects;

public class ErrorMessage {

// TODO: this could be built using a Builder design pattern instead of passing in 0 to indicate no
// errorCode exists
public static String getErrorMessage(final String stateCode, final int errorCode, final String message, final Exception exception) {
if (Objects.isNull(message)) {
return configMessage(stateCode, 0, exception.getMessage());
} else {
return configMessage(stateCode, errorCode, message);
}
}

private static String configMessage(final String stateCode, final int errorCode, final String message) {
final String stateCodePart = Objects.isNull(stateCode) ? "" : String.format("State code: %s; ", stateCode);
final String errorCodePart = errorCode == 0 ? "" : String.format("Error code: %s; ", errorCode);
return String.format("%s%sMessage: %s", stateCodePart, errorCodePart, message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
AIRBYTE_PRIVATE_KEY
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ private static void attemptWriteAndDeleteDynamodbItem(final DynamodbDestinationC
.putItem(
new Item().withPrimaryKey(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString(), "sync_time", System.currentTimeMillis()));
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
}

table.delete(); // delete table
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public DynamodbWriter(final DynamodbDestinationConfig config,
final var table = createTableIfNotExists(amazonDynamodb, outputTableName);
table.waitForActive();
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
}

this.tableWriteItems = new TableWriteItems(outputTableName);
Expand Down Expand Up @@ -134,7 +134,7 @@ public void write(final UUID id, final AirbyteRecordMessage recordMessage) {
LOGGER.warn(String.format("Unprocessed items count after retry %d times: %s", 5, Integer.toString(outcome.getUnprocessedItems().size())));
}
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
}
}
}
Expand All @@ -156,7 +156,7 @@ public void close(final boolean hasFailed) throws IOException {
}
}
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
}
LOGGER.info("Data writing completed for DynamoDB.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected List<Item> getAllSyncedObjects(final String streamName, final String n
maxSyncTime = Math.max(maxSyncTime, ((BigDecimal) item.get("sync_time")).longValue());
}
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
}

items.sort(Comparator.comparingLong(o -> ((BigDecimal) o.get(JavaBaseConstants.COLUMN_NAME_EMITTED_AT)).longValue()));
Expand Down Expand Up @@ -155,7 +155,7 @@ protected void tearDown(final TestDestinationEnv testEnv) {
LOGGER.info(String.format("Delete table %s", tableName));
}
} catch (final Exception e) {
LOGGER.error(e.getMessage());
LOGGER.error(e.getMessage(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@

package io.airbyte.integrations.destination.gcs;

import static io.airbyte.integrations.base.errors.messages.ErrorMessage.getErrorMessage;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.NamingConventionTransformer;
Expand Down Expand Up @@ -52,10 +56,16 @@ public AirbyteConnectionStatus check(final JsonNode config) {
S3Destination.testMultipartUpload(s3Client, destinationConfig.getBucketName(), destinationConfig.getBucketPath());

return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} catch (final AmazonS3Exception e) {
LOGGER.error("Exception attempting to access the AWS bucket: {}", e.getMessage());
final String message = getErrorMessage(e.getErrorCode(), 0, e.getMessage(), e);
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, message);
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
.withMessage(message);
} catch (final Exception e) {
LOGGER.error("Exception attempting to access the Gcs bucket: {}", e.getMessage());
LOGGER.error("Please make sure you account has all of these roles: " + EXPECTED_ROLES);

LOGGER.error("Exception attempting to access the AWS bucket: {}. Please make sure you account has all of these roles: {}", e.getMessage(), EXPECTED_ROLES);
AirbyteTraceMessageUtility.emitConfigErrorTrace(e, e.getMessage());
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED)
.withMessage("Could not connect to the Gcs bucket with the provided configuration. \n" + e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package io.airbyte.integrations.destination.gcs;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.jackson.MoreMappers;
import io.airbyte.commons.json.Jsons;
Expand All @@ -23,6 +25,7 @@
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.LinkedList;
Expand Down Expand Up @@ -112,8 +115,8 @@ protected JsonNode getFailCheckConfig() {
final JsonNode baseJson = getBaseConfigJson();
final JsonNode failCheckJson = Jsons.clone(baseJson);
// invalid credential
((ObjectNode) failCheckJson).put("access_key_id", "fake-key");
((ObjectNode) failCheckJson).put("secret_access_key", "fake-secret");
((ObjectNode) failCheckJson).put("hmac_key_access_id", "fake-key");
((ObjectNode) failCheckJson).put("hmac_key_secret", "fake-secret");
return failCheckJson;
}

Expand Down Expand Up @@ -218,4 +221,52 @@ public void testCheckConnectionInsufficientRoles() throws Exception {
assertEquals(Status.FAILED, runCheck(configJson).getStatus());
}

@Test
public void testCheckIncorrectHmacKeyAccessIdCredential() {
final JsonNode baseJson = getBaseConfigJson();
final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder()
.put("credential_type", "HMAC_KEY")
.put("hmac_key_access_id", "fake-key")
.put("hmac_key_secret", baseJson.get("credential").get("hmac_key_secret").asText())
.build());

((ObjectNode) baseJson).put("credential", credential);
((ObjectNode) baseJson).set("format", getFormatConfig());

final GcsDestination destination = new GcsDestination();
final AirbyteConnectionStatus status = destination.check(baseJson);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: SignatureDoesNotMatch;"));
}

@Test
public void testCheckIncorrectHmacKeySecretCredential() {
final JsonNode baseJson = getBaseConfigJson();
final JsonNode credential = Jsons.jsonNode(ImmutableMap.builder()
.put("credential_type", "HMAC_KEY")
.put("hmac_key_access_id", baseJson.get("credential").get("hmac_key_access_id").asText())
.put("hmac_key_secret", "fake-secret")
.build());

((ObjectNode) baseJson).put("credential", credential);
((ObjectNode) baseJson).set("format", getFormatConfig());

final GcsDestination destination = new GcsDestination();
final AirbyteConnectionStatus status = destination.check(baseJson);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: SignatureDoesNotMatch;"));
}

@Test
public void testCheckIncorrectBucketCredential() {
final JsonNode baseJson = getBaseConfigJson();
((ObjectNode) baseJson).put("gcs_bucket_name", "fake_bucket");
((ObjectNode) baseJson).set("format", getFormatConfig());

final GcsDestination destination = new GcsDestination();
final AirbyteConnectionStatus status = destination.check(baseJson);
assertEquals(AirbyteConnectionStatus.Status.FAILED, status.getStatus());
assertTrue(status.getMessage().contains("State code: NoSuchKey;"));
}

}
Loading

0 comments on commit 50a8d03

Please sign in to comment.