diff --git a/.env b/.env index 9ce529c0050b..3177aa4de26a 100644 --- a/.env +++ b/.env @@ -70,10 +70,9 @@ JOB_MAIN_CONTAINER_MEMORY_LIMIT= ### LOGGING/MONITORING/TRACKING ### TRACKING_STRATEGY=segment +JOB_ERROR_REPORTING_STRATEGY=logging # Although not present as an env var, expected by Log4J configuration. LOG_LEVEL=INFO -# Although not present as an env var, helps Airbyte track job healthiness. -SENTRY_DSN="https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6102835" ### APPLICATIONS ### diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java index c480cefde298..d5f7d0ab8ebb 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/Configs.java @@ -466,6 +466,17 @@ public interface Configs { */ TrackingStrategy getTrackingStrategy(); + /** + * Define whether to send job failure events to Sentry or log-only. Airbyte internal use. + */ + JobErrorReportingStrategy getJobErrorReportingStrategy(); + + /** + * Determines the Sentry DSN that should be used when reporting connector job failures to Sentry. + * Used with SENTRY error reporting strategy. Airbyte internal use. + */ + String getJobErrorReportingSentryDSN(); + // APPLICATIONS // Worker /** @@ -578,6 +589,11 @@ enum TrackingStrategy { LOGGING } + enum JobErrorReportingStrategy { + SENTRY, + LOGGING + } + enum WorkerEnvironment { DOCKER, KUBERNETES diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java index 52fd6cc239b0..adb6e69edec3 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/EnvConfigs.java @@ -50,6 +50,8 @@ public class EnvConfigs implements Configs { public static final String CONFIG_ROOT = "CONFIG_ROOT"; public static final String DOCKER_NETWORK = "DOCKER_NETWORK"; public static final String TRACKING_STRATEGY = "TRACKING_STRATEGY"; + public static final String JOB_ERROR_REPORTING_STRATEGY = "JOB_ERROR_REPORTING_STRATEGY"; + public static final String JOB_ERROR_REPORTING_SENTRY_DSN = "JOB_ERROR_REPORTING_SENTRY_DSN"; public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE"; public static final String DATABASE_USER = "DATABASE_USER"; public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD"; @@ -805,6 +807,23 @@ public TrackingStrategy getTrackingStrategy() { }); } + @Override + public JobErrorReportingStrategy getJobErrorReportingStrategy() { + return getEnvOrDefault(JOB_ERROR_REPORTING_STRATEGY, JobErrorReportingStrategy.LOGGING, s -> { + try { + return JobErrorReportingStrategy.valueOf(s.toUpperCase()); + } catch (final IllegalArgumentException e) { + LOGGER.info(s + " not recognized, defaulting to " + JobErrorReportingStrategy.LOGGING); + return JobErrorReportingStrategy.LOGGING; + } + }); + } + + @Override + public String getJobErrorReportingSentryDSN() { + return getEnvOrDefault(JOB_ERROR_REPORTING_SENTRY_DSN, ""); + } + // APPLICATIONS // Worker @Override diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java index ff9ff1cbe6fe..d44510c868fb 100644 --- a/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/EnvConfigsTest.java @@ -8,6 +8,7 @@ import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.Configs.DeploymentMode; +import io.airbyte.config.Configs.JobErrorReportingStrategy; import io.airbyte.config.Configs.WorkerEnvironment; import java.nio.file.Paths; import java.util.HashMap; @@ -178,6 +179,27 @@ void testTrackingStrategy() { assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy()); } + @Test + void testErrorReportingStrategy() { + envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, null); + assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy()); + + envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "abc"); + assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy()); + + envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "logging"); + assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy()); + + envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "sentry"); + assertEquals(JobErrorReportingStrategy.SENTRY, config.getJobErrorReportingStrategy()); + + envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "LOGGING"); + assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy()); + + envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "SENTRY"); + assertEquals(JobErrorReportingStrategy.SENTRY, config.getJobErrorReportingStrategy()); + } + @Test void testDeploymentMode() { envMap.put(EnvConfigs.DEPLOYMENT_MODE, null); diff --git a/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java b/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java index a25d56451edb..ed2f1de729d9 100644 --- a/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java +++ b/airbyte-scheduler/scheduler-models/src/main/java/io/airbyte/scheduler/models/Job.java @@ -109,6 +109,14 @@ public Optional getSuccessOutput() { return getSuccessfulAttempt().flatMap(Attempt::getOutput); } + public Optional getLastFailedAttempt() { + return getAttempts() + .stream() + .sorted(Comparator.comparing(Attempt::getCreatedAtInSecond).reversed()) + .filter(a -> a.getStatus() == AttemptStatus.FAILED) + .findFirst(); + } + public Optional getLastAttemptWithOutput() { return getAttempts() .stream() diff --git a/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java b/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java index 8fde2d1e75d9..e81a15bf58f2 100644 --- a/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java +++ b/airbyte-scheduler/scheduler-models/src/test/java/io/airbyte/scheduler/models/JobTest.java @@ -10,9 +10,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.junit.jupiter.api.Test; class JobTest { @@ -42,8 +42,8 @@ void testHasRunningAttempt() { } private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) { - final List attempts = Arrays.stream(attemptStatuses) - .map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null)) + final List attempts = IntStream.range(0, attemptStatuses.length) + .mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, idx, 0L, null)) .collect(Collectors.toList()); return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L); } @@ -60,6 +60,17 @@ void testGetSuccessfulAttempt() { assertEquals(job.getAttempts().get(1), job.getSuccessfulAttempt().get()); } + @Test + void testGetLastFailedAttempt() { + assertTrue(jobWithAttemptWithStatus().getLastFailedAttempt().isEmpty()); + assertTrue(jobWithAttemptWithStatus(AttemptStatus.SUCCEEDED).getLastFailedAttempt().isEmpty()); + assertTrue(jobWithAttemptWithStatus(AttemptStatus.FAILED).getLastFailedAttempt().isPresent()); + + final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED); + assertTrue(job.getLastFailedAttempt().isPresent()); + assertEquals(2, job.getLastFailedAttempt().get().getId()); + } + @Test void testValidateStatusTransitionFromPending() { final Job pendingJob = jobWithStatus(JobStatus.PENDING); diff --git a/airbyte-scheduler/scheduler-persistence/build.gradle b/airbyte-scheduler/scheduler-persistence/build.gradle index ef970f189029..c40c4355a6ae 100644 --- a/airbyte-scheduler/scheduler-persistence/build.gradle +++ b/airbyte-scheduler/scheduler-persistence/build.gradle @@ -3,6 +3,8 @@ plugins { } dependencies { + implementation 'io.sentry:sentry:6.1.0' + implementation project(':airbyte-analytics') implementation project(':airbyte-commons-docker') implementation project(':airbyte-config:config-models') diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java new file mode 100644 index 000000000000..c82cae5dcd95 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporter.java @@ -0,0 +1,102 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.Configs.DeploymentMode; +import io.airbyte.config.FailureReason; +import io.airbyte.config.FailureReason.FailureOrigin; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigRepository; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JobErrorReporter { + + private static final Logger LOGGER = LoggerFactory.getLogger(JobErrorReporter.class); + + private static final String FROM_TRACE_MESSAGE = "from_trace_message"; + private static final String DEPLOYMENT_MODE_META_KEY = "deployment_mode"; + private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version"; + private static final String FAILURE_ORIGIN_META_KEY = "failure_origin"; + private static final String FAILURE_TYPE_META_KEY = "failure_type"; + private static final String CONNECTION_ID_META_KEY = "connection_id"; + private static final String CONNECTOR_NAME_META_KEY = "connector_name"; + private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id"; + private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage"; + + private final ConfigRepository configRepository; + private final DeploymentMode deploymentMode; + private final String airbyteVersion; + private final JobErrorReportingClient jobErrorReportingClient; + + public JobErrorReporter(final ConfigRepository configRepository, + final DeploymentMode deploymentMode, + final String airbyteVersion, + final JobErrorReportingClient jobErrorReportingClient) { + + this.configRepository = configRepository; + this.deploymentMode = deploymentMode; + this.airbyteVersion = airbyteVersion; + this.jobErrorReportingClient = jobErrorReportingClient; + } + + /** + * Reports a Sync Job's connector-caused FailureReasons to the JobErrorReportingClient + * + * @param connectionId - connection that had the failure + * @param failureSummary - final attempt failure summary + * @param jobSyncConfig - config for the sync job + */ + public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final JobSyncConfig jobSyncConfig) { + final List traceMessageFailures = failureSummary.getFailures().stream() + .filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey(FROM_TRACE_MESSAGE)) + .toList(); + + final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true); + + for (final FailureReason failureReason : traceMessageFailures) { + final FailureOrigin failureOrigin = failureReason.getFailureOrigin(); + + final HashMap metadata = new HashMap<>(); + metadata.put(CONNECTION_ID_META_KEY, connectionId.toString()); + metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion); + metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name()); + metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value()); + metadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value()); + + try { + if (failureOrigin == FailureOrigin.SOURCE) { + final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); + final String dockerImage = jobSyncConfig.getSourceDockerImage(); + + metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString()); + metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName()); + metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value()); + + jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata); + } else if (failureOrigin == FailureOrigin.DESTINATION) { + final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); + final String dockerImage = jobSyncConfig.getDestinationDockerImage(); + + metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString()); + metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName()); + metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value()); + + jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata); + } + } catch (final Exception e) { + LOGGER.error("Error when reporting job failure reason: {}", failureReason, e); + } + } + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java new file mode 100644 index 000000000000..3d52f558b667 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClient.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.airbyte.config.FailureReason; +import io.airbyte.config.StandardWorkspace; +import java.util.Map; + +/** + * A generic interface for a client that reports errors + */ +public interface JobErrorReportingClient { + + /** + * Report a job failure reason + */ + void reportJobFailureReason(StandardWorkspace workspace, final FailureReason reason, final String dockerImage, Map metadata); + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactory.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactory.java new file mode 100644 index 000000000000..e24586781fc7 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactory.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.airbyte.config.Configs; +import io.airbyte.config.Configs.JobErrorReportingStrategy; + +public class JobErrorReportingClientFactory { + + /** + * Creates an error reporting client based on the desired strategy to use + * + * @param strategy - which type of error reporting client should be created + * @return JobErrorReportingClient + */ + public static JobErrorReportingClient getClient(final JobErrorReportingStrategy strategy, final Configs configs) { + return switch (strategy) { + case SENTRY -> new SentryJobErrorReportingClient(configs.getJobErrorReportingSentryDSN(), new SentryExceptionHelper()); + case LOGGING -> new LoggingJobErrorReportingClient(); + }; + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java new file mode 100644 index 000000000000..cf1cebf1404b --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/LoggingJobErrorReportingClient.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.airbyte.config.FailureReason; +import io.airbyte.config.StandardWorkspace; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LoggingJobErrorReportingClient implements JobErrorReportingClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(LoggingJobErrorReportingClient.class); + + @Override + public void reportJobFailureReason(final StandardWorkspace workspace, + final FailureReason reason, + final String dockerImage, + final Map metadata) { + LOGGER.info("Report Job Error -> workspaceId: {}, dockerImage: {}, failureReason: {}, metadata: {}", + workspace.getWorkspaceId(), + dockerImage, + reason, + metadata); + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelper.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelper.java new file mode 100644 index 000000000000..1fe083490c12 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelper.java @@ -0,0 +1,169 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.airbyte.commons.lang.Exceptions; +import io.sentry.protocol.SentryException; +import io.sentry.protocol.SentryStackFrame; +import io.sentry.protocol.SentryStackTrace; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class SentryExceptionHelper { + + /** + * Processes a raw stacktrace string into structured SentryExceptions + *

