Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JobErrorReporter for sending sync job connector failures to Sentry #13899

Merged
merged 31 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
da1761b
skeleton for reporting connector errors to sentry
pedroslopez Jun 13, 2022
975e947
report on job failures instead of attempt failures
pedroslopez Jun 14, 2022
d70e63c
report sync job failures with relevant metadata using JobErrorReporter
pedroslopez Jun 16, 2022
3beeab6
send stack traces from python connectors to sentry
pedroslopez Jun 16, 2022
91ce310
test JobCreationAndStatusUpdate and JobErrorReporter
pedroslopez Jun 17, 2022
3962a56
logs
pedroslopez Jun 17, 2022
36f17fe
refactor into helper, initial tests
pedroslopez Jun 17, 2022
dda5137
using sentry
pedroslopez Jun 17, 2022
b497626
run format
pedroslopez Jun 21, 2022
f749332
load reporting client from env
pedroslopez Jun 21, 2022
fb2bfd8
load sentry dsn from env
pedroslopez Jun 21, 2022
0c7bf86
send java stack traces to sentry
pedroslopez Jun 22, 2022
64379b3
test sentryclient, refactor to use Hub instance
pedroslopez Jun 22, 2022
01bdafa
ErrorReportingClient.report -> .reportJobFailureReason
pedroslopez Jun 22, 2022
f3d2953
inject exception helper, test stack trace parse error tagging
pedroslopez Jun 23, 2022
ab5dba5
rm logs
pedroslopez Jun 23, 2022
ed92e70
more stack trace tests
pedroslopez Jun 23, 2022
4a9dbf3
remove logs
pedroslopez Jun 23, 2022
cfd0828
Merge branch 'master' into pedroslopez/job-error-reporter-sentry
pedroslopez Jun 23, 2022
3f049b4
fix failing tests
pedroslopez Jun 23, 2022
5183514
rename ErrorReportingClient to JobErrorReportingClient
pedroslopez Jun 23, 2022
7dcdea5
rename vars in docker-compose
pedroslopez Jun 23, 2022
b3d9f12
Return an Optional instead of null when parsing stack traces
pedroslopez Jun 23, 2022
0bafd6f
dont remove airbyte prefix when setting release name
pedroslopez Jun 23, 2022
25b8015
from_trace_message static
pedroslopez Jun 23, 2022
bcad293
remove failureSummary from jobfailure input, get from Job
pedroslopez Jun 23, 2022
2b2c86e
send stacktrace string if we weren't able to parse
pedroslopez Jun 24, 2022
825b966
Merge branch 'master' into pedroslopez/job-error-reporter-sentry
pedroslopez Jun 24, 2022
a707a53
set deployment mode tag
pedroslopez Jun 24, 2022
107a244
update .env
pedroslopez Jun 24, 2022
225791a
just log if something goes wrong
pedroslopez Jun 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,17 @@ public interface Configs {
*/
TrackingStrategy getTrackingStrategy();

/**
 * Define whether to send job failure events to Sentry or log-only. Airbyte internal use.
 *
 * @return the strategy used to report job failures, one of the {@link JobErrorReportingStrategy}
 *         values
 */
JobErrorReportingStrategy getJobErrorReportingStrategy();

/**
 * Determines the Sentry DSN that should be used when reporting connector job failures to Sentry.
 * Used with SENTRY error reporting strategy. Airbyte internal use.
 *
 * @return the Sentry DSN to report connector job failures to
 */
String getJobErrorReportingSentryDSN();

// APPLICATIONS
// Worker
/**
Expand Down Expand Up @@ -561,6 +572,11 @@ enum TrackingStrategy {
LOGGING
}

/**
 * Available strategies for reporting job failures.
 */
enum JobErrorReportingStrategy {
// Job failures are sent to Sentry.
SENTRY,
// Job failures are only written to the log.
LOGGING
}

enum WorkerEnvironment {
DOCKER,
KUBERNETES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class EnvConfigs implements Configs {
public static final String CONFIG_ROOT = "CONFIG_ROOT";
public static final String DOCKER_NETWORK = "DOCKER_NETWORK";
public static final String TRACKING_STRATEGY = "TRACKING_STRATEGY";
public static final String JOB_ERROR_REPORTING_STRATEGY = "JOB_ERROR_REPORTING_STRATEGY";
public static final String JOB_ERROR_REPORTING_SENTRY_DSN = "JOB_ERROR_REPORTING_SENTRY_DSN";
public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE";
public static final String DATABASE_USER = "DATABASE_USER";
public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD";
Expand Down Expand Up @@ -786,6 +788,23 @@ public TrackingStrategy getTrackingStrategy() {
});
}

/**
 * Resolves the job error reporting strategy from the {@code JOB_ERROR_REPORTING_STRATEGY}
 * environment variable. The value is matched case-insensitively against the
 * {@link JobErrorReportingStrategy} constants; {@link JobErrorReportingStrategy#LOGGING} is passed
 * as the default and is also returned when the value is not recognized.
 *
 * @return the configured strategy, or LOGGING as the fallback
 */
@Override
public JobErrorReportingStrategy getJobErrorReportingStrategy() {
  return getEnvOrDefault(JOB_ERROR_REPORTING_STRATEGY, JobErrorReportingStrategy.LOGGING, s -> {
    try {
      // Locale.ROOT keeps the upper-casing independent of the JVM default locale (e.g. under a
      // Turkish locale "logging" would otherwise become "LOGGİNG" and fail to match).
      return JobErrorReportingStrategy.valueOf(s.toUpperCase(java.util.Locale.ROOT));
    } catch (final IllegalArgumentException e) {
      LOGGER.info(s + " not recognized, defaulting to " + JobErrorReportingStrategy.LOGGING);
      return JobErrorReportingStrategy.LOGGING;
    }
  });
}

/**
 * Looks up the Sentry DSN for job error reporting from the {@code JOB_ERROR_REPORTING_SENTRY_DSN}
 * environment variable, supplying an empty string as the default.
 *
 * @return the configured Sentry DSN
 */
@Override
public String getJobErrorReportingSentryDSN() {
  final String sentryDsn = getEnvOrDefault(JOB_ERROR_REPORTING_SENTRY_DSN, "");
  return sentryDsn;
}

// APPLICATIONS
// Worker
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.JobErrorReportingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import java.nio.file.Paths;
import java.util.HashMap;
Expand Down Expand Up @@ -178,6 +179,27 @@ void testTrackingStrategy() {
assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());
}

