Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add JobErrorReporter for sending sync job connector failures to Sentry #13899

Merged
merged 31 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
da1761b
skeleton for reporting connector errors to sentry
pedroslopez Jun 13, 2022
975e947
report on job failures instead of attempt failures
pedroslopez Jun 14, 2022
d70e63c
report sync job failures with relevant metadata using JobErrorReporter
pedroslopez Jun 16, 2022
3beeab6
send stack traces from python connectors to sentry
pedroslopez Jun 16, 2022
91ce310
test JobCreationAndStatusUpdate and JobErrorReporter
pedroslopez Jun 17, 2022
3962a56
logs
pedroslopez Jun 17, 2022
36f17fe
refactor into helper, initial tests
pedroslopez Jun 17, 2022
dda5137
using sentry
pedroslopez Jun 17, 2022
b497626
run format
pedroslopez Jun 21, 2022
f749332
load reporting client from env
pedroslopez Jun 21, 2022
fb2bfd8
load sentry dsn from env
pedroslopez Jun 21, 2022
0c7bf86
send java stack traces to sentry
pedroslopez Jun 22, 2022
64379b3
test sentryclient, refactor to use Hub instance
pedroslopez Jun 22, 2022
01bdafa
ErrorReportingClient.report -> .reportJobFailureReason
pedroslopez Jun 22, 2022
f3d2953
inject exception helper, test stack trace parse error tagging
pedroslopez Jun 23, 2022
ab5dba5
rm logs
pedroslopez Jun 23, 2022
ed92e70
more stack trace tests
pedroslopez Jun 23, 2022
4a9dbf3
remove logs
pedroslopez Jun 23, 2022
cfd0828
Merge branch 'master' into pedroslopez/job-error-reporter-sentry
pedroslopez Jun 23, 2022
3f049b4
fix failing tests
pedroslopez Jun 23, 2022
5183514
rename ErrorReportingClient to JobErrorReportingClient
pedroslopez Jun 23, 2022
7dcdea5
rename vars in docker-compose
pedroslopez Jun 23, 2022
b3d9f12
Return an Optional instead of null when parsing stack traces
pedroslopez Jun 23, 2022
0bafd6f
dont remove airbyte prefix when setting release name
pedroslopez Jun 23, 2022
25b8015
from_trace_message static
pedroslopez Jun 23, 2022
bcad293
remove failureSummary from jobfailure input, get from Job
pedroslopez Jun 23, 2022
2b2c86e
send stacktrace string if we weren't able to parse
pedroslopez Jun 24, 2022
825b966
Merge branch 'master' into pedroslopez/job-error-reporter-sentry
pedroslopez Jun 24, 2022
a707a53
set deployment mode tag
pedroslopez Jun 24, 2022
107a244
update .env
pedroslopez Jun 24, 2022
225791a
just log if something goes wrong
pedroslopez Jun 24, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,17 @@ public interface Configs {
*/
TrackingStrategy getTrackingStrategy();

/**
 * Define whether to send job failure events to Sentry or log-only. Airbyte internal use.
 *
 * @return the error reporting strategy to use for job failure events
 */
ErrorReportingStrategy getErrorReportingStrategy();

/**
 * Determines the Sentry DSN that should be used when reporting connector job failures to Sentry.
 * Used with SENTRY error reporting strategy. Airbyte internal use.
 *
 * @return the Sentry DSN to report connector job failures to
 */
String getErrorReportingSentryDSN();

// APPLICATIONS
// Worker
/**
Expand Down Expand Up @@ -522,6 +533,11 @@ enum TrackingStrategy {
LOGGING
}

/**
 * Where job failure events are reported: sent to Sentry, or log-only.
 */
enum ErrorReportingStrategy {
// send job failure events to Sentry
SENTRY,
// log-only
LOGGING
}

enum WorkerEnvironment {
DOCKER,
KUBERNETES
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public class EnvConfigs implements Configs {
public static final String CONFIG_ROOT = "CONFIG_ROOT";
public static final String DOCKER_NETWORK = "DOCKER_NETWORK";
public static final String TRACKING_STRATEGY = "TRACKING_STRATEGY";
// Name of the environment variable that selects the error reporting strategy.
public static final String ERROR_REPORTING_STRATEGY = "ERROR_REPORTING_STRATEGY";
// Name of the environment variable that holds the Sentry DSN used for error reporting.
public static final String ERROR_REPORTING_SENTRY_DSN = "ERROR_REPORTING_SENTRY_DSN";
public static final String DEPLOYMENT_MODE = "DEPLOYMENT_MODE";
public static final String DATABASE_USER = "DATABASE_USER";
public static final String DATABASE_PASSWORD = "DATABASE_PASSWORD";
Expand Down Expand Up @@ -751,6 +753,23 @@ public TrackingStrategy getTrackingStrategy() {
});
}

/**
 * Resolves the error reporting strategy from the {@code ERROR_REPORTING_STRATEGY} setting.
 *
 * <p>
 * The configured value is matched case-insensitively against the names of
 * {@link ErrorReportingStrategy}. {@link ErrorReportingStrategy#LOGGING} is handed to
 * {@code getEnvOrDefault} as the default and is also returned when the configured value does not
 * name a known strategy.
 *
 * @return the resolved strategy
 */
@Override
public ErrorReportingStrategy getErrorReportingStrategy() {
  return getEnvOrDefault(ERROR_REPORTING_STRATEGY, ErrorReportingStrategy.LOGGING, s -> {
    try {
      // Upper-case with Locale.ROOT so the result does not depend on the JVM default locale
      // (under a Turkish locale "logging" would become "LOGGİNG" and never match the enum).
      return ErrorReportingStrategy.valueOf(s.toUpperCase(java.util.Locale.ROOT));
    } catch (final IllegalArgumentException e) {
      LOGGER.info(s + " not recognized, defaulting to " + ErrorReportingStrategy.LOGGING);
      return ErrorReportingStrategy.LOGGING;
    }
  });
}

/**
 * Reads the Sentry DSN from the {@code ERROR_REPORTING_SENTRY_DSN} setting, passing an empty
 * string to {@code getEnvOrDefault} as the default.
 *
 * @return the value resolved by {@code getEnvOrDefault}
 */
@Override
public String getErrorReportingSentryDSN() {
  final String noDsn = "";
  return getEnvOrDefault(ERROR_REPORTING_SENTRY_DSN, noDsn);
}

// APPLICATIONS
// Worker
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,27 @@ void testTrackingStrategy() {
assertEquals(Configs.TrackingStrategy.LOGGING, config.getTrackingStrategy());
}

@Test
void testErrorReportingStrategy() {
  final String key = EnvConfigs.ERROR_REPORTING_STRATEGY;
  final Configs.ErrorReportingStrategy logging = Configs.ErrorReportingStrategy.LOGGING;
  final Configs.ErrorReportingStrategy sentry = Configs.ErrorReportingStrategy.SENTRY;

  // no value configured -> default strategy
  envMap.put(key, null);
  assertEquals(logging, config.getErrorReportingStrategy());

  // unrecognized value -> default strategy
  envMap.put(key, "abc");
  assertEquals(logging, config.getErrorReportingStrategy());

  // lower-case names are accepted
  envMap.put(key, "logging");
  assertEquals(logging, config.getErrorReportingStrategy());

  envMap.put(key, "sentry");
  assertEquals(sentry, config.getErrorReportingStrategy());

  // upper-case names are accepted
  envMap.put(key, "LOGGING");
  assertEquals(logging, config.getErrorReportingStrategy());

  envMap.put(key, "SENTRY");
  assertEquals(sentry, config.getErrorReportingStrategy());
}

@Test
void testDeploymentMode() {
envMap.put(EnvConfigs.DEPLOYMENT_MODE, null);
Expand Down
2 changes: 2 additions & 0 deletions airbyte-scheduler/scheduler-persistence/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ plugins {
}

dependencies {
implementation 'io.sentry:sentry:6.1.0'

implementation project(':airbyte-analytics')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:config-models')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import java.util.Map;

/**
* A generic interface for a client that reports errors
*/
public interface ErrorReportingClient {

/**
* Report a job failure reason
*/
void report(StandardWorkspace workspace, final FailureReason reason, final String dockerImage, Map<String, String> metadata);
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.Configs;
import io.airbyte.config.Configs.ErrorReportingStrategy;

/**
 * Builds the {@link ErrorReportingClient} implementation that matches a reporting strategy.
 */
public class ErrorReportingClientFactory {

  /**
   * Creates an error reporting client for the requested strategy.
   *
   * @param strategy which type of error reporting client should be created
   * @param configs configuration the Sentry DSN is read from when the SENTRY strategy is requested
   * @return the client implementing the requested strategy
   */
  public static ErrorReportingClient getClient(final ErrorReportingStrategy strategy, final Configs configs) {
    return switch (strategy) {
      case LOGGING -> new LoggingErrorReportingClient();
      case SENTRY -> {
        // the DSN is only read when a Sentry client is actually requested
        final String sentryDsn = configs.getErrorReportingSentryDSN();
        yield new SentryErrorReportingClient(sentryDsn);
      }
    };
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.persistence.ConfigRepository;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JobErrorReporter {

private static final String AIRBYTE_VERSION_META_KEY = "airbyte_version";
private static final String FAILURE_ORIGIN_META_KEY = "failure_origin";
private static final String FAILURE_TYPE_META_KEY = "failure_type";
private static final String CONNECTION_ID_META_KEY = "connection_id";
private static final String CONNECTOR_NAME_META_KEY = "connector_name";
private static final String CONNECTOR_DEFINITION_ID_META_KEY = "connector_definition_id";
private static final String CONNECTOR_RELEASE_STAGE_META_KEY = "connector_release_stage";
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved

private final ConfigRepository configRepository;
private final String airbyteVersion;
private final ErrorReportingClient errorReportingClient;

public JobErrorReporter(
final ConfigRepository configRepository,
final String airbyteVersion,
final ErrorReportingClient errorReportingClient) {

this.configRepository = configRepository;
this.airbyteVersion = airbyteVersion;
this.errorReportingClient = errorReportingClient;
}

private static final Logger LOGGER = LoggerFactory.getLogger(JobErrorReporter.class);
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved

public void reportSyncJobFailure(final UUID connectionId, final AttemptFailureSummary failureSummary, final JobSyncConfig jobSyncConfig) {
LOGGER.info("reportSyncJobFailure");
final List<FailureReason> traceMessageFailures = failureSummary.getFailures().stream()
.filter(failure -> failure.getMetadata() != null && failure.getMetadata().getAdditionalProperties().containsKey("from_trace_message"))
.toList();

final StandardWorkspace workspace = configRepository.getStandardWorkspaceFromConnection(connectionId, true);

for (final FailureReason failureReason : traceMessageFailures) {
final FailureOrigin failureOrigin = failureReason.getFailureOrigin();

final HashMap<String, String> metadata = new HashMap<>();
metadata.put(CONNECTION_ID_META_KEY, connectionId.toString());
metadata.put(AIRBYTE_VERSION_META_KEY, airbyteVersion);
metadata.put(FAILURE_ORIGIN_META_KEY, failureOrigin.value());
metadata.put(FAILURE_TYPE_META_KEY, failureReason.getFailureType().value());

if (failureOrigin == FailureOrigin.SOURCE) {
final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId);
final String dockerImage = jobSyncConfig.getSourceDockerImage();

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, sourceDefinition.getSourceDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, sourceDefinition.getName());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, sourceDefinition.getReleaseStage().value());

errorReportingClient.report(workspace, failureReason, dockerImage, metadata);
} else if (failureOrigin == FailureOrigin.DESTINATION) {
final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId);
final String dockerImage = jobSyncConfig.getDestinationDockerImage();

metadata.put(CONNECTOR_DEFINITION_ID_META_KEY, destinationDefinition.getDestinationDefinitionId().toString());
metadata.put(CONNECTOR_NAME_META_KEY, destinationDefinition.getName());
metadata.put(CONNECTOR_RELEASE_STAGE_META_KEY, destinationDefinition.getReleaseStage().value());

errorReportingClient.report(workspace, failureReason, dockerImage, metadata);
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ErrorReportingClient} that only writes the reported failure to the application log.
 */
public class LoggingErrorReportingClient implements ErrorReportingClient {

  private static final Logger LOGGER = LoggerFactory.getLogger(LoggingErrorReportingClient.class);

  private static final String REPORT_TEMPLATE =
      "Report Job Error -> workspaceId: {}, dockerImage: {}, failureReason: {}, metadata: {}";

  /**
   * Logs the workspace id, docker image, failure reason and metadata at info level.
   */
  @Override
  public void report(final StandardWorkspace workspace, final FailureReason reason, final String dockerImage, final Map<String, String> metadata) {
    LOGGER.info(REPORT_TEMPLATE, workspace.getWorkspaceId(), dockerImage, reason, metadata);
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.scheduler.persistence.job_error_reporter;

import io.airbyte.config.FailureReason;
import io.airbyte.config.StandardWorkspace;
import io.sentry.Sentry;
import io.sentry.SentryEvent;
import io.sentry.protocol.Message;
import io.sentry.protocol.SentryException;
import io.sentry.protocol.SentryId;
import io.sentry.protocol.User;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SentryErrorReportingClient implements ErrorReportingClient {

private static final Logger LOGGER = LoggerFactory.getLogger(LoggingErrorReportingClient.class);

public SentryErrorReportingClient(final String sentryDSN) {
Sentry.init(options -> {
options.setDsn(sentryDSN);
options.setEnableUncaughtExceptionHandler(false);
});
}

@Override
public void report(final StandardWorkspace workspace,
final FailureReason failureReason,
final String dockerImage,
final Map<String, String> metadata) {
final SentryEvent event = new SentryEvent();

// airbyte/source-xyz:1.2.0 -> source-xyz@1.2.0
final String release = dockerImage.replace(":", "@").substring(dockerImage.lastIndexOf("/") + 1);
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved
event.setRelease(release);

// add connector to event fingerprint to ensure separate grouping per connector
final String[] releaseParts = release.split("@");
if (releaseParts.length > 0) {
event.setFingerprints(List.of("{{ default }}", releaseParts[0]));
}

// set workspace as the user in sentry to get impact and priority
final User sentryUser = new User();
sentryUser.setId(String.valueOf(workspace.getWorkspaceId()));
sentryUser.setUsername(workspace.getName());
event.setUser(sentryUser);

// set metadata as tags
event.setTags(metadata);

final Message message = new Message();
message.setFormatted(failureReason.getInternalMessage());
event.setMessage(message);

// don't attach current thread stacktrace to the event
event.setThreads(new ArrayList<>());
event.setPlatform("other");

// attach failure reason stack trace
final String failureStackTrace = failureReason.getStacktrace();
if (failureStackTrace != null) {
final List<SentryException> parsedExceptions = SentryExceptionHelper.buildSentryExceptions(failureStackTrace);
pedroslopez marked this conversation as resolved.
Show resolved Hide resolved
event.setExceptions(parsedExceptions);
}

Sentry.withScope(scope -> {
final Map<String, String> failureReasonContext = new HashMap<>();
failureReasonContext.put("internalMessage", failureReason.getInternalMessage());
failureReasonContext.put("externalMessage", failureReason.getExternalMessage());
failureReasonContext.put("stacktrace", failureReason.getStacktrace());
scope.setContexts("Failure Reason", failureReasonContext);

final SentryId eventId = Sentry.captureEvent(event);
LOGGER.info("SENT SENTRY EVENT: {}", eventId);
});
}

}
Loading