Skip to content

Commit

Permalink
Report Normalization failures to Sentry (#15695)
Browse files Browse the repository at this point in the history
* bulk

* simplification

* voila

* normalization version

* key prefix & pmd fix

* bits

* test fix

* handle more dbt error structures and DRY

* format

* better code comment

* enum for keys

* fix pmd

* I _love_ pmd
  • Loading branch information
Phlair authored Aug 25, 2022
1 parent 8182666 commit ea44a0c
Show file tree
Hide file tree
Showing 8 changed files with 515 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,23 +43,30 @@ public class JobErrorReporter {
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";
private static final String CONNECTOR_COMMAND_META_KEY = "connector_command";
private static final String NORMALIZATION_REPOSITORY_META_KEY = "normalization_repository";
private static final String JOB_ID_KEY = "job_id";

private final ConfigRepository configRepository;
private final DeploymentMode deploymentMode;
private final String airbyteVersion;
private final String normalizationImage;
private final String normalizationVersion;
private final WebUrlHelper webUrlHelper;
private final JobErrorReportingClient jobErrorReportingClient;

public JobErrorReporter(final ConfigRepository configRepository,
final DeploymentMode deploymentMode,
final String airbyteVersion,
final String normalizationImage,
final String normalizationVersion,
final WebUrlHelper webUrlHelper,
final JobErrorReportingClient jobErrorReportingClient) {

this.configRepository = configRepository;
this.deploymentMode = deploymentMode;
this.airbyteVersion = airbyteVersion;
this.normalizationImage = normalizationImage;
this.normalizationVersion = normalizationVersion;
this.webUrlHelper = webUrlHelper;
this.jobErrorReportingClient = jobErrorReportingClient;
}
Expand Down Expand Up @@ -96,6 +103,21 @@ public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSu
final String dockerImage = jobContext.destinationDockerImage();
final Map<String, String> metadata = MoreMaps.merge(commonMetadata, getDestinationMetadata(destinationDefinition));

reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
} else if (failureOrigin == FailureOrigin.NORMALIZATION) {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
// since error could be arising from source or destination or normalization itself, we want all the
// metadata
// prefixing source keys so we don't overlap (destination as 'true' keys since normalization runs on
// the destination)
final Map<String, String> metadata = MoreMaps.merge(
commonMetadata,
getNormalizationMetadata(),
prefixConnectorMetadataKeys(getSourceMetadata(sourceDefinition), "source"),
getDestinationMetadata(destinationDefinition));
final String dockerImage = String.format("%s:%s", normalizationImage, normalizationVersion);

reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
}
}
Expand Down Expand Up @@ -208,6 +230,19 @@ private Map<String, String> getSourceMetadata(final StandardSourceDefinition sou
Map.entry(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()));
}

private Map<String, String> getNormalizationMetadata() {
return Map.ofEntries(
Map.entry(NORMALIZATION_REPOSITORY_META_KEY, normalizationImage));
}

private Map<String, String> prefixConnectorMetadataKeys(final Map<String, String> connectorMetadata, final String prefix) {
Map<String, String> prefixedMetadata = new HashMap<>();
for (final Map.Entry<String, String> entry : connectorMetadata.entrySet()) {
prefixedMetadata.put(String.format("%s_%s", prefix, entry.getKey()), entry.getValue());
}
return prefixedMetadata;
}