@Test
void testErrorReportingStrategy() {
  // Raw environment values, paired index-by-index with the strategy each one must resolve to.
  final String[] rawValues = {null, "abc", "logging", "sentry", "LOGGING", "SENTRY"};
  final JobErrorReportingStrategy[] expectedStrategies = {
    JobErrorReportingStrategy.LOGGING, // unset -> default
    JobErrorReportingStrategy.LOGGING, // unrecognized -> default
    JobErrorReportingStrategy.LOGGING,
    JobErrorReportingStrategy.SENTRY,
    JobErrorReportingStrategy.LOGGING,
    JobErrorReportingStrategy.SENTRY
  };

  for (int i = 0; i < rawValues.length; i++) {
    envMap.put(EnvConfigs.JOB_ERROR_REPORTING_STRATEGY, rawValues[i]);
    assertEquals(expectedStrategies[i], config.getJobErrorReportingStrategy());
  }
}

@Test
void testDeploymentMode() {
envMap.put(EnvConfigs.DEPLOYMENT_MODE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,14 @@ public Optional<JobOutput> getSuccessOutput() {
return getSuccessfulAttempt().flatMap(Attempt::getOutput);
}

/**
 * Finds the most recently created attempt of this job whose status is FAILED.
 *
 * @return the failed attempt with the greatest creation time, or an empty Optional when the job has
 *         no failed attempt
 */
public Optional<Attempt> getLastFailedAttempt() {
  return getAttempts()
      .stream()
      // Discard non-failed attempts first, then pick the newest one directly; this avoids sorting
      // the full attempt list just to read its first matching element.
      .filter(a -> a.getStatus() == AttemptStatus.FAILED)
      .max(Comparator.comparing(Attempt::getCreatedAtInSecond));
}

public Optional<Attempt> getLastAttemptWithOutput() {
return getAttempts()
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;

class JobTest {
Expand Down Expand Up @@ -42,8 +42,8 @@ void testHasRunningAttempt() {
}

private static Job jobWithAttemptWithStatus(final AttemptStatus... attemptStatuses) {
final List<Attempt> attempts = Arrays.stream(attemptStatuses)
.map(attemptStatus -> new Attempt(1L, 1L, null, null, attemptStatus, null, 0L, 0L, null))
final List<Attempt> attempts = IntStream.range(0, attemptStatuses.length)
.mapToObj(idx -> new Attempt(idx + 1, 1L, null, null, attemptStatuses[idx], null, idx, 0L, null))
.collect(Collectors.toList());
return new Job(1L, null, null, null, attempts, null, 0L, 0L, 0L);
}
Expand All @@ -60,6 +60,17 @@ void testGetSuccessfulAttempt() {
assertEquals(job.getAttempts().get(1), job.getSuccessfulAttempt().get());
}

@Test
void testGetLastFailedAttempt() {
  // Jobs without any failed attempt yield nothing.
  final Job noAttemptsJob = jobWithAttemptWithStatus();
  assertTrue(noAttemptsJob.getLastFailedAttempt().isEmpty());
  final Job succeededJob = jobWithAttemptWithStatus(AttemptStatus.SUCCEEDED);
  assertTrue(succeededJob.getLastFailedAttempt().isEmpty());

  // A single failed attempt is found.
  final Job singleFailureJob = jobWithAttemptWithStatus(AttemptStatus.FAILED);
  assertTrue(singleFailureJob.getLastFailedAttempt().isPresent());

  // With two failed attempts, the second one (id 2) is expected to be returned.
  final Job job = jobWithAttemptWithStatus(AttemptStatus.FAILED, AttemptStatus.FAILED);
  assertTrue(job.getLastFailedAttempt().isPresent());
  final Attempt newestFailure = job.getLastFailedAttempt().get();
  assertEquals(2, newestFailure.getId());
}

@Test
void testValidateStatusTransitionFromPending() {
final Job pendingJob = jobWithStatus(JobStatus.PENDING);
Expand Down
2 changes: 2 additions & 0 deletions airbyte-scheduler/scheduler-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ plugins {
}

dependencies {
implementation 'io.sentry:sentry:6.1.0'

implementation project(':airbyte-analytics')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:config-models')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class JobErrorReporter {

private static final String FROM_TRACE_MESSAGE = "from_trace_message";
private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_META_KEY = "failure_origin";
private static final String FAILURE_TYPE_META_KEY = "failure_type";
private static final String CONNECTION_ID_META_KEY = "connection_id";
private static final String CONNECTOR_NAME_META_KEY = "connector_name";
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved

private final ConfigRepository configRepository;
private final String airbyteVersion;
private final JobErrorReportingClient jobErrorReportingClient;

public JobErrorReporter(final ConfigRepository configRepository,
final String airbyteVersion,
final JobErrorReportingClient jobErrorReportingClient) {

this.configRepository = configRepository;
this.airbyteVersion = airbyteVersion;
this.jobErrorReportingClient = jobErrorReportingClient;
}

/**
* Reports a Sync Job's connector-caused FailureReasons to the JobErrorReportingClient
*
* @param connectionId - connection that had the failure
* @param failureSummary - final attempt failure summary
* @param jobSyncConfig - config for the sync job
*/
public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final JobSyncConfig jobSyncConfig) {
final List<FailureReason> traceMessageFailures = failureSummary.getFailures().stream()
.filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey(FROM_TRACE_MESSAGE))
.toList();

final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true);

for (final FailureReason failureReason : traceMessageFailures) {
final FailureOrigin failureOrigin = failureReason.getFailureOrigin();

final HashMap<String, String> metadata = new HashMap<>();
metadata.put(CONNECTION_ID_META_KEY, connectionId.toString());
metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion);
metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value());
metadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value());

if (failureOrigin == FailureOrigin.SOURCE) {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final String dockerImage = jobSyncConfig.getSourceDockerImage();

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value());

jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
} else if (failureOrigin == FailureOrigin.DESTINATION) {
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
final String dockerImage = jobSyncConfig.getDestinationDockerImage();

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value());

jobErrorReportingClient.reportJobFailureReason(workspace, failureReason, dockerImage, metadata);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import java.util.Map;

/**
 * A generic interface for a client that reports job failure reasons to some destination (for
 * example an external error tracker or the application log).
 */
public interface JobErrorReportingClient {

/**
 * Report a job failure reason
 *
 * @param workspace - workspace associated with the connection that had the failure
 * @param reason - the failure reason to report
 * @param dockerImage - docker image of the connector the failure is attributed to
 * @param metadata - additional key/value context to attach to the report
 */
void reportJobFailureReason(StandardWorkspace workspace, final FailureReason reason, final String dockerImage, Map<String, String> metadata);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.Configs;
import io.airbyte.config.Configs.JobErrorReportingStrategy;

/**
 * Factory that picks the {@link JobErrorReportingClient} implementation for a reporting strategy.
 */
public class JobErrorReportingClientFactory {

  /**
   * Builds the error reporting client that corresponds to the requested strategy.
   *
   * @param strategy - which type of error reporting client should be created
   * @param configs - configuration the Sentry DSN is read from when the SENTRY strategy is requested
   * @return JobErrorReportingClient
   */
  public static JobErrorReportingClient getClient(final JobErrorReportingStrategy strategy, final Configs configs) {
    final JobErrorReportingClient client = switch (strategy) {
      case LOGGING -> new LoggingJobErrorReportingClient();
      case SENTRY -> {
        final String sentryDsn = configs.getJobErrorReportingSentryDSN();
        yield new SentryJobErrorReportingClient(sentryDsn, new SentryExceptionHelper());
      }
    };
    return client;
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link JobErrorReportingClient} that writes each reported failure to the application log at INFO
 * level.
 */
public class LoggingJobErrorReportingClient implements JobErrorReportingClient {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingJobErrorReportingClient.class);

  /**
   * Logs the workspace id, docker image, failure reason and metadata of the reported failure.
   */
  @Override
  public void reportJobFailureReason(final StandardWorkspace workspace,
                                     final FailureReason reason,
                                     final String dockerImage,
                                     final Map<String, String> metadata) {
    final Object workspaceId = workspace.getWorkspaceId();
    final String template = "Report Job Error -> workspaceId: {}, dockerImage: {}, failureReason: {}, metadata: {}";
    LOGGER.info(template, workspaceId, dockerImage, reason, metadata);
  }

}
Loading