Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add and persist job failures for Normalization #14790

Merged
merged 31 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
9f33750
added TracedException and uncaught exception handler
Phlair Jul 10, 2022
f9d7844
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 11, 2022
ff954d1
added trace message capturing
Phlair Jul 13, 2022
7ed5164
added tests for TRACE messages
Phlair Jul 13, 2022
f730f77
pre-json logging
Phlair Jul 18, 2022
d92674c
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 18, 2022
77ee33d
propagating normalization failures
Phlair Jul 18, 2022
1e1b1b1
log format json & fix hang
Phlair Jul 18, 2022
23efe1c
parsing dbt json logs
Phlair Jul 18, 2022
af2ee53
Merge commit 'c41c8d7ff425841c7d4203c8e2966cb887e9ed00' into george/n…
Phlair Jul 19, 2022
d2d0a2a
bump normalization version
Phlair Jul 19, 2022
bb99589
tests
Phlair Jul 19, 2022
9a7efec
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 21, 2022
3d442ad
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 25, 2022
16b588d
Benoit comments
Phlair Jul 25, 2022
6890ec4
update trace exception user message
Phlair Jul 25, 2022
d076969
review comments
Phlair Jul 26, 2022
01ed748
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 26, 2022
9138bfa
bump version
Phlair Jul 26, 2022
745f7c4
bump version
Phlair Jul 26, 2022
33707f1
review comments
Phlair Jul 27, 2022
9073cc5
nit comments
Phlair Jul 27, 2022
29e138c
add normalization trace failure test
Phlair Jul 27, 2022
001da52
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 27, 2022
0416e54
version bump
Phlair Jul 27, 2022
ea39d1e
pmd
Phlair Jul 27, 2022
eb09ec8
formatto
Phlair Jul 27, 2022
9c951ae
Merge branch 'master' into george/normalization-errors-py
Phlair Jul 27, 2022
f6eba9c
Merge branch 'master' into george/normalization-errors-py
Phlair Aug 1, 2022
5564ecd
bump version
Phlair Aug 1, 2022
63095ad
Merge branch 'master' into george/normalization-errors-py
Phlair Aug 2, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,7 @@ properties:
type: integer
endTime:
type: integer
failures:
type: array
items:
"$ref": FailureReason.yaml
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/base-normalization/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,5 @@ WORKDIR /airbyte
ENV AIRBYTE_ENTRYPOINT "/airbyte/entrypoint.sh"
ENTRYPOINT ["/airbyte/entrypoint.sh"]

