diff --git a/airbyte-config/config-models/src/main/resources/types/NormalizationSummary.yaml b/airbyte-config/config-models/src/main/resources/types/NormalizationSummary.yaml index f7bb6dc158f2..96e1244da4cb 100644 --- a/airbyte-config/config-models/src/main/resources/types/NormalizationSummary.yaml +++ b/airbyte-config/config-models/src/main/resources/types/NormalizationSummary.yaml @@ -13,3 +13,7 @@ properties: type: integer endTime: type: integer + failures: + type: array + items: + "$ref": FailureReason.yaml diff --git a/airbyte-integrations/bases/base-normalization/Dockerfile b/airbyte-integrations/bases/base-normalization/Dockerfile index d6109600120e..609856a08a12 100644 --- a/airbyte-integrations/bases/base-normalization/Dockerfile +++ b/airbyte-integrations/bases/base-normalization/Dockerfile @@ -28,5 +28,5 @@ WORKDIR /airbyte ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh" ENTRYPOINT ["/airbyte/entrypoint.sh"] -LABEL io.airbyte.version=0.2.13 +LABEL io.airbyte.version=0.2.14 LABEL io.airbyte.name=airbyte/normalization diff --git a/airbyte-integrations/bases/base-normalization/entrypoint.sh b/airbyte-integrations/bases/base-normalization/entrypoint.sh index 733a6fbe0db7..a1df178483c2 100755 --- a/airbyte-integrations/bases/base-normalization/entrypoint.sh +++ b/airbyte-integrations/bases/base-normalization/entrypoint.sh @@ -123,10 +123,11 @@ function main() { set +e # allow script to continue running even if next commands fail to run properly # We don't run dbt 1.0.x on all destinations (because their plugins don't support it yet) # So we need to only pass `--event-buffer-size` if it's supported by DBT. + # Same goes for JSON formatted logging. 
check_dbt_event_buffer_size if [ "$ret" -eq 0 ]; then echo -e "\nDBT >=1.0.0 detected; using 10K event buffer size\n" - dbt_additional_args="--event-buffer-size=10000" + dbt_additional_args="--event-buffer-size=10000 --log-format json" else dbt_additional_args="" fi diff --git a/airbyte-integrations/bases/base-normalization/main_dev_transform_catalog.py b/airbyte-integrations/bases/base-normalization/main_dev_transform_catalog.py index 76b6c5f32af9..fa66e0c15e63 100644 --- a/airbyte-integrations/bases/base-normalization/main_dev_transform_catalog.py +++ b/airbyte-integrations/bases/base-normalization/main_dev_transform_catalog.py @@ -3,7 +3,19 @@ # +import logging + +from airbyte_cdk.exception_handler import init_uncaught_exception_handler +from airbyte_cdk.utils.traced_exception import AirbyteTracedException from normalization.transform_catalog.transform import main if __name__ == "__main__": - main() + init_uncaught_exception_handler(logging.getLogger("airbyte")) + try: + main() + except Exception as e: + msg = ( + "Something went wrong while normalizing the data moved in this sync " + + "(failed to transform catalog into dbt project). See the logs for more details." 
+ ) + raise AirbyteTracedException.from_exception(e, message=msg) diff --git a/airbyte-integrations/bases/base-normalization/main_dev_transform_config.py b/airbyte-integrations/bases/base-normalization/main_dev_transform_config.py index 9aa690bad0d4..d4f650c863d2 100644 --- a/airbyte-integrations/bases/base-normalization/main_dev_transform_config.py +++ b/airbyte-integrations/bases/base-normalization/main_dev_transform_config.py @@ -3,7 +3,19 @@ # +import logging + +from airbyte_cdk.exception_handler import init_uncaught_exception_handler +from airbyte_cdk.utils.traced_exception import AirbyteTracedException from normalization.transform_config.transform import main if __name__ == "__main__": - main() + init_uncaught_exception_handler(logging.getLogger("airbyte")) + try: + main() + except Exception as e: + msg = ( + "Something went wrong while normalizing the data moved in this sync " + + "(failed to transform config for dbt project). See the logs for more details." + ) + raise AirbyteTracedException.from_exception(e, message=msg) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java index f962943281e9..cf9b6735f69c 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/general/DefaultNormalizationWorker.java @@ -5,14 +5,19 @@ package io.airbyte.workers.general; import io.airbyte.config.Configs.WorkerEnvironment; +import io.airbyte.config.FailureReason; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; +import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.normalization.NormalizationRunner; import io.airbyte.workers.normalization.NormalizationWorker; import 
java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; @@ -27,6 +32,8 @@ public class DefaultNormalizationWorker implements NormalizationWorker { private final int attempt; private final NormalizationRunner normalizationRunner; private final WorkerEnvironment workerEnvironment; + private final List traceFailureReasons = new ArrayList<>(); + private boolean failed = false; private final AtomicBoolean cancelled; @@ -58,10 +65,10 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, input.getDestinationConfiguration(), input.getCatalog(), input.getResourceRequirements())) { - throw new WorkerException("Normalization Failed."); + buildFailureReasonsAndSetFailure(); } } catch (final Exception e) { - throw new WorkerException("Normalization Failed.", e); + buildFailureReasonsAndSetFailure(); } if (cancelled.get()) { @@ -77,11 +84,25 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo .withStartTime(startTime) .withEndTime(endTime); + if (!traceFailureReasons.isEmpty()) { + summary.setFailures(traceFailureReasons); + LOGGER.error("Normalization Failed."); + } else if (failed) { + throw new WorkerException("Normalization Failed."); + } + LOGGER.info("Normalization summary: {}", summary); return summary; } + private void buildFailureReasonsAndSetFailure() { + normalizationRunner.getTraceMessages() + .filter(traceMessage -> traceMessage.getType() == AirbyteTraceMessage.Type.ERROR) + .forEach(traceMessage -> traceFailureReasons.add(FailureHelper.normalizationFailure(traceMessage, Long.valueOf(jobId), attempt))); + failed = true; + } + @Override public void cancel() { LOGGER.info("Cancelling normalization runner..."); diff --git 
a/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java b/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java index 367b95db57ba..7874f5bfd988 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/helper/FailureHelper.java @@ -115,6 +115,12 @@ public static FailureReason normalizationFailure(final Throwable t, final Long j .withExternalMessage("Something went wrong during normalization"); } + public static FailureReason normalizationFailure(final AirbyteTraceMessage m, final Long jobId, final Integer attemptNumber) { + return genericFailure(m, jobId, attemptNumber) + .withFailureOrigin(FailureOrigin.NORMALIZATION) + .withExternalMessage(m.getError().getMessage()); + } + public static FailureReason dbtFailure(final Throwable t, final Long jobId, final Integer attemptNumber) { return genericFailure(t, jobId, attemptNumber) .withFailureOrigin(FailureOrigin.DBT) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 9850255c19a3..dc7ec6c03f9e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -8,6 +8,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.logging.LoggingHelper.Color; @@ -15,6 +16,11 @@ import io.airbyte.commons.logging.MdcScope.Builder; import io.airbyte.config.OperatorDbt; import io.airbyte.config.ResourceRequirements; +import io.airbyte.protocol.models.AirbyteErrorTraceMessage; 
+import io.airbyte.protocol.models.AirbyteErrorTraceMessage.FailureType; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; @@ -22,10 +28,14 @@ import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.ProcessFactory; +import java.io.InputStream; import java.nio.file.Path; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +50,8 @@ public class DefaultNormalizationRunner implements NormalizationRunner { private final DestinationType destinationType; private final ProcessFactory processFactory; private final String normalizationImageName; + private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER); + private Map> airbyteMessagesByType; private Process process = null; @@ -135,7 +147,30 @@ private boolean runProcess(final String jobId, Collections.emptyMap(), args); - LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC_BUILDER); + try (final InputStream stdout = process.getInputStream()) { + // finds and collects any AirbyteMessages from stdout + // also builds a list of raw dbt errors and stores in streamFactory + airbyteMessagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) + .collect(Collectors.groupingBy(AirbyteMessage::getType)); + + // picks up error logs from dbt + String dbtErrorStack = String.join("\n\t", streamFactory.getDbtErrors()); + + if (!"".equals(dbtErrorStack)) { + AirbyteMessage dbtTraceMessage = new 
AirbyteMessage() + .withType(Type.TRACE) + .withTrace(new AirbyteTraceMessage() + .withType(AirbyteTraceMessage.Type.ERROR) + .withEmittedAt((double) System.currentTimeMillis()) + .withError(new AirbyteErrorTraceMessage() + .withFailureType(FailureType.SYSTEM_ERROR) // TODO: decide on best FailureType for this + .withMessage("Normalization failed during the dbt run. This may indicate a problem with the data itself.") + .withInternalMessage(dbtErrorStack) + .withStackTrace(dbtErrorStack))); + + airbyteMessagesByType.putIfAbsent(Type.TRACE, List.of(dbtTraceMessage)); + } + } LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC_BUILDER); WorkerUtils.wait(process); @@ -163,6 +198,14 @@ public void close() throws Exception { } } + @Override + public Stream getTraceMessages() { + if (airbyteMessagesByType != null && airbyteMessagesByType.get(Type.TRACE) != null) { + return airbyteMessagesByType.get(Type.TRACE).stream().map(AirbyteMessage::getTrace); + } + return Stream.empty(); + } + @VisibleForTesting DestinationType getDestinationType() { return destinationType; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationAirbyteStreamFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationAirbyteStreamFactory.java new file mode 100644 index 000000000000..556f5492b1b4 --- /dev/null +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationAirbyteStreamFactory.java @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.workers.normalization; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.logging.MdcScope; +import io.airbyte.protocol.models.AirbyteLogMessage; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.workers.internal.AirbyteStreamFactory; +import java.io.BufferedReader; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Creates a stream from an input stream. The produced stream attempts to parse each line of the + * InputStream into a AirbyteMessage. If the line cannot be parsed into a AirbyteMessage it is + * assumed to be from dbt. dbt [error] messages are also parsed + * + *

+ * If a line starts with a AirbyteMessage and then has other characters after it, that + * AirbyteMessage will still be parsed. If there are multiple AirbyteMessage records on the same + * line, only the first will be parsed. + */ +public class NormalizationAirbyteStreamFactory implements AirbyteStreamFactory { + + private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationAirbyteStreamFactory.class); + + private final MdcScope.Builder containerLogMdcBuilder; + private final Logger logger; + private final List dbtErrors = new ArrayList<>(); + + public NormalizationAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder) { + this(LOGGER, containerLogMdcBuilder); + } + + NormalizationAirbyteStreamFactory(final Logger logger, final MdcScope.Builder containerLogMdcBuilder) { + this.logger = logger; + this.containerLogMdcBuilder = containerLogMdcBuilder; + } + + @Override + public Stream create(final BufferedReader bufferedReader) { + return bufferedReader + .lines() + .flatMap(this::filterOutAndHandleNonJsonLines) + .flatMap(this::filterOutAndHandleNonAirbyteMessageLines) + // so now we are just left with AirbyteMessages + .filter(airbyteMessage -> { + final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG; + if (isLog) { + try (final var mdcScope = containerLogMdcBuilder.build()) { + internalLog(airbyteMessage.getLog()); + } + } + return !isLog; + }); + } + + private Stream filterOutAndHandleNonJsonLines(String line) { + final Optional jsonLine = Jsons.tryDeserialize(line); + if (jsonLine.isEmpty()) { + // we log as info all the lines that are not valid json. + try (final var mdcScope = containerLogMdcBuilder.build()) { + logger.info(line); + // this is really hacky and vulnerable to picking up lines we don't want, + // however it is only for destinations that are using dbt version < 1.0. + // For v1 + we switch on JSON logging and parse those in the next block. 
+ if (line.contains("[error]")) { + dbtErrors.add(line); + } + } + } + return jsonLine.stream(); + } + + private Stream filterOutAndHandleNonAirbyteMessageLines(JsonNode jsonLine) { + final Optional m = Jsons.tryObject(jsonLine, AirbyteMessage.class); + if (m.isEmpty()) { + // valid JSON but not an AirbyteMessage, so we assume this is a dbt json log + try { + final String logLevel = (jsonLine.getNodeType() == JsonNodeType.NULL || jsonLine.get("level").isNull()) + ? "" + : jsonLine.get("level").asText(); + final String logMsg = jsonLine.get("msg").isNull() ? "" : jsonLine.get("msg").asText(); + try (final var mdcScope = containerLogMdcBuilder.build()) { + switch (logLevel) { + case "debug" -> logger.debug(logMsg); + case "info" -> logger.info(logMsg); + case "warn" -> logger.warn(logMsg); + case "error" -> logAndCollectErrorMessage(logMsg); + default -> logger.info(jsonLine.asText()); // this shouldn't happen but logging it to avoid hiding unexpected lines. + } + } + } catch (final Exception e) { + logger.info(jsonLine.asText()); + } + } + return m.stream(); + } + + private void logAndCollectErrorMessage(String logMessage) { + logger.error(logMessage); + dbtErrors.add(logMessage); + } + + public List getDbtErrors() { + return dbtErrors; + } + + private void internalLog(final AirbyteLogMessage logMessage) { + switch (logMessage.getLevel()) { + case FATAL, ERROR -> logger.error(logMessage.getMessage()); + case WARN -> logger.warn(logMessage.getMessage()); + case DEBUG -> logger.debug(logMessage.getMessage()); + case TRACE -> logger.trace(logMessage.getMessage()); + default -> logger.info(logMessage.getMessage()); + } + } + +} diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java index 8d66f64f8e82..dcb2f17f5066 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java +++ 
b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunner.java @@ -7,8 +7,10 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.config.OperatorDbt; import io.airbyte.config.ResourceRequirements; +import io.airbyte.protocol.models.AirbyteTraceMessage; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import java.nio.file.Path; +import java.util.stream.Stream; public interface NormalizationRunner extends AutoCloseable { @@ -62,4 +64,6 @@ boolean normalize(String jobId, ResourceRequirements resourceRequirements) throws Exception; + Stream getTraceMessages(); + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java index e726ca3dc485..2c6b15d98243 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/NormalizationRunnerFactory.java @@ -14,7 +14,7 @@ public class NormalizationRunnerFactory { public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization"; - public static final String NORMALIZATION_VERSION = "0.2.13"; + public static final String NORMALIZATION_VERSION = "0.2.14"; static final Map> NORMALIZATION_MAPPING = ImmutableMap.>builder() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index da3cedc2d69c..0e6f1c54f4bf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -10,6 +10,7 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.FailureReason; import 
io.airbyte.config.FailureReason.FailureType; +import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardSyncInput; @@ -758,6 +759,14 @@ private boolean getFailStatus(final StandardSyncOutput standardSyncOutput) { return true; } + // catch normalization failure reasons + final NormalizationSummary normalizationSummary = standardSyncOutput.getNormalizationSummary(); + if (normalizationSummary != null && normalizationSummary.getFailures() != null && + !normalizationSummary.getFailures().isEmpty()) { + workflowInternalState.getFailures().addAll(normalizationSummary.getFailures()); + return true; + } + return false; } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultNormalizationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultNormalizationWorkerTest.java index db34bd4e3730..c4f8e2fc20dd 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultNormalizationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultNormalizationWorkerTest.java @@ -4,22 +4,30 @@ package io.airbyte.workers.general; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.EnvConfigs; +import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.NormalizationInput; import io.airbyte.config.NormalizationSummary; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncInput; +import io.airbyte.protocol.models.AirbyteTraceMessage; import 
io.airbyte.workers.TestConfigHelpers; import io.airbyte.workers.WorkerConfigs; +import io.airbyte.workers.exception.WorkerException; +import io.airbyte.workers.internal.AirbyteMessageUtils; import io.airbyte.workers.normalization.NormalizationRunner; import java.nio.file.Files; import java.nio.file.Path; +import java.util.stream.Stream; import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,6 +37,8 @@ class DefaultNormalizationWorkerTest { private static final String JOB_ID = "0"; private static final int JOB_ATTEMPT = 0; private static final Path WORKSPACE_ROOT = Path.of("workspaces/10"); + private static final AirbyteTraceMessage ERROR_TRACE_MESSAGE = + AirbyteMessageUtils.createErrorTraceMessage("a normalization error occurred", 123.0); private WorkerConfigs workerConfigs; private Path jobRoot; @@ -78,4 +88,58 @@ void test() throws Exception { assertNotNull(normalizationOutput.getEndTime()); } + // This test verifies the expected behaviour prior to adding TRACE message handling + // if no TRACE messages are emitted we should throw a WorkerException as before + @Test + void testFailure() throws Exception { + when(normalizationRunner.normalize(JOB_ID, + JOB_ATTEMPT, + normalizationRoot, + normalizationInput.getDestinationConfiguration(), + normalizationInput.getCatalog(), workerConfigs.getResourceRequirements())) + .thenReturn(false); + + final DefaultNormalizationWorker normalizationWorker = + new DefaultNormalizationWorker(JOB_ID, JOB_ATTEMPT, normalizationRunner, WorkerEnvironment.DOCKER); + + assertThrows(WorkerException.class, () -> normalizationWorker.run(normalizationInput, jobRoot)); + + verify(normalizationRunner).start(); + } + + // This test verifies failure behaviour when we have TRACE messages emitted from normalization + // instead of throwing an exception, we should return the summary with a non-empty FailureReasons + // array + @Test + void 
testFailureWithTraceMessage() throws Exception { + when(normalizationRunner.normalize(JOB_ID, + JOB_ATTEMPT, + normalizationRoot, + normalizationInput.getDestinationConfiguration(), + normalizationInput.getCatalog(), workerConfigs.getResourceRequirements())) + .thenReturn(false); + + when(normalizationRunner.getTraceMessages()).thenReturn(Stream.of(ERROR_TRACE_MESSAGE)); + + final DefaultNormalizationWorker normalizationWorker = + new DefaultNormalizationWorker(JOB_ID, JOB_ATTEMPT, normalizationRunner, WorkerEnvironment.DOCKER); + + final NormalizationSummary normalizationOutput = normalizationWorker.run(normalizationInput, jobRoot); + + verify(normalizationRunner).start(); + verify(normalizationRunner).normalize( + JOB_ID, + JOB_ATTEMPT, + normalizationRoot, + normalizationInput.getDestinationConfiguration(), + normalizationInput.getCatalog(), workerConfigs.getResourceRequirements()); + verify(normalizationRunner).close(); + assertNotNull(normalizationOutput.getStartTime()); + assertNotNull(normalizationOutput.getEndTime()); + assertFalse(normalizationOutput.getFailures().isEmpty()); + assertTrue(normalizationOutput.getFailures().stream() + .anyMatch(f -> f.getFailureOrigin().equals(FailureOrigin.NORMALIZATION) + && f.getExternalMessage().contains(ERROR_TRACE_MESSAGE.getError().getMessage()))); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java index de2641d5a0f0..e6963918ccad 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/normalization/DefaultNormalizationRunnerTest.java @@ -5,9 +5,10 @@ package io.airbyte.workers.normalization; import static io.airbyte.commons.logging.LoggingHelper.RESET; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static 
org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,10 +39,13 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -@SuppressWarnings("PMD.AvoidPrintStackTrace") class DefaultNormalizationRunnerTest { + private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunnerTest.class); + private static final String JOB_ID = "0"; private static final int JOB_ATTEMPT = 0; @@ -52,7 +56,7 @@ class DefaultNormalizationRunnerTest { logJobRoot = Files.createTempDirectory(Path.of("/tmp"), "mdc_test"); LogClientSingleton.getInstance().setJobMdc(WorkerEnvironment.DOCKER, LogConfigs.EMPTY, logJobRoot); } catch (final IOException e) { - e.printStackTrace(); + LOGGER.error(e.getMessage()); } } @@ -94,7 +98,7 @@ void setup() throws IOException, WorkerException { } @AfterEach - void tearDown() throws IOException { + public void tearDown() throws IOException { // The log file needs to be present and empty final Path logFile = logJobRoot.resolve(LogClientSingleton.LOG_FILENAME); if (Files.exists(logFile)) { @@ -150,16 +154,90 @@ void testClose() throws Exception { } @Test - void testFailure() { - doThrow(new RuntimeException()).when(process).exitValue(); + void testFailure() throws Exception { + when(process.exitValue()).thenReturn(1); + + final NormalizationRunner runner = + new DefaultNormalizationRunner(workerConfigs, DestinationType.BIGQUERY, processFactory, + NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME); + assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements())); + + 
verify(process).waitFor(); + + assertThrows(WorkerException.class, runner::close); + } + + @Test + void testFailureWithTraceMessage() throws Exception { + when(process.exitValue()).thenReturn(1); + + String errorTraceString = """ + {"type": "TRACE", "trace": { + "type": "ERROR", "emitted_at": 123.0, "error": { + "message": "Something went wrong in normalization.", "internal_message": "internal msg", + "stack_trace": "abc.xyz", "failure_type": "system_error"}}} + """.replace("\n", ""); + when(process.getInputStream()).thenReturn(new ByteArrayInputStream(errorTraceString.getBytes(StandardCharsets.UTF_8))); + + final NormalizationRunner runner = + new DefaultNormalizationRunner(workerConfigs, DestinationType.BIGQUERY, processFactory, + NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME); + assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements())); + + assertEquals(1, runner.getTraceMessages().count()); + + verify(process).waitFor(); + + assertThrows(WorkerException.class, runner::close); + } + + @Test + void testFailureWithDbtError() throws Exception { + when(process.exitValue()).thenReturn(1); + + String dbtErrorString = """ + [info ] [MainThread]: Completed with 1 error and 0 warnings: + [info ] [MainThread]: + [error] [MainThread]: Database Error in model xyz (models/generated/airbyte_incremental/abc/xyz.sql) + [error] [MainThread]: 1292 (22007): Truncated incorrect DOUBLE value: 'ABC' + [error] [MainThread]: compiled SQL at ../build/run/airbyte_utils/models/generated/airbyte_incremental/abc/xyz.sql + [info ] [MainThread]: + [info ] [MainThread]: Done. 
PASS=1 WARN=0 ERROR=1 SKIP=0 TOTAL=2 + """; + when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8))); final NormalizationRunner runner = new DefaultNormalizationRunner(workerConfigs, DestinationType.BIGQUERY, processFactory, NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME); - assertThrows(RuntimeException.class, - () -> runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements())); + assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements())); + + assertEquals(1, runner.getTraceMessages().count()); + + verify(process).waitFor(); + + assertThrows(WorkerException.class, runner::close); + } + + @Test + void testFailureWithDbtErrorJsonFormat() throws Exception { + when(process.exitValue()).thenReturn(1); + + String dbtErrorString = + """ + {"code": "Q035", "data": {"description": "table model public.start_products", "execution_time": 0.1729569435119629, "index": 1, "status": "error", "total": 2}, "invocation_id": "6ada8ee5-11c1-4239-8bd0-7e45178217c5", "level": "error", "log_version": 1, "msg": "1 of 2 ERROR creating table model public.start_products................................................................. 
[\\u001b[31mERROR\\u001b[0m in 0.17s]", "node_info": {"materialized": "table", "node_finished_at": null, "node_name": "start_products", "node_path": "generated/airbyte_incremental/public/start_products.sql", "node_started_at": "2022-07-18T15:04:27.036328", "node_status": "compiling", "resource_type": "model", "type": "node_status", "unique_id": "model.airbyte_utils.start_products"}, "pid": 14, "thread_name": "Thread-1", "ts": "2022-07-18T15:04:27.215077Z", "type": "log_line"} + """; + when(process.getInputStream()).thenReturn(new ByteArrayInputStream(dbtErrorString.getBytes(StandardCharsets.UTF_8))); + + final NormalizationRunner runner = + new DefaultNormalizationRunner(workerConfigs, DestinationType.BIGQUERY, processFactory, + NormalizationRunnerFactory.BASE_NORMALIZATION_IMAGE_NAME); + assertFalse(runner.normalize(JOB_ID, JOB_ATTEMPT, jobRoot, config, catalog, workerConfigs.getResourceRequirements())); + + assertEquals(1, runner.getTraceMessages().count()); + + verify(process).waitFor(); - verify(process).destroy(); + assertThrows(WorkerException.class, runner::close); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index f63b2d275680..af981135fb9a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -44,6 +44,7 @@ import io.airbyte.workers.temporal.scheduling.testsyncworkflow.DbtFailureSyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.EmptySyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.NormalizationFailureSyncWorkflow; +import io.airbyte.workers.temporal.scheduling.testsyncworkflow.NormalizationTraceFailureSyncWorkflow; import 
io.airbyte.workers.temporal.scheduling.testsyncworkflow.PersistFailureSyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.ReplicateFailureSyncWorkflow; import io.airbyte.workers.temporal.scheduling.testsyncworkflow.SleepingSyncWorkflow; @@ -1188,6 +1189,46 @@ void testNormalizationFailure() throws InterruptedException { .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.NORMALIZATION))); }
+  /**
+   * Submits a manual sync while {@link NormalizationTraceFailureSyncWorkflow} is registered as the
+   * sync workflow, then verifies that {@code attemptFailureWithAttemptNumber} was invoked with an
+   * argument accepted by {@code HasFailureFromOrigin(FailureOrigin.NORMALIZATION)}.
+   */
+  @Test
+  @Timeout(value = 10,
+           unit = TimeUnit.SECONDS)
+  @DisplayName("Test that normalization trace failure is recorded")
+  void testNormalizationTraceFailure() throws InterruptedException {
+    // Register the stub sync workflow, then start the test environment.
+    final Worker worker = testEnv.newWorker(TemporalJobType.SYNC.name());
+    worker.registerWorkflowImplementationTypes(NormalizationTraceFailureSyncWorkflow.class);
+    testEnv.start();
+
+    final UUID stateId = UUID.randomUUID();
+    final WorkflowState state = new WorkflowState(stateId, new TestStateListener());
+    final UUID connectionId = UUID.randomUUID();
+    final ConnectionUpdaterInput updaterInput = ConnectionUpdaterInput.builder()
+        .connectionId(connectionId)
+        .jobId(JOB_ID)
+        .attemptId(ATTEMPT_ID)
+        .fromFailure(false)
+        .attemptNumber(1)
+        .workflowState(state)
+        .build();
+    startWorkflowAndWaitUntilReady(workflow, updaterInput);
+
+    // Give the workflow time to initialize.
+    testEnv.sleep(Duration.ofMinutes(1));
+
+    workflow.submitManualSync();
+    // Brief real-time pause after the non-waiting manual run submission.
+    Thread.sleep(500);
+
+    final HasFailureFromOrigin normalizationOrigin = new HasFailureFromOrigin(FailureOrigin.NORMALIZATION);
+    Mockito.verify(mJobCreationAndStatusUpdateActivity)
+        .attemptFailureWithAttemptNumber(Mockito.argThat(normalizationOrigin));
+  }
+
 @Test @Timeout(value = 10, unit = TimeUnit.SECONDS) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/NormalizationTraceFailureSyncWorkflow.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/NormalizationTraceFailureSyncWorkflow.java new
