From c163cef219eeb15bac9f8ba56fb8b1f4230b71e0 Mon Sep 17 00:00:00 2001 From: git-phu Date: Fri, 25 Feb 2022 14:48:18 -0800 Subject: [PATCH] new failures metadata for segment tracking failure_reasons: array of all failures (as json objects) for a job - for general analytics on failures main_failure_reason: main failure reason (as json object) for this job - for operational usage (for Intercom) - currently this is just the first failure reason chronologically - we'll probably to change this when we have more data on how to determine failure reasons more intelligently - added an attempt_id to failures so we can group failures by attempt - removed stacktrace from failures since it's not clear how we'd use these in an analytics use case (and because segment has a 32kb size limit for events) --- .../java/io/airbyte/commons/json/Jsons.java | 5 +++ .../io/airbyte/commons/json/JsonsTest.java | 5 +++ .../job_tracker/TrackingMetadata.java | 34 +++++++++++++++++++ 3 files changed, 44 insertions(+) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java index e063d053adae..d3939a5b109d 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java @@ -14,6 +14,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectWriter; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; @@ -91,6 +92,10 @@ public static JsonNode emptyObject() { return jsonNode(Collections.emptyMap()); } + public static ArrayNode arrayNode() { + return OBJECT_MAPPER.createArrayNode(); + } + public static T object(final JsonNode jsonNode, final Class klass) { return OBJECT_MAPPER.convertValue(jsonNode, klass); } diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java index d6ca6603bf2a..63a47fc577d6 100644 --- a/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java +++ b/airbyte-commons/src/test/java/io/airbyte/commons/json/JsonsTest.java @@ -134,6 +134,11 @@ void testEmptyObject() { assertEquals(Jsons.deserialize("{}"), Jsons.emptyObject()); } + @Test + void testArrayNode() { + assertEquals(Jsons.deserialize("[]"), Jsons.arrayNode()); + } + @Test void testToObject() { final ToClass expected = new ToClass("abc", 999, 888L); diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java index 4de7b58b3c54..ff6acf6c6b16 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/TrackingMetadata.java @@ -4,8 +4,12 @@ package io.airbyte.scheduler.persistence.job_tracker; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.JobOutput; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardDestinationDefinition; @@ -16,7 +20,9 @@ import io.airbyte.scheduler.models.Attempt; import io.airbyte.scheduler.models.Job; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; import org.apache.logging.log4j.util.Strings; public class TrackingMetadata { @@ -102,9 +108,37 @@ public static ImmutableMap generateJobAttemptMetadata(final Job metadata.put("volume_rows", syncSummary.getRecordsSynced()); } } + + final ArrayNode failuresAsJson = failureReasonsAsJsonArray(attempts); + if (!failuresAsJson.isEmpty()) { + metadata.put("failure_reasons", failuresAsJson.toString()); + metadata.put("main_failure_reason", failuresAsJson.get(0).toString()); + } } } return metadata.build(); } + private static ArrayNode failureReasonsAsJsonArray(final List attempts) { + final ArrayNode failureJsonsArray = Jsons.arrayNode(); + + final IntStream attemptIndices = IntStream.range(0, attempts.size()); + attemptIndices.mapToObj(attemptIndex -> attempts.get(attemptIndex) + .getFailureSummary() + .map(failureSummary -> failureSummary.getFailures() + .stream() + .map(failureReason -> { + final JsonNode jsonNode = Jsons.jsonNode(failureReason); + ((ObjectNode) jsonNode).remove("stacktrace"); + ((ObjectNode) jsonNode).put("attempt_id", attemptIndex + 1); + return jsonNode; + }) + .toList())) + .filter(Optional::isPresent) + .map(Optional::get) + .forEach(failureJsonsArray::addAll); + + return failureJsonsArray; + } + }