Skip to content

Commit

Permalink
Add JobErrorReporter for sending sync job connector failures to Sentry (
Browse files Browse the repository at this point in the history
#13899)

* skeleton for reporting connector errors to sentry

* report on job failures instead of attempt failures

* report sync job failures with relevant metadata using JobErrorReporter

* send stack traces from python connectors to sentry

* test JobCreationAndStatusUpdate and JobErrorReporter

* logs

* refactor into helper, initial tests

* using sentry

* run format

* load reporting client from env

* load sentry dsn from env

* send java stack traces to sentry

* test sentryclient, refactor to use Hub instance

* ErrorReportingClient.report -> .reportJobFailureReason

* inject exception helper, test stack trace parse error tagging

* rm logs

* more stack trace tests

* remove logs

* fix failing tests

* rename ErrorReportingClient to JobErrorReportingClient

* rename vars in docker-compose

* Return an Optional instead of null when parsing stack traces

* dont remove airbyte prefix when setting release name

* from_trace_message static

* remove failureSummary from jobfailure input, get from Job

* send stacktrace string if we weren't able to parse

* set deployment mode tag

* update .env

* just log if something goes wrong
  • Loading branch information
pedroslopez authored Jun 24, 2022
1 parent cc2b82c commit d6d32b3
Show file tree
Hide file tree
Showing 21 changed files with 1,296 additions and 7 deletions.
3 changes: 1 addition & 2 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,9 @@ JOB_MAIN_CONTAINER_MEMORY_LIMIT=

### LOGGING/MONITORING/TRACKING ###
TRACKING_STRATEGY=segment
JOB_ERROR_REPORTING_STRATEGY=logging
# Although not present as an env var, expected by Log4J configuration.
LOG_LEVEL=INFO
# Although not present as an env var, helps Airbyte track job healthiness.
SENTRY_DSN="https://d4b03de0c4574c78999b8d58e55243dc@o1009025.ingest.sentry.io/6102835"


### APPLICATIONS ###
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,17 @@ public interface Configs {
*/
TrackingStrategy getTrackingStrategy();

/**
* Define whether to send job failure events to Sentry or log-only. Airbyte internal use.
*/
JobErrorReportingStrategy getJobErrorReportingStrategy();

/**
* Determines the Sentry DSN that should be used when reporting connector job failures to Sentry.
* Used with SENTRY error reporting strategy. Airbyte internal use.
*/
String getJobErrorReportingSentryDSN();

// APPLICATIONS
// Worker
/**
Expand Down Expand Up @@ -578,6 +589,11 @@ enum TrackingStrategy {
LOGGING
}

enum JobErrorReportingStrategy {
SENTRY,
LOGGING
}

enum WorkerEnvironment {
DOCKER,
KUBERNETES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class EnvConfigs implements Configs {
public static final String CONFIG_ROOT = "CONFIG_ROOT";
public static final String DOCKER_NETWORK = "DOCKER_NETWORK";
public static final String TRACKING_STRATEGY = "TRACKING_STRATEGY";
public static final String JOB_ERROR_REPORTING_STRATEGY = "JOB_ERROR_REPORTING_STRATEGY";
public static final String JOB_ERROR_REPORTING_SENTRY_DSN = "JOB_ERROR_REPORTING_SENTRY_DSN";
public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE";
public static final String DATABASE_USER = "DATABASE_USER";
public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD";
Expand Down Expand Up @@ -805,6 +807,23 @@ public TrackingStrategy getTrackingStrategy() {
});
}

@Override
public JobErrorReportingStrategy getJobErrorReportingStrategy() {
return getEnvOrDefault(JOB_ERROR_REPORTING_STRATEGY, JobErrorReportingStrategy.LOGGING, s -> {
try {
return JobErrorReportingStrategy.valueOf(s.toUpperCase());
} catch (final IllegalArgumentException e) {
LOGGER.info(s + " not recognized, defaulting to " + JobErrorReportingStrategy.LOGGING);
return JobErrorReportingStrategy.LOGGING;
}
});
}

@Override
public String getJobErrorReportingSentryDSN() {
return getEnvOrDefault(JOB_ERROR_REPORTING_SENTRY_DSN, "");
}

// APPLICATIONS
// Worker
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.JobErrorReportingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.nio.file.Paths;
import java.util.HashMap;
Expand Down Expand Up @@ -178,6 +179,27 @@ void testTrackingStrategy() {
assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());
}

@Test
void testErrorReportingStrategy() {
envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, null);
assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());

envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "abc");
assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());

envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "logging");
assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());

envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "sentry");
assertEquals(JobErrorReportingStrategy.SENTRY, config.getJobErrorReportingStrategy());

envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "LOGGING");
assertEquals(JobErrorReportingStrategy.LOGGING, config.getJobErrorReportingStrategy());

envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, "SENTRY");
assertEquals(JobErrorReportingStrategy.SENTRY, config.getJobErrorReportingStrategy());
}

@Test
void testDeploymentMode() {
envMap.put(EnvConfigs.DEPLOYMENT_MODE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public Optional<JobOutput> getSuccessOutput() {
return getSuccessfulAttempt().flatMap(Attempt::getOutput);
}

public Optional<Attempt> getLastFailedAttempt() {
return getAttempts()
.stream()
.sorted(Comparator.comparing(Attempt::getCreatedAtInSecond).reversed())
.filter(a -> a.getStatus() == AttemptStatus.FAILED)
.findFirst();
}

public Optional<Attempt> getLastAttemptWithOutput() {
return getAttempts()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;

class JobTest {
Expand Down Expand Up @@ -42,8 +42,8 @@ void testHasRunningAttempt() {
}

private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) {
final List<Attempt> attempts = Arrays.stream(attemptStatuses)
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null))
final List<Attempt> attempts = IntStream.range(0, attemptStatuses.length)
.mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, idx, 0L, null))
.collect(Collectors.toList());
return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L);
}
Expand All @@ -60,6 +60,17 @@ void testGetSuccessfulAttempt() {
assertEquals(job.getAttempts().get(1), job.getSuccessfulAttempt().get());
}

@Test
void testGetLastFailedAttempt() {
assertTrue(jobWithAttemptWithStatus().getLastFailedAttempt().isEmpty());
assertTrue(jobWithAttemptWithStatus(AttemptStatus.SUCCEEDED).getLastFailedAttempt().isEmpty());
assertTrue(jobWithAttemptWithStatus(AttemptStatus.FAILED).getLastFailedAttempt().isPresent());

final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED);
assertTrue(job.getLastFailedAttempt().isPresent());
assertEquals(2, job.getLastFailedAttempt().get().getId());
}

@Test
void testValidateStatusTransitionFromPending() {
final Job pendingJob = jobWithStatus(JobStatus.PENDING);
Expand Down
2 changes: 2 additions & 0 deletions airbyte-scheduler/scheduler-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ plugins {
}

dependencies {
implementation 'io.sentry:sentry:6.1.0'

implementation project(':airbyte-analytics')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:config-models')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobErrorReporter {

private static final Logger LOGGER = LoggerFactory.getLogger(JobErrorReporter.class);

private static final String FROM_TRACE_MESSAGE = "from_trace_message";
private static final String DEPLOYMENT_MODE_META_KEY = "deployment_mode";
private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_META_KEY = "failure_origin";
private static final String FAILURE_TYPE_META_KEY = "failure_type";
private static final String CONNECTION_ID_META_KEY = "connection_id";
private static final String CONNECTOR_NAME_META_KEY = "connector_name";
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";

private final ConfigRepository configRepository;
private final DeploymentMode deploymentMode;
private final String airbyteVersion;
private final JobErrorReportingClient jobErrorReportingClient;

public JobErrorReporter(final ConfigRepository configRepository,
final DeploymentMode deploymentMode,
final String airbyteVersion,
final JobErrorReportingClient jobErrorReportingClient) {

this.configRepository = configRepository;
this.deploymentMode = deploymentMode;
this.airbyteVersion = airbyteVersion;
this.jobErrorReportingClient = jobErrorReportingClient;
}

/**
* Reports a Sync Job's connector-caused FailureReasons to the JobErrorReportingClient
*
* @param connectionId - connection that had the failure
* @param failureSummary - final attempt failure summary
* @param jobSyncConfig - config for the sync job
*/
public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final JobSyncConfig jobSyncConfig) {
final List<FailureReason> traceMessageFailures = failureSummary.getFailures().stream()
.filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey(FROM_TRACE_MESSAGE))
.toList();

final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true);