file mode 100644 index 000000000000..84cc7bd49ab1 --- /dev/null +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/testsyncworkflow/NormalizationTraceFailureSyncWorkflow.java @@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+ */
+
+package io.airbyte.workers.temporal.scheduling.testsyncworkflow;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.airbyte.config.FailureReason;
+import io.airbyte.config.FailureReason.FailureOrigin;
+import io.airbyte.config.NormalizationSummary;
+import io.airbyte.config.StandardSyncInput;
+import io.airbyte.config.StandardSyncOutput;
+import io.airbyte.scheduler.models.IntegrationLauncherConfig;
+import io.airbyte.scheduler.models.JobRunConfig;
+import io.airbyte.workers.temporal.sync.SyncWorkflow;
+import java.util.List;
+import java.util.UUID;
+
+/**
+ * Test double for {@link SyncWorkflow}: its {@code run} does no work and returns an output whose
+ * {@link NormalizationSummary} carries a single failure with origin NORMALIZATION.
+ */
+public class NormalizationTraceFailureSyncWorkflow implements SyncWorkflow {
+
+  /**
+   * Failure attached to the summary returned by {@code run}. Its origin is NORMALIZATION and its
+   * timestamp is captured once, when this class is initialized.
+   */
+  @VisibleForTesting
+  public static final FailureReason FAILURE_REASON = new FailureReason()
+      .withFailureOrigin(FailureOrigin.NORMALIZATION)
+      .withTimestamp(System.currentTimeMillis());
+
+  /**
+   * Builds the stubbed sync output; none of the arguments are read.
+   *
+   * @return an output whose normalization summary lists {@link #FAILURE_REASON} as its only
+   *         failure, with a start time 1000 ms before the current time and an end time of "now"
+   */
+  @Override
+  public StandardSyncOutput run(final JobRunConfig jobRunConfig,
+                                final IntegrationLauncherConfig sourceLauncherConfig,
+                                final IntegrationLauncherConfig destinationLauncherConfig,
+                                final StandardSyncInput syncInput,
+                                final UUID connectionId) {
+    final NormalizationSummary failedNormalization = new NormalizationSummary()
+        .withFailures(List.of(FAILURE_REASON))
+        .withStartTime(System.currentTimeMillis() - 1000)
+        .withEndTime(System.currentTimeMillis());
+
+    return new StandardSyncOutput().withNormalizationSummary(failedNormalization);
+  }
+
+}
diff --git a/docs/understanding-airbyte/basic-normalization.md b/docs/understanding-airbyte/basic-normalization.md index a34b7d776cf0..80be6f45ceca 100644 --- a/docs/understanding-airbyte/basic-normalization.md +++ b/docs/understanding-airbyte/basic-normalization.md @@ -353,6 +353,7 @@ Therefore, in order
to "upgrade" to the desired normalization version, you need | Airbyte Version | Normalization Version | Date | Pull Request | Subject | |:----------------|:----------------------|:-----------| :--- |:---------------------------------------------------------------------------| +| | 0.2.14 | 2022-08-01 | [\#14790](https://github.com/airbytehq/airbyte/pull/14790) | Add and persist job failures for Normalization | | | 0.2.13 | 2022-07-27 | [\#14683](https://github.com/airbytehq/airbyte/pull/14683) | Quote schema name to allow reserved keywords | | | 0.2.12 | 2022-07-26 | [\#14362](https://github.com/airbytehq/airbyte/pull/14362) | Handle timezone in date-time format. Parse date correct in clickhouse. | | | 0.2.11 | 2022-07-26 | [\#13591](https://github.com/airbytehq/airbyte/pull/13591) | Updated support for integer columns. |