
Add and persist job failures for Normalization (airbytehq#14790)
* added TracedException and uncaught exception handler

* added trace message capturing

* added tests for TRACE messages

* pre-json logging

* propagating normalization failures

* log format json & fix hang

* parsing dbt json logs

* bump normalization version

* tests

* Benoit comments

* update trace exception user message

* review comments

* bump version

* bump version

* review comments

* nit comments

* add normalization trace failure test

* version bump

* pmd

* formatto

* bump version
Phlair authored and UsmanAli99 committed Aug 3, 2022
1 parent 9793d84 commit f706a0a
Showing 17 changed files with 477 additions and 17 deletions.
@@ -13,3 +13,7 @@ properties:
    type: integer
  endTime:
    type: integer
  failures:
    type: array
    items:
      "$ref": FailureReason.yaml
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
@@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.name=airbyte/normalization
3 changes: 2 additions & 1 deletion airbyte-integrations/bases/base-normalization/entrypoint.sh
@@ -123,10 +123,11 @@ function main() {
set +e # allow script to continue running even if next commands fail to run properly
# We don't run dbt 1.0.x on all destinations (because their plugins don't support it yet)
# So we need to only pass `--event-buffer-size` if it's supported by DBT.
# Same goes for JSON formatted logging.
check_dbt_event_buffer_size
if [ "$ret" -eq 0 ]; then
echo -e "\nDBT >=1.0.0 detected; using 10K event buffer size\n"
dbt_additional_args="--event-buffer-size=10000"
dbt_additional_args="--event-buffer-size=10000 --log-format json"
else
dbt_additional_args=""
fi
@@ -3,7 +3,19 @@
#


import logging

from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from normalization.transform_catalog.transform import main

if __name__ == "__main__":
    main()
    init_uncaught_exception_handler(logging.getLogger("airbyte"))
    try:
        main()
    except Exception as e:
        msg = (
            "Something went wrong while normalizing the data moved in this sync "
            + "(failed to transform catalog into dbt project). See the logs for more details."
        )
        raise AirbyteTracedException.from_exception(e, message=msg)
@@ -3,7 +3,19 @@
#


import logging

from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from normalization.transform_config.transform import main

if __name__ == "__main__":
    main()
    init_uncaught_exception_handler(logging.getLogger("airbyte"))
    try:
        main()
    except Exception as e:
        msg = (
            "Something went wrong while normalizing the data moved in this sync "
            + "(failed to transform config for dbt project). See the logs for more details."
        )
        raise AirbyteTracedException.from_exception(e, message=msg)
@@ -5,14 +5,19 @@
package io.airbyte.workers.general;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.normalization.NormalizationWorker;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
@@ -27,6 +32,8 @@ public class DefaultNormalizationWorker implements NormalizationWorker {
private final int attempt;
private final NormalizationRunner normalizationRunner;
private final WorkerEnvironment workerEnvironment;
private final List<FailureReason> traceFailureReasons = new ArrayList<>();
private boolean failed = false;

private final AtomicBoolean cancelled;

@@ -58,10 +65,10 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo

if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, input.getDestinationConfiguration(), input.getCatalog(),
input.getResourceRequirements())) {
throw new WorkerException("Normalization Failed.");
buildFailureReasonsAndSetFailure();
}
} catch (final Exception e) {
throw new WorkerException("Normalization Failed.", e);
buildFailureReasonsAndSetFailure();
}

if (cancelled.get()) {
@@ -77,11 +84,25 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
.withStartTime(startTime)
.withEndTime(endTime);

if (!traceFailureReasons.isEmpty()) {
summary.setFailures(traceFailureReasons);
LOGGER.error("Normalization Failed.");
} else if (failed) {
throw new WorkerException("Normalization Failed.");
}

LOGGER.info("Normalization summary: {}", summary);

return summary;
}

private void buildFailureReasonsAndSetFailure() {
normalizationRunner.getTraceMessages()
.filter(traceMessage -> traceMessage.getType() == AirbyteTraceMessage.Type.ERROR)
.forEach(traceMessage -> traceFailureReasons.add(FailureHelper.normalizationFailure(traceMessage, Long.valueOf(jobId), attempt)));
failed = true;
}

@Override
public void cancel() {
LOGGER.info("Cancelling normalization runner...");
@@ -115,6 +115,12 @@ public static FailureReason normalizationFailure(final Throwable t, final Long j
.withExternalMessage("Something went wrong during normalization");
}

public static FailureReason normalizationFailure(final AirbyteTraceMessage m, final Long jobId, final Integer attemptNumber) {
return genericFailure(m, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.NORMALIZATION)
.withExternalMessage(m.getError().getMessage());
}

public static FailureReason dbtFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
return genericFailure(t, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.DBT)
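As a usage illustration of the new normalizationFailure(AirbyteTraceMessage, Long, Integer) overload above: unlike the Throwable-based overload, it surfaces the dbt error text as the user-facing external message. The job id and attempt number below are illustrative values, and the trace message is built the same way the runner builds it.

// Illustrative only: turning an ERROR trace message into a persisted FailureReason.
final AirbyteTraceMessage traceMessage = new AirbyteTraceMessage()
    .withType(AirbyteTraceMessage.Type.ERROR)
    .withEmittedAt((double) System.currentTimeMillis())
    .withError(new AirbyteErrorTraceMessage()
        .withFailureType(FailureType.SYSTEM_ERROR)
        .withMessage("Normalization failed during the dbt run. This may indicate a problem with the data itself."));
final FailureReason reason = FailureHelper.normalizationFailure(traceMessage, 123L, 1);
// The resulting FailureReason has origin NORMALIZATION and carries the dbt error message as its external message.
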
@@ -8,24 +8,34 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +50,8 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
private final DestinationType destinationType;
private final ProcessFactory processFactory;
private final String normalizationImageName;
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
private Map<Type, List<AirbyteMessage>> airbyteMessagesByType;

private Process process = null;

@@ -135,7 +147,30 @@ private boolean runProcess(final String jobId,
Collections.emptyMap(),
args);

LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC_BUILDER);
try (final InputStream stdout = process.getInputStream()) {
// finds and collects any AirbyteMessages from stdout
// also builds a list of raw dbt errors and stores in streamFactory
airbyteMessagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

// picks up error logs from dbt
String dbtErrorStack = String.join("\n\t", streamFactory.getDbtErrors());

if (!"".equals(dbtErrorStack)) {
AirbyteMessage dbtTraceMessage = new AirbyteMessage()
.withType(Type.TRACE)
.withTrace(new AirbyteTraceMessage()
.withType(AirbyteTraceMessage.Type.ERROR)
.withEmittedAt((double) System.currentTimeMillis())
.withError(new AirbyteErrorTraceMessage()
.withFailureType(FailureType.SYSTEM_ERROR) // TODO: decide on best FailureType for this
.withMessage("Normalization failed during the dbt run. This may indicate a problem with the data itself.")
.withInternalMessage(dbtErrorStack)
.withStackTrace(dbtErrorStack)));

airbyteMessagesByType.putIfAbsent(Type.TRACE, List.of(dbtTraceMessage));
}
}
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC_BUILDER);

WorkerUtils.wait(process);
Expand Down Expand Up @@ -163,6 +198,14 @@ public void close() throws Exception {
}
}

@Override
public Stream<AirbyteTraceMessage> getTraceMessages() {
if (airbyteMessagesByType != null && airbyteMessagesByType.get(Type.TRACE) != null) {
return airbyteMessagesByType.get(Type.TRACE).stream().map(AirbyteMessage::getTrace);
}
return Stream.empty();
}

@VisibleForTesting
DestinationType getDestinationType() {
return destinationType;
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.normalization;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Creates a stream from an input stream. The produced stream attempts to parse each line of the
* InputStream into an AirbyteMessage. If the line cannot be parsed into an AirbyteMessage it is
* assumed to be from dbt. dbt [error] messages are also parsed.
*
* <p>
* If a line starts with an AirbyteMessage and then has other characters after it, that
* AirbyteMessage will still be parsed. If there are multiple AirbyteMessage records on the same
* line, only the first will be parsed.
*/
public class NormalizationAirbyteStreamFactory implements AirbyteStreamFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationAirbyteStreamFactory.class);

private final MdcScope.Builder containerLogMdcBuilder;
private final Logger logger;
private final List<String> dbtErrors = new ArrayList<>();

public NormalizationAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder) {
this(LOGGER, containerLogMdcBuilder);
}

NormalizationAirbyteStreamFactory(final Logger logger, final MdcScope.Builder containerLogMdcBuilder) {
this.logger = logger;
this.containerLogMdcBuilder = containerLogMdcBuilder;
}

@Override
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
return bufferedReader
.lines()
.flatMap(this::filterOutAndHandleNonJsonLines)
.flatMap(this::filterOutAndHandleNonAirbyteMessageLines)
// so now we are just left with AirbyteMessages
.filter(airbyteMessage -> {
final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG;
if (isLog) {
try (final var mdcScope = containerLogMdcBuilder.build()) {
internalLog(airbyteMessage.getLog());
}
}
return !isLog;
});
}

private Stream<JsonNode> filterOutAndHandleNonJsonLines(String line) {
final Optional<JsonNode> jsonLine = Jsons.tryDeserialize(line);
if (jsonLine.isEmpty()) {
// we log as info all the lines that are not valid json.
try (final var mdcScope = containerLogMdcBuilder.build()) {
logger.info(line);
// this is really hacky and vulnerable to picking up lines we don't want,
// however it is only for destinations that are using dbt version < 1.0.
// For v1 + we switch on JSON logging and parse those in the next block.
if (line.contains("[error]")) {
dbtErrors.add(line);
}
}
}
return jsonLine.stream();
}

private Stream<AirbyteMessage> filterOutAndHandleNonAirbyteMessageLines(JsonNode jsonLine) {
final Optional<AirbyteMessage> m = Jsons.tryObject(jsonLine, AirbyteMessage.class);
if (m.isEmpty()) {
// valid JSON but not an AirbyteMessage, so we assume this is a dbt json log
try {
final String logLevel = (jsonLine.getNodeType() == JsonNodeType.NULL || jsonLine.get("level").isNull())
? ""
: jsonLine.get("level").asText();
final String logMsg = jsonLine.get("msg").isNull() ? "" : jsonLine.get("msg").asText();
try (final var mdcScope = containerLogMdcBuilder.build()) {
switch (logLevel) {
case "debug" -> logger.debug(logMsg);
case "info" -> logger.info(logMsg);
case "warn" -> logger.warn(logMsg);
case "error" -> logAndCollectErrorMessage(logMsg);
default -> logger.info(jsonLine.asText()); // this shouldn't happen but logging it to avoid hiding unexpected lines.
}
}
} catch (final Exception e) {
logger.info(jsonLine.asText());
}
}
return m.stream();
}

private void logAndCollectErrorMessage(String logMessage) {
logger.error(logMessage);
dbtErrors.add(logMessage);
}

public List<String> getDbtErrors() {
return dbtErrors;
}

private void internalLog(final AirbyteLogMessage logMessage) {
switch (logMessage.getLevel()) {
case FATAL, ERROR -> logger.error(logMessage.getMessage());
case WARN -> logger.warn(logMessage.getMessage());
case DEBUG -> logger.debug(logMessage.getMessage());
case TRACE -> logger.trace(logMessage.getMessage());
default -> logger.info(logMessage.getMessage());
}
}

}
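
A minimal consumption sketch, mirroring how DefaultNormalizationRunner wires this factory up (process and CONTAINER_LOG_MDC_BUILDER are assumed to be in scope, as they are in the runner, and the imports match the runner's):

final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
try (final InputStream stdout = process.getInputStream()) {
  // Group every parsed AirbyteMessage by type; plain dbt log lines are re-logged and filtered out along the way.
  final Map<Type, List<AirbyteMessage>> messagesByType =
      streamFactory.create(IOs.newBufferedReader(stdout))
          .collect(Collectors.groupingBy(AirbyteMessage::getType));
  // Raw dbt error lines ("[error]" for dbt < 1.0, level "error" for json-format logs) are collected separately.
  final List<String> dbtErrors = streamFactory.getDbtErrors();
}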
@@ -7,8 +7,10 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.nio.file.Path;
import java.util.stream.Stream;

public interface NormalizationRunner extends AutoCloseable {

@@ -62,4 +64,6 @@ boolean normalize(String jobId,
ResourceRequirements resourceRequirements)
throws Exception;

Stream<AirbyteTraceMessage> getTraceMessages();

}
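
For reference, a minimal sketch of how a caller is expected to drain these trace messages after normalize() returns, mirroring DefaultNormalizationWorker (runner, jobId, and attempt are assumed to be in scope):

final List<FailureReason> failureReasons = runner.getTraceMessages()
    .filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR)
    .map(trace -> FailureHelper.normalizationFailure(trace, jobId, attempt))
    .collect(Collectors.toList());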
@@ -14,7 +14,7 @@
public class NormalizationRunnerFactory {

public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
public static final String NORMALIZATION_VERSION = "0.2.13";
public static final String NORMALIZATION_VERSION = "0.2.14";

static final Map<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>> NORMALIZATION_MAPPING =
ImmutableMap.<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>>builder()