for (final FailureReason failureReason : traceMessageFailures) {
final FailureOrigin failureOrigin = failureReason.getFailureOrigin();

final HashMap<String, String> metadata = new HashMap<>();
metadata.put(CONNECTION_ID_META_KEY, connectionId.toString());
metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion);
metadata.put(DEPLOYMENT_MODE_META_KEY, deploymentMode.name());
metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value());
metadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value());

try {
if (failureOrigin == FailureOrigin.SOURCE) {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final String dockerImage = jobSyncConfig.getSourceDockerImage();

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value());

jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
} else if (failureOrigin == FailureOrigin.DESTINATION) {
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
final String dockerImage = jobSyncConfig.getDestinationDockerImage();

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value());

jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
}
} catch (final Exception e) {
LOGGER.error("Error when reporting job failure reason: {}", failureReason, e);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import java.util.Map;

/**
* A generic interface for a client that reports errors
*/
public interface JobErrorReportingClient {

/**
* Report a job failure reason
*/
void reportJobFailureReason(StandardWorkspace workspace, final FailureReason reason, final String dockerImage, Map<String, String> metadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.Configs;
import io.airbyte.config.Configs.JobErrorReportingStrategy;

public class JobErrorReportingClientFactory {

/**
* Creates an error reporting client based on the desired strategy to use
*
* @param strategy - which type of error reporting client should be created
* @return JobErrorReportingClient
*/
public static JobErrorReportingClient getClient(final JobErrorReportingStrategy strategy, final Configs configs) {
return switch (strategy) {
case SENTRY -> new SentryJobErrorReportingClient(configs.getJobErrorReportingSentryDSN(), new SentryExceptionHelper());
case LOGGING -> new LoggingJobErrorReportingClient();
};
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingJobErrorReportingClient implements JobErrorReportingClient {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggingJobErrorReportingClient.class);

@Override
public void reportJobFailureReason(final StandardWorkspace workspace,
final FailureReason reason,
final String dockerImage,
final Map<String, String> metadata) {
LOGGER.info("Report Job Error -> workspaceId: {}, dockerImage: {}, failureReason: {}, metadata: {}",
workspace.getWorkspaceId(),
dockerImage,
reason,
metadata);
}

}
Loading

0 comments on commit d6d32b3

Please sign in to comment.