Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add failure origins to APM trace #19665

Merged
merged 2 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public static FailureReason dbtFailure(final Throwable t, final Long jobId, fina

public static FailureReason unknownOriginFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
return genericFailure(t, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.UNKNOWN)
.withExternalMessage("An unknown failure occurred");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,14 @@ void testOrderedFailures() throws Exception {
assertEquals(failureReasonList.get(0), TRACE_FAILURE_REASON);
}

@Test
void testUnknownOriginFailure() {
final Throwable t = new RuntimeException();
final Long jobId = 12345L;
final Integer attemptNumber = 1;
final FailureReason failureReason = FailureHelper.unknownOriginFailure(t, jobId, attemptNumber);
assertEquals(FailureOrigin.UNKNOWN, failureReason.getFailureOrigin());
assertEquals("An unknown failure occurred", failureReason.getExternalMessage());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ properties:
- normalization
- dbt
- airbyte_platform
- unknown
failureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@ public static final class Tags {
*/
public static final String DOCKER_IMAGE_KEY = "docker_image";

/**
* Name of the APM trace tag that holds the failure origin(s) associated with the trace.
*/
public static final String FAILURE_ORIGINS_KEY = "failure_origins";

/**
* Name of the APM trace tag that holds the job ID value associated with the trace.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public static String getReleaseStage(final ReleaseStage stage) {
}

public static String getFailureOrigin(final FailureOrigin origin) {
return origin != null ? origin.value() : UNKNOWN;
return origin != null ? origin.value() : FailureOrigin.UNKNOWN.value();
}

public static String getJobStatus(final JobStatus status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.ATTEMPT_NUMBER_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.FAILURE_ORIGINS_KEY;
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
import static io.airbyte.persistence.job.models.AttemptStatus.FAILED;

Expand All @@ -22,6 +23,7 @@
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobOutput;
import io.airbyte.config.JobSyncConfig;
Expand Down Expand Up @@ -56,6 +58,7 @@
import io.airbyte.workers.run.TemporalWorkerRunFactory;
import io.airbyte.workers.run.WorkerRun;
import io.micronaut.context.annotation.Requires;
import io.micronaut.core.util.CollectionUtils;
import jakarta.inject.Singleton;
import java.io.IOException;
import java.nio.file.Path;
Expand All @@ -67,6 +70,7 @@
import java.util.OptionalLong;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

@Slf4j
Expand Down Expand Up @@ -179,9 +183,8 @@ private void emitSrcIdDstIdToReleaseStagesMetric(final UUID srcId, final UUID ds
@Override
public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input) throws RetryableException {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
final Job createdJob = jobPersistence.getJob(jobId);

final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
Expand All @@ -200,9 +203,8 @@ public AttemptCreationOutput createNewAttempt(final AttemptCreationInput input)
@Override
public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationInput input) throws RetryableException {
try {
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
ApmTraceUtils.addTagsToTrace(Map.of(JOB_ID_KEY, jobId));
final Job createdJob = jobPersistence.getJob(jobId);

final WorkerRun workerRun = temporalWorkerRunFactory.create(createdJob);
Expand All @@ -221,10 +223,9 @@ public AttemptNumberCreationOutput createNewAttemptNumber(final AttemptCreationI
@Override
public void jobSuccess(final JobSuccessInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));

if (input.getStandardSyncOutput() != null) {
final JobOutput jobOutput = new JobOutput().withSync(input.getStandardSyncOutput());
Expand Down Expand Up @@ -287,12 +288,13 @@ public void jobFailure(final JobFailureInput input) {
@Override
public void attemptFailure(final AttemptFailureInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final int attemptId = input.getAttemptId();
final long jobId = input.getJobId();
final AttemptFailureSummary failureSummary = input.getAttemptFailureSummary();

ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
traceFailures(failureSummary);

jobPersistence.failAttempt(jobId, attemptId);
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, failureSummary);

Expand All @@ -302,11 +304,7 @@ public void attemptFailure(final AttemptFailureInput input) {
}

emitJobIdToReleaseStagesMetric(OssMetricsRegistry.ATTEMPT_FAILED_BY_RELEASE_STAGE, jobId);
for (final FailureReason reason : failureSummary.getFailures()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
}

trackFailures(failureSummary);
} catch (final IOException e) {
throw new RetryableException(e);
}
Expand All @@ -329,10 +327,9 @@ public void attemptFailureWithAttemptNumber(final AttemptNumberFailureInput inpu
@Override
public void jobCancelled(final JobCancelledInput input) {
try {
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, input.getAttemptId(), JOB_ID_KEY, input.getJobId()));

final long jobId = input.getJobId();
final int attemptId = input.getAttemptId();
ApmTraceUtils.addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, attemptId, JOB_ID_KEY, jobId));
jobPersistence.failAttempt(jobId, attemptId);
jobPersistence.writeAttemptFailureSummary(jobId, attemptId, input.getAttemptFailureSummary());
jobPersistence.cancelJob(jobId);
Expand Down Expand Up @@ -487,4 +484,37 @@ private void trackCompletionForInternalFailure(final Long jobId,
jobTracker.trackSyncForInternalFailure(jobId, connectionId, attemptId, Enums.convertTo(status, JobState.class), e);
}

/**
* Adds the failure origins to the APM trace.
*
* @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s).
*/
private void traceFailures(final AttemptFailureSummary failureSummary) {
if (failureSummary != null) {
if (CollectionUtils.isNotEmpty(failureSummary.getFailures())) {
ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, failureSummary.getFailures().stream().map(FailureReason::getFailureOrigin).map(
FailureOrigin::name).collect(Collectors.joining(","))));
}
} else {
ApmTraceUtils.addTagsToTrace(Map.of(FAILURE_ORIGINS_KEY, FailureOrigin.UNKNOWN.value()));
}
}

/**
* Records a metric for each failure reason.
*
* @param failureSummary The {@link AttemptFailureSummary} containing the failure reason(s).
*/
private void trackFailures(final AttemptFailureSummary failureSummary) {
if (failureSummary != null) {
for (final FailureReason reason : failureSummary.getFailures()) {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
new MetricAttribute(MetricTags.FAILURE_ORIGIN, MetricTags.getFailureOrigin(reason.getFailureOrigin())));
}
} else {
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.ATTEMPT_FAILED_BY_FAILURE_ORIGIN, 1,
new MetricAttribute(MetricTags.FAILURE_ORIGIN, FailureOrigin.UNKNOWN.value()));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,16 @@ void setAttemptFailure() throws IOException {
verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, failureSummary);
}

@Test
void setAttemptFailureManuallyTerminated() throws IOException {
jobCreationAndStatusUpdateActivity
.attemptFailure(new AttemptFailureInput(JOB_ID, ATTEMPT_ID, CONNECTION_ID, standardSyncOutput, null));

verify(mJobPersistence).failAttempt(JOB_ID, ATTEMPT_ID);
verify(mJobPersistence).writeOutput(JOB_ID, ATTEMPT_ID, jobOutput);
verify(mJobPersistence).writeAttemptFailureSummary(JOB_ID, ATTEMPT_ID, null);
}

@Test
void setAttemptFailureWrapException() throws IOException {
final Exception exception = new IOException(TEST_EXCEPTION_MESSAGE);
Expand Down