+ * Currently, Java and Python stacktraces are supported. If an unsupported stacktrace format is + * encountered, an empty optional will be returned, in which case we can fall back to alternate + * grouping. + */ + public Optional> buildSentryExceptions(final String stacktrace) { + return Exceptions.swallowWithDefault(() -> { + if (stacktrace.startsWith("Traceback (most recent call last):")) { + return buildPythonSentryExceptions(stacktrace); + } + if (stacktrace.contains("\tat ") && stacktrace.contains(".java")) { + return buildJavaSentryExceptions(stacktrace); + } + + return Optional.empty(); + }, Optional.empty()); + } + + private static Optional> buildPythonSentryExceptions(final String stacktrace) { + final List sentryExceptions = new ArrayList<>(); + + // separate chained exceptions + // e.g "\n\nThe above exception was the direct cause of the following exception:\n\n" + // "\n\nDuring handling of the above exception, another exception occurred:\n\n" + final String exceptionSeparator = "\n\n[\\w ,]+:\n\n"; + final String[] exceptions = stacktrace.split(exceptionSeparator); + + for (final String exceptionStr : exceptions) { + final SentryStackTrace stackTrace = new SentryStackTrace(); + final List stackFrames = new ArrayList<>(); + + // Use a regex to grab stack trace frame information + final Pattern framePattern = Pattern.compile("File \"(?.+)\", line (?\\d+), in (?.+)\\n {4}(?.+)\\n"); + final Matcher matcher = framePattern.matcher(exceptionStr); + int lastMatchIdx = -1; + + while (matcher.find()) { + final String absPath = matcher.group("absPath"); + final String lineno = matcher.group("lineno"); + final String function = matcher.group("function"); + final String contextLine = matcher.group("contextLine"); + + final SentryStackFrame stackFrame = new SentryStackFrame(); + stackFrame.setAbsPath(absPath); + stackFrame.setLineno(Integer.valueOf(lineno)); + stackFrame.setFunction(function); + stackFrame.setContextLine(contextLine); + stackFrames.add(stackFrame); + + lastMatchIdx = matcher.end(); + } + + if (stackFrames.size() > 0) { + stackTrace.setFrames(stackFrames); + + final SentryException sentryException = new SentryException(); + sentryException.setStacktrace(stackTrace); + + // The final part of our stack trace has the exception type and (optionally) a value + // (e.g. "RuntimeError: This is the value") + final String remaining = exceptionStr.substring(lastMatchIdx); + final String[] parts = remaining.split(":", 2); + + if (parts.length > 0) { + sentryException.setType(parts[0].trim()); + if (parts.length == 2) { + sentryException.setValue(parts[1].trim()); + } + + sentryExceptions.add(sentryException); + } + } + } + + if (sentryExceptions.size() == 0) + return Optional.empty(); + + return Optional.of(sentryExceptions); + } + + private static Optional> buildJavaSentryExceptions(final String stacktrace) { + final List sentryExceptions = new ArrayList<>(); + + // separate chained exceptions + // e.g "\nCaused By: " + final String exceptionSeparator = "\n[\\w ]+: "; + final String[] exceptions = stacktrace.split(exceptionSeparator); + + for (final String exceptionStr : exceptions) { + final SentryStackTrace stackTrace = new SentryStackTrace(); + final List stackFrames = new ArrayList<>(); + + // Use a regex to grab stack trace frame information + final Pattern framePattern = Pattern.compile( + "\n\tat (?:[\\w.$/]+/)?(?[\\w$.]+)\\.(?[\\w<>$]+)\\((?:(?[\\w]+\\.java):(?\\d+)\\)|(?[\\w\\s]*))"); + final Matcher matcher = framePattern.matcher(exceptionStr); + + while (matcher.find()) { + final String module = matcher.group("module"); + final String filename = matcher.group("filename"); + final String lineno = matcher.group("lineno"); + final String function = matcher.group("function"); + final String sourceDescription = matcher.group("desc"); + + final SentryStackFrame stackFrame = new SentryStackFrame(); + stackFrame.setModule(module); + stackFrame.setFunction(function); + stackFrame.setFilename(filename); + + if (lineno != null) { + stackFrame.setLineno(Integer.valueOf(lineno)); + } + if (sourceDescription != null && sourceDescription.equals("Native Method")) { + stackFrame.setNative(true); + } + + stackFrames.add(stackFrame); + } + + if (stackFrames.size() > 0) { + Collections.reverse(stackFrames); + stackTrace.setFrames(stackFrames); + + final SentryException sentryException = new SentryException(); + sentryException.setStacktrace(stackTrace); + + // The first section of our stacktrace before the first frame has exception type and value + final String[] sections = exceptionStr.split("\n\tat ", 2); + final String[] headerParts = sections[0].split(": ", 2); + + if (headerParts.length > 0) { + sentryException.setType(headerParts[0].trim()); + if (headerParts.length == 2) { + sentryException.setValue(headerParts[1].trim()); + } + + sentryExceptions.add(sentryException); + } + } + } + + if (sentryExceptions.size() == 0) + return Optional.empty(); + + return Optional.of(sentryExceptions); + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java new file mode 100644 index 000000000000..ff509b7ce254 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClient.java @@ -0,0 +1,133 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.airbyte.config.FailureReason; +import io.airbyte.config.Metadata; +import io.airbyte.config.StandardWorkspace; +import io.sentry.Hub; +import io.sentry.IHub; +import io.sentry.NoOpHub; +import io.sentry.SentryEvent; +import io.sentry.SentryOptions; +import io.sentry.protocol.Message; +import io.sentry.protocol.SentryException; +import io.sentry.protocol.User; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +public class SentryJobErrorReportingClient implements JobErrorReportingClient { + + static final String STACKTRACE_PARSE_ERROR_TAG_KEY = "stacktrace_parse_error"; + private final IHub sentryHub; + private final SentryExceptionHelper exceptionHelper; + + SentryJobErrorReportingClient(final IHub sentryHub, final SentryExceptionHelper exceptionHelper) { + this.sentryHub = sentryHub; + this.exceptionHelper = exceptionHelper; + } + + public SentryJobErrorReportingClient(final String sentryDSN, final SentryExceptionHelper exceptionHelper) { + this(createSentryHubWithDSN(sentryDSN), exceptionHelper); + } + + static IHub createSentryHubWithDSN(final String sentryDSN) { + if (sentryDSN == null || sentryDSN.isEmpty()) { + return NoOpHub.getInstance(); + } + + final SentryOptions options = new SentryOptions(); + options.setDsn(sentryDSN); + options.setAttachStacktrace(false); + options.setEnableUncaughtExceptionHandler(false); + return new Hub(options); + } + + /** + * Reports a Connector Job FailureReason to Sentry + * + * @param workspace - Workspace where this failure occurred + * @param failureReason - FailureReason to report + * @param dockerImage - Tagged docker image that represents the release where this failure occurred + * @param metadata - Extra metadata to set as tags on the event + */ + @Override + public void reportJobFailureReason(final StandardWorkspace workspace, + final FailureReason failureReason, + final String dockerImage, + final Map metadata) { + final SentryEvent event = new SentryEvent(); + + // Remove invalid characters from the release name, use @ so sentry knows how to grab the tag + // e.g. airbyte/source-xyz:1.2.0 -> airbyte-source-xyz@1.2.0 + // More info at https://docs.sentry.io/product/cli/releases/#creating-releases + final String release = dockerImage.replace("/", "-").replace(":", "@"); + event.setRelease(release); + + // enhance event fingerprint to ensure separate grouping per connector + final String[] releaseParts = release.split("@"); + if (releaseParts.length > 0) { + event.setFingerprints(List.of("{{ default }}", releaseParts[0])); + } + + // set workspace as the user in sentry to get impact and priority + final User sentryUser = new User(); + sentryUser.setId(String.valueOf(workspace.getWorkspaceId())); + sentryUser.setUsername(workspace.getName()); + event.setUser(sentryUser); + + // set metadata as tags + event.setTags(metadata); + + // set failure reason's internalMessage as event message + // Sentry will use this to fuzzy-group if no stacktrace information is available + final Message message = new Message(); + message.setFormatted(failureReason.getInternalMessage()); + event.setMessage(message); + + // events can come from any platform + event.setPlatform("other"); + + // attach failure reason stack trace + final String failureStackTrace = failureReason.getStacktrace(); + if (failureStackTrace != null && !failureStackTrace.isBlank()) { + final Optional> parsedExceptions = exceptionHelper.buildSentryExceptions(failureStackTrace); + if (parsedExceptions.isPresent()) { + event.setExceptions(parsedExceptions.get()); + } else { + event.setTag(STACKTRACE_PARSE_ERROR_TAG_KEY, "1"); + + // We couldn't parse the stacktrace, but we can still give it to Sentry for (less accurate) grouping + final String normalizedStacktrace = failureStackTrace + .replace("\n", ", ") + .replace(failureReason.getInternalMessage(), ""); + + final SentryException sentryException = new SentryException(); + sentryException.setValue(normalizedStacktrace); + event.setExceptions(List.of(sentryException)); + } + } + + sentryHub.configureScope(scope -> { + final Map failureReasonContext = new HashMap<>(); + failureReasonContext.put("internalMessage", failureReason.getInternalMessage()); + failureReasonContext.put("externalMessage", failureReason.getExternalMessage()); + failureReasonContext.put("stacktrace", failureReason.getStacktrace()); + failureReasonContext.put("timestamp", failureReason.getTimestamp().toString()); + + final Metadata failureReasonMeta = failureReason.getMetadata(); + if (failureReasonMeta != null) { + failureReasonContext.put("metadata", failureReasonMeta.toString()); + } + + scope.setContexts("Failure Reason", failureReasonContext); + }); + + sentryHub.captureEvent(event); + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java new file mode 100644 index 000000000000..ae99ad02ad53 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReporterTest.java @@ -0,0 +1,147 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import static org.mockito.Mockito.mock; + +import io.airbyte.config.AttemptFailureSummary; +import io.airbyte.config.Configs.DeploymentMode; +import io.airbyte.config.FailureReason; +import io.airbyte.config.FailureReason.FailureOrigin; +import io.airbyte.config.FailureReason.FailureType; +import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.Metadata; +import io.airbyte.config.StandardDestinationDefinition; +import io.airbyte.config.StandardSourceDefinition; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.persistence.ConfigRepository; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class JobErrorReporterTest { + + private static final UUID CONNECTION_ID = UUID.randomUUID(); + private static final DeploymentMode DEPLOYMENT_MODE = DeploymentMode.OSS; + private static final String AIRBYTE_VERSION = "0.1.40"; + private static final UUID SOURCE_DEFINITION_ID = UUID.randomUUID(); + private static final String SOURCE_DEFINITION_NAME = "stripe"; + private static final String SOURCE_DOCKER_IMAGE = "airbyte/source-stripe:1.2.3"; + private static final StandardSourceDefinition.ReleaseStage SOURCE_RELEASE_STAGE = StandardSourceDefinition.ReleaseStage.BETA; + private static final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID(); + private static final String DESTINATION_DEFINITION_NAME = "snowflake"; + private static final StandardDestinationDefinition.ReleaseStage DESTINATION_RELEASE_STAGE = StandardDestinationDefinition.ReleaseStage.BETA; + private static final String DESTINATION_DOCKER_IMAGE = "airbyte/destination-snowflake:1.2.3"; + + private ConfigRepository configRepository; + private JobErrorReportingClient jobErrorReportingClient; + private JobErrorReporter jobErrorReporter; + + @BeforeEach + void setup() { + configRepository = mock(ConfigRepository.class); + jobErrorReportingClient = mock(JobErrorReportingClient.class); + jobErrorReporter = new JobErrorReporter(configRepository, DEPLOYMENT_MODE, AIRBYTE_VERSION, jobErrorReportingClient); + } + + @Test + void testReportSyncJobFailure() { + final AttemptFailureSummary mFailureSummary = Mockito.mock(AttemptFailureSummary.class); + + final FailureReason sourceFailureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR); + + final FailureReason destinationFailureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.DESTINATION) + .withFailureType(FailureType.SYSTEM_ERROR); + + final FailureReason nonTraceMessageFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.SOURCE); + final FailureReason replicationFailureReason = new FailureReason().withFailureOrigin(FailureOrigin.REPLICATION); + + Mockito.when(mFailureSummary.getFailures()) + .thenReturn(List.of(sourceFailureReason, destinationFailureReason, nonTraceMessageFailureReason, replicationFailureReason)); + + final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class); + Mockito.when(mJobSyncConfig.getSourceDockerImage()).thenReturn(SOURCE_DOCKER_IMAGE); + Mockito.when(mJobSyncConfig.getDestinationDockerImage()).thenReturn(DESTINATION_DOCKER_IMAGE); + + Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID)) + .thenReturn(new StandardSourceDefinition() + .withReleaseStage(SOURCE_RELEASE_STAGE) + .withSourceDefinitionId(SOURCE_DEFINITION_ID) + .withName(SOURCE_DEFINITION_NAME)); + + Mockito.when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID)) + .thenReturn(new StandardDestinationDefinition() + .withReleaseStage(DESTINATION_RELEASE_STAGE) + .withDestinationDefinitionId(DESTINATION_DEFINITION_ID) + .withName(DESTINATION_DEFINITION_NAME)); + + final StandardWorkspace mWorkspace = Mockito.mock(StandardWorkspace.class); + Mockito.when(configRepository.getStandardWorkspaceFromConnection(CONNECTION_ID, true)).thenReturn(mWorkspace); + + jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig); + + final Map expectedSourceMetadata = Map.of( + "connection_id", CONNECTION_ID.toString(), + "deployment_mode", DEPLOYMENT_MODE.name(), + "airbyte_version", AIRBYTE_VERSION, + "failure_origin", "source", + "failure_type", "system_error", + "connector_definition_id", SOURCE_DEFINITION_ID.toString(), + "connector_name", SOURCE_DEFINITION_NAME, + "connector_release_stage", SOURCE_RELEASE_STAGE.toString()); + + final Map expectedDestinationMetadata = Map.of( + "connection_id", CONNECTION_ID.toString(), + "deployment_mode", DEPLOYMENT_MODE.name(), + "airbyte_version", AIRBYTE_VERSION, + "failure_origin", "destination", + "failure_type", "system_error", + "connector_definition_id", DESTINATION_DEFINITION_ID.toString(), + "connector_name", DESTINATION_DEFINITION_NAME, + "connector_release_stage", DESTINATION_RELEASE_STAGE.toString()); + + Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, sourceFailureReason, SOURCE_DOCKER_IMAGE, expectedSourceMetadata); + Mockito.verify(jobErrorReportingClient).reportJobFailureReason(mWorkspace, destinationFailureReason, DESTINATION_DOCKER_IMAGE, + expectedDestinationMetadata); + Mockito.verifyNoMoreInteractions(jobErrorReportingClient); + } + + @Test + void testReportSyncJobFailureDoesNotThrow() { + final AttemptFailureSummary mFailureSummary = Mockito.mock(AttemptFailureSummary.class); + final JobSyncConfig mJobSyncConfig = Mockito.mock(JobSyncConfig.class); + + final FailureReason sourceFailureReason = new FailureReason() + .withMetadata(new Metadata().withAdditionalProperty("from_trace_message", true)) + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR); + + Mockito.when(mFailureSummary.getFailures()).thenReturn(List.of(sourceFailureReason)); + + Mockito.when(configRepository.getSourceDefinitionFromConnection(CONNECTION_ID)) + .thenReturn(new StandardSourceDefinition() + .withReleaseStage(SOURCE_RELEASE_STAGE) + .withSourceDefinitionId(SOURCE_DEFINITION_ID) + .withName(SOURCE_DEFINITION_NAME)); + + Mockito.doThrow(new RuntimeException("some exception")) + .when(jobErrorReportingClient) + .reportJobFailureReason(Mockito.any(), Mockito.eq(sourceFailureReason), Mockito.any(), Mockito.any()); + + Assertions.assertDoesNotThrow(() -> jobErrorReporter.reportSyncJobFailure(CONNECTION_ID, mFailureSummary, mJobSyncConfig)); + Mockito.verify(jobErrorReportingClient, Mockito.times(1)) + .reportJobFailureReason(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactoryTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactoryTest.java new file mode 100644 index 000000000000..b6ebd65ad6a5 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/JobErrorReportingClientFactoryTest.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.airbyte.config.Configs; +import io.airbyte.config.Configs.JobErrorReportingStrategy; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +public class JobErrorReportingClientFactoryTest { + + @Test + void testCreateErrorReportingClientLogging() { + assertTrue( + JobErrorReportingClientFactory.getClient( + JobErrorReportingStrategy.LOGGING, Mockito.mock(Configs.class)) instanceof LoggingJobErrorReportingClient); + } + + @Test + void testCreateErrorReportingClientSentry() { + final Configs configsMock = Mockito.mock(Configs.class); + Mockito.when(configsMock.getJobErrorReportingSentryDSN()).thenReturn(""); + + assertTrue( + JobErrorReportingClientFactory.getClient( + JobErrorReportingStrategy.SENTRY, configsMock) instanceof SentryJobErrorReportingClient); + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelperTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelperTest.java new file mode 100644 index 000000000000..55aa7dc2c385 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryExceptionHelperTest.java @@ -0,0 +1,366 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import io.sentry.protocol.SentryException; +import io.sentry.protocol.SentryStackFrame; +import io.sentry.protocol.SentryStackTrace; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SentryExceptionHelperTest { + + final SentryExceptionHelper exceptionHelper = new SentryExceptionHelper(); + + @Test + void testBuildSentryExceptionsInvalid() { + final String stacktrace = "this is not a stacktrace"; + final Optional> exceptionList = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(exceptionList.isEmpty()); + } + + @Test + void testBuildSentryExceptionsPartiallyInvalid() { + final String stacktrace = "Traceback (most recent call last):\n Oops!"; + final Optional> exceptionList = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(exceptionList.isEmpty()); + } + + @Test + void testBuildSentryExceptionsPythonChained() { + final String stacktrace = + """ + Traceback (most recent call last): + File "/airbyte/connector-errors/error.py", line 31, in read_records + failing_method() + File "/airbyte/connector-errors/error.py", line 36, in failing_method + raise HTTPError(http_error_msg, response=self) + requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://airbyte.com + + The above exception was the direct cause of the following exception: + + Traceback (most recent call last): + File "/airbyte/connector-errors/error.py", line 39, in + main() + File "/airbyte/connector-errors/error.py", line 13, in main + sync_mode("incremental") + File "/airbyte/connector-errors/error.py", line 17, in sync_mode + incremental() + File "/airbyte/connector-errors/error.py", line 33, in incremental + raise RuntimeError("My other error") from err + RuntimeError: My other error + """; + + final Optional> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(optionalSentryExceptions.isPresent()); + final List exceptionList = optionalSentryExceptions.get(); + Assertions.assertEquals(2, exceptionList.size()); + + assertExceptionContent(exceptionList.get(0), "requests.exceptions.HTTPError", "400 Client Error: Bad Request for url: https://airbyte.com", + List.of( + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 31, + "function", "read_records", + "context_line", "failing_method()"), + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 36, + "function", "failing_method", + "context_line", "raise HTTPError(http_error_msg, response=self)"))); + + assertExceptionContent(exceptionList.get(1), "RuntimeError", "My other error", List.of( + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 39, + "function", "", + "context_line", "main()"), + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 13, + "function", "main", + "context_line", "sync_mode(\"incremental\")"), + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 17, + "function", "sync_mode", + "context_line", "incremental()"), + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 33, + "function", "incremental", + "context_line", "raise RuntimeError(\"My other error\") from err"))); + + } + + @Test + void testBuildSentryExceptionsPythonNoValue() { + final String stacktrace = + """ + Traceback (most recent call last): + File "/airbyte/connector-errors/error.py", line 33, in incremental + raise RuntimeError() + RuntimeError + """; + + final Optional> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(optionalSentryExceptions.isPresent()); + final List exceptionList = optionalSentryExceptions.get(); + Assertions.assertEquals(1, exceptionList.size()); + + assertExceptionContent(exceptionList.get(0), "RuntimeError", null, List.of( + Map.of( + "abspath", "/airbyte/connector-errors/error.py", + "lineno", 33, + "function", "incremental", + "context_line", "raise RuntimeError()"))); + } + + @Test + void testBuildSentryExceptionsPythonMultilineValue() { + final String stacktrace = + """ + Traceback (most recent call last): + File "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking + raise _InactiveRpcError(state) + grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with: + status = StatusCode.INTERNAL + details = "Internal error encountered." + > + + During handling of the above exception, another exception occurred: + + Traceback (most recent call last): + File "/usr/local/lib/python3.9/site-packages/google/api_core/exceptions.py", line 553, in _parse_grpc_error_details + status = rpc_status.from_call(rpc_exc) + AttributeError: 'NoneType' object has no attribute 'from_call' + """; + + final Optional> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(optionalSentryExceptions.isPresent()); + final List exceptionList = optionalSentryExceptions.get(); + Assertions.assertEquals(2, exceptionList.size()); + + final String expectedValue = + """ + <_InactiveRpcError of RPC that terminated with: + status = StatusCode.INTERNAL + details = "Internal error encountered." + >"""; + + assertExceptionContent(exceptionList.get(0), "grpc._channel._InactiveRpcError", expectedValue, List.of( + Map.of( + "abspath", "/usr/local/lib/python3.9/site-packages/grpc/_channel.py", + "lineno", 849, + "function", "_end_unary_response_blocking", + "context_line", "raise _InactiveRpcError(state)"))); + + assertExceptionContent(exceptionList.get(1), "AttributeError", "'NoneType' object has no attribute 'from_call'", List.of( + Map.of( + "abspath", "/usr/local/lib/python3.9/site-packages/google/api_core/exceptions.py", + "lineno", 553, + "function", "_parse_grpc_error_details", + "context_line", "status = rpc_status.from_call(rpc_exc)"))); + } + + @Test + void testBuildSentryExceptionsJava() { + final String stacktrace = + """ + java.lang.ArithmeticException: / by zero + at io.airbyte.integrations.base.AirbyteTraceMessageUtilityTest.testCorrectStacktraceFormat(AirbyteTraceMessageUtilityTest.java:61) + at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) + at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115) + at app//org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141) + at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73) + at jdk.proxy2/jdk.proxy2.$Proxy5.stop(Unknown Source) + at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74) + """; + + final Optional> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(optionalSentryExceptions.isPresent()); + final List exceptionList = optionalSentryExceptions.get(); + Assertions.assertEquals(1, exceptionList.size()); + + assertExceptionContent(exceptionList.get(0), "java.lang.ArithmeticException", "/ by zero", + List.of( + Map.of( + "filename", "GradleWorkerMain.java", + "lineno", 74, + "module", "worker.org.gradle.process.internal.worker.GradleWorkerMain", + "function", "main"), + Map.of( + "module", "jdk.proxy2.$Proxy5", + "function", "stop"), + Map.of( + "filename", "ThrowableCollector.java", + "lineno", 73, + "module", "org.junit.platform.engine.support.hierarchical.ThrowableCollector", + "function", "execute"), + Map.of( + "filename", "NodeTestTask.java", + "lineno", 141, + "module", "org.junit.platform.engine.support.hierarchical.NodeTestTask", + "function", "lambda$executeRecursively$8"), + Map.of( + "filename", "ExecutableInvoker.java", + "lineno", 115, + "module", "org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall", + "function", "lambda$ofVoidMethod$0"), + Map.of( + "isNative", true, + "module", "jdk.internal.reflect.NativeMethodAccessorImpl", + "function", "invoke0"), + Map.of( + "filename", "AirbyteTraceMessageUtilityTest.java", + "lineno", 61, + "module", "io.airbyte.integrations.base.AirbyteTraceMessageUtilityTest", + "function", "testCorrectStacktraceFormat"))); + } + + @Test + void testBuildSentryExceptionsJavaChained() { + final String stacktrace = + """ + java.util.concurrent.CompletionException: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1 + at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) + at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) + at java.base/java.lang.Thread.run(Thread.java:833) + Suppressed: io.airbyte.workers.exception.WorkerException: Source process exit with code 1. This warning is normal if the job was cancelled. + at io.airbyte.workers.internal.DefaultAirbyteSource.close(DefaultAirbyteSource.java:136) + at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:137) + at io.airbyte.workers.general.DefaultReplicationWorker.run(DefaultReplicationWorker.java:65) + at io.airbyte.workers.temporal.TemporalAttemptExecution.lambda$getWorkerThread$2(TemporalAttemptExecution.java:158) + at java.lang.Thread.run(Thread.java:833) + Caused by: io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1 + at io.airbyte.workers.DefaultReplicationWorker.lambda$getDestinationOutputRunnable$7(DefaultReplicationWorker.java:397) + at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) + ... 3 more + """; + + final Optional> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(optionalSentryExceptions.isPresent()); + final List exceptionList = optionalSentryExceptions.get(); + Assertions.assertEquals(2, exceptionList.size()); + + assertExceptionContent(exceptionList.get(0), "java.util.concurrent.CompletionException", + "io.airbyte.workers.DefaultReplicationWorker$DestinationException: Destination process exited with non-zero exit code 1", + List.of( + Map.of( + "filename", "Thread.java", + "lineno", 833, + "module", "java.lang.Thread", + "function", "run"), + Map.of( + "filename", "ThreadPoolExecutor.java", + "lineno", 635, + "module", "java.util.concurrent.ThreadPoolExecutor$Worker", + "function", "run"), + Map.of( + "filename", "CompletableFuture.java", + "lineno", 315, + "module", "java.util.concurrent.CompletableFuture", + "function", "encodeThrowable"))); + + assertExceptionContent(exceptionList.get(1), "io.airbyte.workers.DefaultReplicationWorker$DestinationException", + "Destination process exited with non-zero exit code 1", List.of( + Map.of( + "filename", "CompletableFuture.java", + "lineno", 1804, + "module", "java.util.concurrent.CompletableFuture$AsyncRun", + "function", "run"), + Map.of( + "filename", "DefaultReplicationWorker.java", + "lineno", 397, + "module", "io.airbyte.workers.DefaultReplicationWorker", + "function", "lambda$getDestinationOutputRunnable$7"))); + } + + @Test + void testBuildSentryExceptionsJavaMultilineValue() { + final String stacktrace = + """ + io.temporal.failure.ApplicationFailure: GET https://storage.googleapis.com/ + { + "code" : 401, + "message" : "Invalid Credentials" + } + at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146) + ... 22 more + """; + + final Optional> optionalSentryExceptions = exceptionHelper.buildSentryExceptions(stacktrace); + Assertions.assertTrue(optionalSentryExceptions.isPresent()); + final List exceptionList = optionalSentryExceptions.get(); + Assertions.assertEquals(1, exceptionList.size()); + + final String expectedValue = + """ + GET https://storage.googleapis.com/ + { + "code" : 401, + "message" : "Invalid Credentials" + }"""; + + assertExceptionContent(exceptionList.get(0), "io.temporal.failure.ApplicationFailure", + expectedValue, List.of( + Map.of( + "filename", "GoogleJsonResponseException.java", + "lineno", 146, + "module", "com.google.api.client.googleapis.json.GoogleJsonResponseException", + "function", "from"))); + } + + private void assertExceptionContent(final SentryException exception, + final String type, + final String value, + final List> frames) { + Assertions.assertEquals(type, exception.getType()); + Assertions.assertEquals(value, exception.getValue()); + + final SentryStackTrace stackTrace = exception.getStacktrace(); + Assertions.assertNotNull(stackTrace); + final List sentryFrames = stackTrace.getFrames(); + Assertions.assertNotNull(sentryFrames); + Assertions.assertEquals(frames.size(), sentryFrames.size()); + + for (int i = 0; i < frames.size(); i++) { + final Map expectedFrame = frames.get(i); + final SentryStackFrame sentryFrame = sentryFrames.get(i); + + if (expectedFrame.containsKey("module")) { + Assertions.assertEquals(expectedFrame.get("module"), sentryFrame.getModule()); + } + + if (expectedFrame.containsKey("filename")) { + Assertions.assertEquals(expectedFrame.get("filename"), sentryFrame.getFilename()); + } + + if (expectedFrame.containsKey("abspath")) { + Assertions.assertEquals(expectedFrame.get("abspath"), sentryFrame.getAbsPath()); + } + + if (expectedFrame.containsKey("function")) { + Assertions.assertEquals(expectedFrame.get("function"), sentryFrame.getFunction()); + } + + if (expectedFrame.containsKey("lineno")) { + Assertions.assertEquals(expectedFrame.get("lineno"), sentryFrame.getLineno()); + } + + if (expectedFrame.containsKey("context_line")) { + Assertions.assertEquals(expectedFrame.get("context_line"), sentryFrame.getContextLine()); + } + + if (expectedFrame.containsKey("isNative")) { + Assertions.assertEquals(expectedFrame.get("isNative"), sentryFrame.isNative()); + } + } + } + +} diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java new file mode 100644 index 000000000000..cff663df1b19 --- /dev/null +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/job_error_reporter/SentryJobErrorReportingClientTest.java @@ -0,0 +1,153 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence.job_error_reporter; + +import static io.airbyte.scheduler.persistence.job_error_reporter.SentryJobErrorReportingClient.STACKTRACE_PARSE_ERROR_TAG_KEY; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.airbyte.config.FailureReason; +import io.airbyte.config.FailureReason.FailureOrigin; +import io.airbyte.config.FailureReason.FailureType; +import io.airbyte.config.StandardWorkspace; +import io.sentry.IHub; +import io.sentry.NoOpHub; +import io.sentry.SentryEvent; +import io.sentry.protocol.Message; +import io.sentry.protocol.SentryException; +import io.sentry.protocol.User; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +public class SentryJobErrorReportingClientTest { + + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final String WORKSPACE_NAME = "My Workspace"; + private static final String DOCKER_IMAGE = "airbyte/source-stripe:1.2.3"; + + private final StandardWorkspace workspace = new StandardWorkspace().withWorkspaceId(WORKSPACE_ID).withName(WORKSPACE_NAME); + private SentryJobErrorReportingClient sentryErrorReportingClient; + private IHub mockSentryHub; + private SentryExceptionHelper mockSentryExceptionHelper; + + @BeforeEach + void setup() { + mockSentryHub = mock(IHub.class); + mockSentryExceptionHelper = mock(SentryExceptionHelper.class); + sentryErrorReportingClient = new SentryJobErrorReportingClient(mockSentryHub, mockSentryExceptionHelper); + } + + @Test + void testCreateSentryHubWithBlankDSN() { + final String sentryDSN = ""; + final IHub sentryHub = SentryJobErrorReportingClient.createSentryHubWithDSN(sentryDSN); + assertEquals(NoOpHub.getInstance(), sentryHub); + } + + @Test + void testCreateSentryHubWithNullDSN() { + final IHub sentryHub = SentryJobErrorReportingClient.createSentryHubWithDSN(null); + assertEquals(NoOpHub.getInstance(), sentryHub); + } + + @Test + void testCreateSentryHubWithDSN() { + final String sentryDSN = "https://public@sentry.example.com/1"; + final IHub sentryHub = SentryJobErrorReportingClient.createSentryHubWithDSN(sentryDSN); + assertNotNull(sentryHub); + assertEquals(sentryDSN, sentryHub.getOptions().getDsn()); + assertFalse(sentryHub.getOptions().isAttachStacktrace()); + assertFalse(sentryHub.getOptions().isEnableUncaughtExceptionHandler()); + } + + @Test + void testReportJobFailureReason() { + final ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(SentryEvent.class); + + final FailureReason failureReason = new FailureReason() + .withFailureOrigin(FailureOrigin.SOURCE) + .withFailureType(FailureType.SYSTEM_ERROR) + .withInternalMessage("RuntimeError: Something went wrong"); + final Map metadata = Map.of("some_metadata", "some_metadata_value"); + + sentryErrorReportingClient.reportJobFailureReason(workspace, failureReason, DOCKER_IMAGE, metadata); + + verify(mockSentryHub).captureEvent(eventCaptor.capture()); + final SentryEvent actualEvent = eventCaptor.getValue(); + assertEquals("other", actualEvent.getPlatform()); + assertEquals("airbyte-source-stripe@1.2.3", actualEvent.getRelease()); + assertEquals(List.of("{{ default }}", "airbyte-source-stripe"), actualEvent.getFingerprints()); + assertEquals("some_metadata_value", actualEvent.getTag("some_metadata")); + assertNull(actualEvent.getTag(STACKTRACE_PARSE_ERROR_TAG_KEY)); + assertNull(actualEvent.getExceptions()); + + final User sentryUser = actualEvent.getUser(); + assertNotNull(sentryUser); + assertEquals(WORKSPACE_ID.toString(), sentryUser.getId()); + assertEquals(WORKSPACE_NAME, sentryUser.getUsername()); + + final Message message = actualEvent.getMessage(); + assertNotNull(message); + assertEquals("RuntimeError: Something went wrong", message.getFormatted()); + } + + @Test + void testReportJobFailureReasonWithStacktrace() { + final ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(SentryEvent.class); + + final List exceptions = new ArrayList<>(); + final SentryException exception = new SentryException(); + exception.setType("RuntimeError"); + exception.setValue("Something went wrong"); + exceptions.add(exception); + + when(mockSentryExceptionHelper.buildSentryExceptions("Some valid stacktrace")).thenReturn(Optional.of(exceptions)); + + final FailureReason failureReason = new FailureReason() + .withInternalMessage("RuntimeError: Something went wrong") + .withStacktrace("Some valid stacktrace"); + + sentryErrorReportingClient.reportJobFailureReason(workspace, failureReason, DOCKER_IMAGE, Map.of()); + + verify(mockSentryHub).captureEvent(eventCaptor.capture()); + final SentryEvent actualEvent = eventCaptor.getValue(); + assertEquals(exceptions, actualEvent.getExceptions()); + assertNull(actualEvent.getTag(STACKTRACE_PARSE_ERROR_TAG_KEY)); + } + + @Test + void testReportJobFailureReasonWithInvalidStacktrace() { + final ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(SentryEvent.class); + final String invalidStacktrace = "Invalid stacktrace\nRuntimeError: Something went wrong"; + + when(mockSentryExceptionHelper.buildSentryExceptions(invalidStacktrace)).thenReturn(Optional.empty()); + + final FailureReason failureReason = new FailureReason() + .withInternalMessage("Something went wrong") + .withStacktrace(invalidStacktrace); + + sentryErrorReportingClient.reportJobFailureReason(workspace, failureReason, DOCKER_IMAGE, Map.of()); + + verify(mockSentryHub).captureEvent(eventCaptor.capture()); + final SentryEvent actualEvent = eventCaptor.getValue(); + assertEquals("1", actualEvent.getTag(STACKTRACE_PARSE_ERROR_TAG_KEY)); + final List exceptions = actualEvent.getExceptions(); + assertNotNull(exceptions); + assertEquals(1, exceptions.size()); + assertEquals("Invalid stacktrace, RuntimeError: ", exceptions.get(0).getValue()); + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index 28153a05a47c..f0a4f96a2804 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -41,6 +41,9 @@ import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClient; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReportingClientFactory; import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory; import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; @@ -134,6 +137,7 @@ public class WorkerApp { private final Optional containerOrchestratorConfig; private final JobNotifier jobNotifier; private final JobTracker jobTracker; + private final JobErrorReporter jobErrorReporter; private final StreamResetPersistence streamResetPersistence; public void start() { @@ -193,7 +197,8 @@ private void registerConnectionManager(final WorkerFactory factory) { jobTracker, configRepository, jobCreator, - streamResetPersistence), + streamResetPersistence, + jobErrorReporter), new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()), new ConnectionDeletionActivityImpl(connectionHelper), new CheckConnectionActivityImpl( @@ -435,8 +440,11 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); - final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase); + final JobErrorReportingClient jobErrorReportingClient = JobErrorReportingClientFactory.getClient(configs.getJobErrorReportingStrategy(), configs); + final JobErrorReporter jobErrorReporter = + new JobErrorReporter(configRepository, configs.getDeploymentMode(), configs.getAirbyteVersionOrWarning(), jobErrorReportingClient); + final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase); new WorkerApp( workspaceRoot, defaultProcessFactory, @@ -464,6 +472,7 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf containerOrchestratorConfig, jobNotifier, jobTracker, + jobErrorReporter, streamResetPersistence).start(); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index dc37eb4a731f..7f548778f5be 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -30,6 +30,7 @@ import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; @@ -61,6 +62,7 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta private final ConfigRepository configRepository; private final JobCreator jobCreator; private final StreamResetPersistence streamResetPersistence; + private final JobErrorReporter jobErrorReporter; @Override public JobCreationOutput createNewJob(final JobCreationInput input) { @@ -199,6 +201,10 @@ public void jobFailure(final JobFailureInput input) { jobNotifier.failJob(input.getReason(), job); emitJobIdToReleaseStagesMetric(OssMetricsRegistry.JOB_FAILED_BY_RELEASE_STAGE, jobId); trackCompletion(job, JobStatus.FAILED); + + final UUID connectionId = UUID.fromString(job.getScope()); + job.getLastFailedAttempt().flatMap(Attempt::getFailureSummary) + .ifPresent(failureSummary -> jobErrorReporter.reportSyncJobFailure(connectionId, failureSummary, job.getConfig().getSync())); } catch (final IOException e) { throw new RetryableException(e); } @@ -224,6 +230,7 @@ public void attemptFailure(final AttemptFailureInput input) { MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1, MetricTags.getFailureOrigin(reason.getFailureOrigin())); } + } catch (final IOException e) { throw new RetryableException(e); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java index 1c9c3da9275f..211734d0d674 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityTest.java @@ -35,6 +35,7 @@ import io.airbyte.scheduler.persistence.JobCreator; import io.airbyte.scheduler.persistence.JobNotifier; import io.airbyte.scheduler.persistence.JobPersistence; +import io.airbyte.scheduler.persistence.job_error_reporter.JobErrorReporter; import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; import io.airbyte.scheduler.persistence.job_tracker.JobTracker; import io.airbyte.scheduler.persistence.job_tracker.JobTracker.JobState; @@ -94,6 +95,9 @@ public class JobCreationAndStatusUpdateActivityTest { @Mock private JobTracker mJobtracker; + @Mock + private JobErrorReporter mJobErrorReporter; + @Mock private ConfigRepository mConfigRepository; @@ -293,10 +297,22 @@ public void setJobSuccessWrapException() throws IOException { @Test public void setJobFailure() throws IOException { + final Attempt mAttempt = Mockito.mock(Attempt.class); + Mockito.when(mAttempt.getFailureSummary()).thenReturn(Optional.of(failureSummary)); + + final Job mJob = Mockito.mock(Job.class); + Mockito.when(mJob.getScope()).thenReturn(CONNECTION_ID.toString()); + Mockito.when(mJob.getConfig()).thenReturn(new JobConfig()); + Mockito.when(mJob.getLastFailedAttempt()).thenReturn(Optional.of(mAttempt)); + + Mockito.when(mJobPersistence.getJob(JOB_ID)) + .thenReturn(mJob); + jobCreationAndStatusUpdateActivity.jobFailure(new JobFailureInput(JOB_ID, "reason")); Mockito.verify(mJobPersistence).failJob(JOB_ID); Mockito.verify(mJobNotifier).failJob(eq("reason"), Mockito.any()); + Mockito.verify(mJobErrorReporter).reportSyncJobFailure(eq(CONNECTION_ID), eq(failureSummary), Mockito.any()); } @Test diff --git a/docker-compose.yaml b/docker-compose.yaml index 79a53b4d1d26..eeaa49fb2bb3 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -88,6 +88,8 @@ services: - WORKSPACE_ROOT=${WORKSPACE_ROOT} - METRIC_CLIENT=${METRIC_CLIENT} - OTEL_COLLECTOR_ENDPOINT=${OTEL_COLLECTOR_ENDPOINT} + - JOB_ERROR_REPORTING_STRATEGY=${JOB_ERROR_REPORTING_STRATEGY} + - JOB_ERROR_REPORTING_SENTRY_DSN=${JOB_ERROR_REPORTING_SENTRY_DSN} - ACTIVITY_MAX_ATTEMPT=${ACTIVITY_MAX_ATTEMPT} - ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_INITIAL_DELAY_BETWEEN_ATTEMPTS_SECONDS} - ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS=${ACTIVITY_MAX_DELAY_BETWEEN_ATTEMPTS_SECONDS}