private void reportJobFailureReason(@Nullable final StandardWorkspace workspace,
final FailureReason failureReason,
final String dockerImage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,29 @@
import io.sentry.protocol.SentryStackFrame;
import io.sentry.protocol.SentryStackTrace;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentryExceptionHelper {

private static final Logger LOGGER = LoggerFactory.getLogger(SentryExceptionHelper.class);

public static final String ERROR_MAP_MESSAGE_KEY = "errorMessage";
public static final String ERROR_MAP_TYPE_KEY = "errorType";

public enum ERROR_MAP_KEYS {
ERROR_MAP_MESSAGE_KEY,
ERROR_MAP_TYPE_KEY
}

/**
* Processes a raw stacktrace string into structured SentryExceptions
* <p>
Expand All @@ -32,6 +47,9 @@ public Optional<List<SentryException>> buildSentryExceptions(final String stackt
if (stacktrace.contains("\tat ") && stacktrace.contains(".java")) {
return buildJavaSentryExceptions(stacktrace);
}
if (stacktrace.startsWith("AirbyteDbtError: ")) {
return buildNormalizationDbtSentryExceptions(stacktrace);
}

return Optional.empty();
}, Optional.empty());
Expand Down Expand Up @@ -166,4 +184,134 @@ private static Optional<List<SentryException>> buildJavaSentryExceptions(final S
return Optional.of(sentryExceptions);
}

private static Optional<List<SentryException>> buildNormalizationDbtSentryExceptions(final String stacktrace) {
final List<SentryException> sentryExceptions = new ArrayList<>();

Map<ERROR_MAP_KEYS, String> usefulErrorMap = getUsefulErrorMessageAndTypeFromDbtError(stacktrace);

// if our errorMessage from the function != stacktrace then we know we've pulled out something
// useful
if (!usefulErrorMap.get(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY).equals(stacktrace)) {
final SentryException usefulException = new SentryException();
usefulException.setValue(usefulErrorMap.get(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY));
usefulException.setType(usefulErrorMap.get(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY));
sentryExceptions.add(usefulException);
}

if (sentryExceptions.isEmpty())
return Optional.empty();

return Optional.of(sentryExceptions);
}

public static Map<ERROR_MAP_KEYS, String> getUsefulErrorMessageAndTypeFromDbtError(String stacktrace) {
// the dbt 'stacktrace' is really just all the log messages at 'error' level, stuck together.
// therefore there is not a totally consistent structure to these,
// see the docs: https://docs.getdbt.com/guides/legacy/debugging-errors
// the logic below is built based on the ~450 unique dbt errors we encountered before this PR
// and is a best effort to isolate the useful part of the error logs for debugging and grouping
// and bring some semblance of exception 'types' to differentiate between errors.
Map<ERROR_MAP_KEYS, String> errorMessageAndType = new HashMap<>();
String[] stacktraceLines = stacktrace.split("\n");

boolean defaultNextLine = false;
// TODO: this whole code block is quite ugh, commented to try and make each part clear but could be
// much more readable.
mainLoop: for (int i = 0; i < stacktraceLines.length; i++) {
// This order is important due to how these errors can co-occur.
// This order attempts to keep error definitions consistent based on our observations of possible
// dbt error structures.
try {
// Database Errors
if (stacktraceLines[i].contains("Database Error in model")) {
// Database Error : SQL compilation error
if (stacktraceLines[i + 1].contains("SQL compilation error")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s %s", stacktraceLines[i + 1].trim(), stacktraceLines[i + 2].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseSQLCompilationError");
break;
}
// Database Error: Invalid input
else if (stacktraceLines[i + 1].contains("Invalid input")) {
for (String followingLine : Arrays.copyOfRange(stacktraceLines, i + 1, stacktraceLines.length)) {
if (followingLine.trim().startsWith("context:")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s\n%s", stacktraceLines[i + 1].trim(), followingLine.trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseInvalidInputError");
break mainLoop;
}
}
}
// Database Error: Syntax error
else if (stacktraceLines[i + 1].contains("syntax error at or near \"")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s\n%s", stacktraceLines[i + 1].trim(), stacktraceLines[i + 2].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseSyntaxError");
break;
}
// Database Error: default
else {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseError");
defaultNextLine = true;
}
}
// Unhandled Error
else if (stacktraceLines[i].contains("Unhandled error while executing model")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtUnhandledError");
defaultNextLine = true;
}
// Compilation Errors
else if (stacktraceLines[i].contains("Compilation Error")) {
// Compilation Error: Ambiguous Relation
if (stacktraceLines[i + 1].contains("When searching for a relation, dbt found an approximate match.")) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s %s", stacktraceLines[i + 1].trim(), stacktraceLines[i + 2].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtCompilationAmbiguousRelationError");
break;
}
// Compilation Error: default
else {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtCompilationError");
defaultNextLine = true;
}
}
// Runtime Errors
else if (stacktraceLines[i].contains("Runtime Error")) {
// Runtime Error: Database error
for (String followingLine : Arrays.copyOfRange(stacktraceLines, i + 1, stacktraceLines.length)) {
if ("Database Error".equals(followingLine.trim())) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY,
String.format("%s", stacktraceLines[Arrays.stream(stacktraceLines).toList().indexOf(followingLine) + 1].trim()));
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtRuntimeDatabaseError");
break mainLoop;
}
}
// Runtime Error: default
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtRuntimeError");
defaultNextLine = true;
}
// Database Error: formatted differently, catch last to avoid counting other types of errors as
// Database Error
else if ("Database Error".equals(stacktraceLines[i].trim())) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "DbtDatabaseError");
defaultNextLine = true;
}
// handle the default case without repeating code
if (defaultNextLine) {
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY, stacktraceLines[i + 1].trim());
break;
}
} catch (final ArrayIndexOutOfBoundsException e) {
// this means our logic is slightly off, our assumption of where error lines are is incorrect
LOGGER.warn("Failed trying to parse useful error message out of dbt error, defaulting to full stacktrace");
}
}
if (errorMessageAndType.isEmpty()) {
// For anything we haven't caught, just return full stacktrace
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_MESSAGE_KEY, stacktrace);
errorMessageAndType.put(ERROR_MAP_KEYS.ERROR_MAP_TYPE_KEY, "AirbyteDbtError");
}
return errorMessageAndType;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ class JobErrorReporterTest {
private static final String CONNECTION_URL = "http://localhost:8000/connection/my_connection";
private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS;
private static final String AIRBYTE_VERSION = "0.1.40";
private static final String NORMALIZATION_IMAGE = "airbyte/normalization";
private static final String NORMALIZATION_VERSION = "0.2.18";
private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID();
private static final String SOURCE_DEFINITION_NAME = "stripe";
private static final String SOURCE_DOCKER_REPOSITORY = "airbyte/source-stripe";
Expand All @@ -53,13 +55,15 @@ class JobErrorReporterTest {
private static final String AIRBYTE_VERSION_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_KEY = "failure_origin";
private static final String SOURCE = "source";
private static final String PREFIX_FORMAT_STRING = "%s_%s";
private static final String FAILURE_TYPE_KEY = "failure_type";
private static final String SYSTEM_ERROR = "system_error";
private static final String CONNECTOR_DEFINITION_ID_KEY = "connector_definition_id";
private static final String CONNECTOR_REPOSITORY_KEY = "connector_repository";
private static final String CONNECTOR_NAME_KEY = "connector_name";
private static final String CONNECTOR_RELEASE_STAGE_KEY = "connector_release_stage";
private static final String CONNECTOR_COMMAND_KEY = "connector_command";
private static final String NORMALIZATION_REPOSITORY_KEY = "normalization_repository";

private ConfigRepository configRepository;
private JobErrorReportingClient jobErrorReportingClient;
Expand All @@ -71,7 +75,8 @@ void setup() {
configRepository = mock(ConfigRepository.class);
jobErrorReportingClient = mock(JobErrorReportingClient.class);
webUrlHelper = mock(WebUrlHelper.class);
jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, webUrlHelper, jobErrorReportingClient);
jobErrorReporter = new JobErrorReporter(
configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, NORMALIZATION_IMAGE, NORMALIZATION_VERSION, webUrlHelper, jobErrorReportingClient);
}

@Test
Expand All @@ -88,11 +93,16 @@ void testReportSyncJobFailure() {
.withFailureOrigin(FailureOrigin.DESTINATION)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason normalizationFailureReason = new FailureReason()
.withMetadata(new Metadata().withAdditionalProperty(FROM_TRACE_MESSAGE, true))
.withFailureOrigin(FailureOrigin.NORMALIZATION)
.withFailureType(FailureType.SYSTEM_ERROR);

final FailureReason nonTraceMessageFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.SOURCE);
final FailureReason replicationFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.REPLICATION);

Mockito.when(mFailureSummary.getFailures())
.thenReturn(List.of(sourceFailureReason, destinationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));
Mockito.when(mFailureSummary.getFailures()).thenReturn(List.of(
sourceFailureReason, destinationFailureReason, normalizationFailureReason, nonTraceMessageFailureReason, replicationFailureReason));

final long syncJobId = 1L;
final SyncJobReportingContext jobReportingContext = new SyncJobReportingContext(
Expand Down Expand Up @@ -150,9 +160,30 @@ void testReportSyncJobFailure() {
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()));

final Map<String, String> expectedNormalizationMetadata = Map.ofEntries(
Map.entry(JOB_ID_KEY, String.valueOf(syncJobId)),
Map.entry(WORKSPACE_ID_KEY, WORKSPACE_ID.toString()),
Map.entry("connection_id", CONNECTION_ID.toString()),
Map.entry("connection_url", CONNECTION_URL),
Map.entry(DEPLOYMENT_MODE_KEY, DEPLOYMENT_MODE.name()),
Map.entry(AIRBYTE_VERSION_KEY, AIRBYTE_VERSION),
Map.entry(FAILURE_ORIGIN_KEY, "normalization"),
Map.entry(FAILURE_TYPE_KEY, SYSTEM_ERROR),
Map.entry(NORMALIZATION_REPOSITORY_KEY, NORMALIZATION_IMAGE),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_DEFINITION_ID_KEY), SOURCE_DEFINITION_ID.toString()),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_REPOSITORY_KEY), SOURCE_DOCKER_REPOSITORY),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_NAME_KEY), SOURCE_DEFINITION_NAME),
Map.entry(String.format(PREFIX_FORMAT_STRING, SOURCE, CONNECTOR_RELEASE_STAGE_KEY), SOURCE_RELEASE_STAGE.toString()),
Map.entry(CONNECTOR_DEFINITION_ID_KEY, DESTINATION_DEFINITION_ID.toString()),
Map.entry(CONNECTOR_REPOSITORY_KEY, DESTINATION_DOCKER_REPOSITORY),
Map.entry(CONNECTOR_NAME_KEY, DESTINATION_DEFINITION_NAME),
Map.entry(CONNECTOR_RELEASE_STAGE_KEY, DESTINATION_RELEASE_STAGE.toString()));

Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE,
expectedDestinationMetadata);
Mockito.verify(jobErrorReportingClient).reportJobFailureReason(
mWorkspace, normalizationFailureReason, String.format("%s:%s", NORMALIZATION_IMAGE, NORMALIZATION_VERSION), expectedNormalizationMetadata);
Mockito.verifyNoMoreInteractions(jobErrorReportingClient);
}

Expand Down
Loading

0 comments on commit ea44a0c

Please sign in to comment.