LABEL io.airbyte.version=0.2.13
LABEL io.airbyte.version=0.2.14
LABEL io.airbyte.name=airbyte/normalization
3 changes: 2 additions & 1 deletion airbyte-integrations/bases/base-normalization/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,11 @@ function main() {
set +e # allow script to continue running even if next commands fail to run properly
# We don't run dbt 1.0.x on all destinations (because their plugins don't support it yet)
# So we need to only pass `--event-buffer-size` if it's supported by DBT.
# Same goes for JSON formatted logging.
check_dbt_event_buffer_size
if [ "$ret" -eq 0 ]; then
echo -e "\nDBT >=1.0.0 detected; using 10K event buffer size\n"
dbt_additional_args="--event-buffer-size=10000"
dbt_additional_args="--event-buffer-size=10000 --log-format json"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's way better than trying to parse line breaks!

else
dbt_additional_args=""
fi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@
#


import logging

from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from normalization.transform_catalog.transform import main

if __name__ == "__main__":
main()
init_uncaught_exception_handler(logging.getLogger("airbyte"))
try:
main()
except Exception as e:
msg = (
"Something went wrong while normalizing the data moved in this sync "
+ "(failed to transform catalog into dbt project). See the logs for more details."
)
raise AirbyteTracedException.from_exception(e, message=msg)
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,19 @@
#


import logging

from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
from normalization.transform_config.transform import main

if __name__ == "__main__":
main()
init_uncaught_exception_handler(logging.getLogger("airbyte"))
try:
main()
except Exception as e:
msg = (
"Something went wrong while normalizing the data moved in this sync "
+ "(failed to transform config for dbt project). See the logs for more details."
)
raise AirbyteTracedException.from_exception(e, message=msg)
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@
package io.airbyte.workers.general;

import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.FailureReason;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.normalization.NormalizationRunner;
import io.airbyte.workers.normalization.NormalizationWorker;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.time.DurationFormatUtils;
import org.slf4j.Logger;
Expand All @@ -27,6 +32,8 @@ public class DefaultNormalizationWorker implements NormalizationWorker {
private final int attempt;
private final NormalizationRunner normalizationRunner;
private final WorkerEnvironment workerEnvironment;
private final List<FailureReason> traceFailureReasons = new ArrayList<>();
private boolean failed = false;

private final AtomicBoolean cancelled;

Expand Down Expand Up @@ -58,10 +65,10 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo

if (!normalizationRunner.normalize(jobId, attempt, normalizationRoot, input.getDestinationConfiguration(), input.getCatalog(),
input.getResourceRequirements())) {
throw new WorkerException("Normalization Failed.");
buildFailureReasonsAndSetFailure();
}
} catch (final Exception e) {
throw new WorkerException("Normalization Failed.", e);
buildFailureReasonsAndSetFailure();
}

if (cancelled.get()) {
Expand All @@ -77,11 +84,25 @@ public NormalizationSummary run(final NormalizationInput input, final Path jobRo
.withStartTime(startTime)
.withEndTime(endTime);

if (!traceFailureReasons.isEmpty()) {
summary.setFailures(traceFailureReasons);
LOGGER.error("Normalization Failed.");
edgao marked this conversation as resolved.
Show resolved Hide resolved
} else if (failed) {
throw new WorkerException("Normalization Failed.");
}

LOGGER.info("Normalization summary: {}", summary);

return summary;
}

private void buildFailureReasonsAndSetFailure() {
normalizationRunner.getTraceMessages()
.filter(traceMessage -> traceMessage.getType() == AirbyteTraceMessage.Type.ERROR)
.forEach(traceMessage -> traceFailureReasons.add(FailureHelper.normalizationFailure(traceMessage, Long.valueOf(jobId), attempt)));
failed = true;
}

@Override
public void cancel() {
LOGGER.info("Cancelling normalization runner...");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,12 @@ public static FailureReason normalizationFailure(final Throwable t, final Long j
.withExternalMessage("Something went wrong during normalization");
}

public static FailureReason normalizationFailure(final AirbyteTraceMessage m, final Long jobId, final Integer attemptNumber) {
return genericFailure(m, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.NORMALIZATION)
.withExternalMessage(m.getError().getMessage());
}

public static FailureReason dbtFailure(final Throwable t, final Long jobId, final Integer attemptNumber) {
return genericFailure(t, jobId, attemptNumber)
.withFailureOrigin(FailureOrigin.DBT)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,34 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.exception.WorkerException;
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
import io.airbyte.workers.process.ProcessFactory;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,6 +50,8 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
private final DestinationType destinationType;
private final ProcessFactory processFactory;
private final String normalizationImageName;
private final NormalizationAirbyteStreamFactory streamFactory = new NormalizationAirbyteStreamFactory(CONTAINER_LOG_MDC_BUILDER);
private Map<Type, List<AirbyteMessage>> airbyteMessagesByType;

private Process process = null;

Expand Down Expand Up @@ -135,7 +147,30 @@ private boolean runProcess(final String jobId,
Collections.emptyMap(),
args);

LineGobbler.gobble(process.getInputStream(), LOGGER::info, CONTAINER_LOG_MDC_BUILDER);
try (final InputStream stdout = process.getInputStream()) {
// finds and collects any AirbyteMessages from stdout
// also builds a list of raw dbt errors and stores in streamFactory
airbyteMessagesByType = streamFactory.create(IOs.newBufferedReader(stdout))
.collect(Collectors.groupingBy(AirbyteMessage::getType));

// picks up error logs from dbt
String dbtErrorStack = String.join("\n\t", streamFactory.getDbtErrors());

if (!"".equals(dbtErrorStack)) {
AirbyteMessage dbtTraceMessage = new AirbyteMessage()
.withType(Type.TRACE)
.withTrace(new AirbyteTraceMessage()
.withType(AirbyteTraceMessage.Type.ERROR)
.withEmittedAt((double) System.currentTimeMillis())
.withError(new AirbyteErrorTraceMessage()
.withFailureType(FailureType.SYSTEM_ERROR) // TODO: decide on best FailureType for this
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should use DATA_ERROR or even more specifically DBT_ERROR here?

The reason for an error from dbt is not necessarily clear, e.g. it could be a problem with the source data or it could be a system error from a bug we've introduced or it could be an issue with the destination (and other ors)...

.withMessage("Normalization failed during the dbt run. This may indicate a problem with the data itself.")
.withInternalMessage(dbtErrorStack)
.withStackTrace(dbtErrorStack)));
Comment on lines +160 to +169
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(curious) why not create a FailureReason directly? I think this is the first time we're creating a trace message platform side rather than coming from connectors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point we could have TRACE messages in airbyteMessagesByType from an error in the normalization python code so for consistency I opted for TRACE messages from dbt at this point too.

Due to the way we run and consume output from dbt, it wouldn't be trivial to create trace messages directly from normalization.


airbyteMessagesByType.putIfAbsent(Type.TRACE, List.of(dbtTraceMessage));
}
}
LineGobbler.gobble(process.getErrorStream(), LOGGER::error, CONTAINER_LOG_MDC_BUILDER);

WorkerUtils.wait(process);
Expand Down Expand Up @@ -163,6 +198,14 @@ public void close() throws Exception {
}
}

@Override
public Stream<AirbyteTraceMessage> getTraceMessages() {
if (airbyteMessagesByType != null && airbyteMessagesByType.get(Type.TRACE) != null) {
return airbyteMessagesByType.get(Type.TRACE).stream().map(AirbyteMessage::getTrace);
}
return Stream.empty();
}

@VisibleForTesting
DestinationType getDestinationType() {
return destinationType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.normalization;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeType;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.internal.AirbyteStreamFactory;
import java.io.BufferedReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Creates a stream from an input stream. The produced stream attempts to parse each line of the
* InputStream into a AirbyteMessage. If the line cannot be parsed into a AirbyteMessage it is
* assumed to be from dbt. dbt [error] messages are also parsed
*
* <p>
* If a line starts with a AirbyteMessage and then has other characters after it, that
* AirbyteMessage will still be parsed. If there are multiple AirbyteMessage records on the same
* line, only the first will be parsed.
*/
public class NormalizationAirbyteStreamFactory implements AirbyteStreamFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(NormalizationAirbyteStreamFactory.class);

private final MdcScope.Builder containerLogMdcBuilder;
private final Logger logger;
private final List<String> dbtErrors = new ArrayList<>();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not so sure about storing dbtErrors as data within an instance of this object, does that seem fine or is there a better approach?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the getter use anywhere? creating it in the create method and passing it as a parameter will remove any potential side effect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the getter is used here


public NormalizationAirbyteStreamFactory(final MdcScope.Builder containerLogMdcBuilder) {
this(LOGGER, containerLogMdcBuilder);
}

NormalizationAirbyteStreamFactory(final Logger logger, final MdcScope.Builder containerLogMdcBuilder) {
this.logger = logger;
this.containerLogMdcBuilder = containerLogMdcBuilder;
}

@Override
public Stream<AirbyteMessage> create(final BufferedReader bufferedReader) {
return bufferedReader
.lines()
.flatMap(this::filterOutAndHandleNonJsonLines)
.flatMap(this::filterOutAndHandleNonAirbyteMessageLines)
// so now we are just left with AirbyteMessages
.filter(airbyteMessage -> {
final boolean isLog = airbyteMessage.getType() == AirbyteMessage.Type.LOG;
if (isLog) {
try (final var mdcScope = containerLogMdcBuilder.build()) {
internalLog(airbyteMessage.getLog());
}
}
return !isLog;
});
}

private Stream<JsonNode> filterOutAndHandleNonJsonLines(String line) {
final Optional<JsonNode> jsonLine = Jsons.tryDeserialize(line);
if (jsonLine.isEmpty()) {
// we log as info all the lines that are not valid json.
try (final var mdcScope = containerLogMdcBuilder.build()) {
logger.info(line);
// this is really hacky and vulnerable to picking up lines we don't want,
// however it is only for destinations that are using dbt version < 1.0.
// For v1 + we switch on JSON logging and parse those in the next block.
if (line.contains("[error]")) {
dbtErrors.add(line);
}
}
}
return jsonLine.stream();
}

private Stream<AirbyteMessage> filterOutAndHandleNonAirbyteMessageLines(JsonNode jsonLine) {
final Optional<AirbyteMessage> m = Jsons.tryObject(jsonLine, AirbyteMessage.class);
if (m.isEmpty()) {
// valid JSON but not an AirbyteMessage, so we assume this is a dbt json log
try {
final String logLevel = (jsonLine.getNodeType() == JsonNodeType.NULL || jsonLine.get("level").isNull())
? ""
: jsonLine.get("level").asText();
final String logMsg = jsonLine.get("msg").isNull() ? "" : jsonLine.get("msg").asText();
try (final var mdcScope = containerLogMdcBuilder.build()) {
switch (logLevel) {
case "debug" -> logger.debug(logMsg);
case "info" -> logger.info(logMsg);
case "warn" -> logger.warn(logMsg);
case "error" -> logAndCollectErrorMessage(logMsg);
default -> logger.info(jsonLine.asText()); // this shouldn't happen but logging it to avoid hiding unexpected lines.
}
}
} catch (final Exception e) {
logger.info(jsonLine.asText());
}
}
return m.stream();
}

private void logAndCollectErrorMessage(String logMessage) {
logger.error(logMessage);
dbtErrors.add(logMessage);
}

public List<String> getDbtErrors() {
return dbtErrors;
}

private void internalLog(final AirbyteLogMessage logMessage) {
switch (logMessage.getLevel()) {
case FATAL, ERROR -> logger.error(logMessage.getMessage());
case WARN -> logger.warn(logMessage.getMessage());
case DEBUG -> logger.debug(logMessage.getMessage());
case TRACE -> logger.trace(logMessage.getMessage());
default -> logger.info(logMessage.getMessage());
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.OperatorDbt;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.nio.file.Path;
import java.util.stream.Stream;

public interface NormalizationRunner extends AutoCloseable {

Expand Down Expand Up @@ -62,4 +64,6 @@ boolean normalize(String jobId,
ResourceRequirements resourceRequirements)
throws Exception;

Stream<AirbyteTraceMessage> getTraceMessages();

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
public class NormalizationRunnerFactory {

public static final String BASE_NORMALIZATION_IMAGE_NAME = "airbyte/normalization";
public static final String NORMALIZATION_VERSION = "0.2.13";
public static final String NORMALIZATION_VERSION = "0.2.14";

static final Map<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>> NORMALIZATION_MAPPING =
ImmutableMap.<String, ImmutablePair<String, DefaultNormalizationRunner.DestinationType>>builder()
Expand Down
Loading