From ddc362abb7f4f5c7f328d500bd7fecbdc1e07976 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 20 Dec 2022 16:35:23 +0100 Subject: [PATCH 01/20] set failure reason when exit code is 0 --- .../java/io/airbyte/workers/WorkerUtils.java | 41 ++++++++++++++----- .../general/DefaultCheckConnectionWorker.java | 10 ++++- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index b704ade5f6a4..a12701b19176 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -107,23 +107,29 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa .withState(sync.getState()); } + private static ConnectorCommand getConnectorCommandFromOutputType(final OutputType outputType) { + return switch (outputType) { + case SPEC -> ConnectorCommand.SPEC; + case CHECK_CONNECTION -> ConnectorCommand.CHECK; + case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER; + }; + } + + private static Optional getTraceMessageFromMessagesByType(final Map> messagesByType) { + return messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream() + .map(AirbyteMessage::getTrace) + .filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR) + .findFirst(); + } + public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType, final Map> messagesByType, final String defaultErrorMessage) throws WorkerException { - final Optional traceMessage = - messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream() - .map(AirbyteMessage::getTrace) - .filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR) - .findFirst(); + Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); if (traceMessage.isPresent()) { - final ConnectorCommand connectorCommand = switch (outputType) { - case SPEC -> ConnectorCommand.SPEC; - case CHECK_CONNECTION -> ConnectorCommand.CHECK; - case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER; - }; - + final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType); final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand); return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason); } @@ -131,6 +137,19 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out throw new WorkerException(defaultErrorMessage); } + + + public static Optional getJobFailureReasonFromMessages(final OutputType outputType, final Map> messagesByType) { + Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); + if (traceMessage.isPresent()) { + final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType); + return Optional.of(FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand)); + } else { + return Optional.empty(); + } + + } + public static Map mapStreamNamesToSchemas(final StandardSyncInput syncInput) { return syncInput.getCatalog().getStreams().stream().collect( Collectors.toMap( diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 66bc2d271b24..55dfffc0a322 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -14,6 +14,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; @@ -92,7 +93,14 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); LOGGER.debug("Check connection job received output: {}", output); LineGobbler.endSection("CHECK"); - return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output); + + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); + + if (failureReason.isPresent()) { + return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output).withFailureReason(failureReason.get()); + } else { + return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output); + } } else { final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode); LOGGER.error(message); From 93a5e8c9456de74d1dadccae56ae41f03157316d Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 20 Dec 2022 17:01:35 +0100 Subject: [PATCH 02/20] format --- .../src/main/java/io/airbyte/workers/WorkerUtils.java | 5 ++--- .../workers/general/DefaultCheckConnectionWorker.java | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index a12701b19176..7a28db3e4a13 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -137,9 +137,8 @@ public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType out throw new WorkerException(defaultErrorMessage); } - - - public static Optional getJobFailureReasonFromMessages(final OutputType outputType, final Map> messagesByType) { + public static Optional getJobFailureReasonFromMessages(final OutputType outputType, + final Map> messagesByType) { Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); if (traceMessage.isPresent()) { final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 55dfffc0a322..3f9026b2b880 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -97,7 +97,8 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); if (failureReason.isPresent()) { - return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output).withFailureReason(failureReason.get()); + return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output) + .withFailureReason(failureReason.get()); } else { return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output); } From f0b69060ff64be68fa9124c2a33c27cfcafddad9 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 21 Dec 2022 19:28:13 +0100 Subject: [PATCH 03/20] new approach --- .../java/io/airbyte/workers/WorkerUtils.java | 23 +++++++------ .../general/DefaultCheckConnectionWorker.java | 29 +++++++--------- .../general/DefaultDiscoverCatalogWorker.java | 33 ++++++++++--------- .../workers/general/DefaultGetSpecWorker.java | 16 +++++---- 4 files changed, 51 insertions(+), 50 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 7a28db3e4a13..0071b69e5010 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -122,21 +122,24 @@ private static Optional getTraceMessageFromMessagesByType(f .findFirst(); } - public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType, - final Map> messagesByType, - final String defaultErrorMessage) + public static ConnectorJobOutput getJobOutput(final OutputType outputType, + final Optional failureReason, + final String defaultErrorMessage, + final boolean throwOnNoFailureReason) throws WorkerException { - Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); - if (traceMessage.isPresent()) { - final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType); - final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand); - return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason); + if (throwOnNoFailureReason && !failureReason.isPresent()) { + throw new WorkerException(defaultErrorMessage); + } + ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(outputType); + if (failureReason.isPresent()) { + return jobOutput.withFailureReason(failureReason.get()); + } else { + return jobOutput; } - - throw new WorkerException(defaultErrorMessage); } + public static Optional getJobFailureReasonFromMessages(final OutputType outputType, final Map> messagesByType) { Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 3f9026b2b880..bb3ce585f7c1 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -79,36 +79,29 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES); } + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); final int exitCode = process.exitValue(); + LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); + final Optional status = messagesByType .getOrDefault(Type.CONNECTION_STATUS, new ArrayList<>()).stream() .map(AirbyteMessage::getConnectionStatus) .findFirst(); - if (status.isPresent() && exitCode == 0) { + final String defaultErrorMessage = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode); + + if (status.isEmpty()) { + LOGGER.error(defaultErrorMessage); + return WorkerUtils.getJobOutput(OutputType.CHECK_CONNECTION, failureReason, defaultErrorMessage, true); + } else { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() .withStatus(Enums.convertTo(status.get().getStatus(), Status.class)) .withMessage(status.get().getMessage()); - - LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); LOGGER.debug("Check connection job received output: {}", output); LineGobbler.endSection("CHECK"); - - final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); - - if (failureReason.isPresent()) { - return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output) - .withFailureReason(failureReason.get()); - } else { - return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output); - } - } else { - final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode); - LOGGER.error(message); - - return WorkerUtils.getJobFailureOutputOrThrow(OutputType.CHECK_CONNECTION, messagesByType, message); + ConnectorJobOutput jobOutput = WorkerUtils.getJobOutput(OutputType.CHECK_CONNECTION, failureReason, defaultErrorMessage, false); + return jobOutput.withCheckConnection(output); } - } catch (final Exception e) { ApmTraceUtils.addExceptionToTrace(e); LOGGER.error("Unexpected error while checking connection: ", e); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 8c5a961f1fdd..7f7a0435891c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -15,6 +15,7 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; @@ -90,25 +91,27 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI .map(AirbyteMessage::getCatalog) .findFirst(); + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); final int exitCode = process.exitValue(); - if (exitCode == 0) { + final String exitCodeMessage = String.format("Discover job subprocess finished with exit code %s", exitCode)); + LOGGER.debug(exitCodeMessage); + + if (exitCode != 0) { + return WorkerUtils.getJobOutput(OutputType.DISCOVER_CATALOG_ID, failureReason, exitCodeMessage, true); + } else { if (catalog.isEmpty()) { throw new WorkerException("Integration failed to output a catalog struct."); + } else { + final UUID catalogId = + configRepository.writeActorCatalogFetchEvent(catalog.get(), + // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce + // it, so we check again here. + discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()), + discoverSchemaInput.getConnectorVersion(), + discoverSchemaInput.getConfigHash()); + ConnectorJobOutput jobOutput = WorkerUtils.getJobOutput(OutputType.DISCOVER_CATALOG_ID, failureReason, exitCodeMessage, false); + return jobOutput.withDiscoverCatalogId(catalogId); } - - final UUID catalogId = - configRepository.writeActorCatalogFetchEvent(catalog.get(), - // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce - // it, so we check again here. - discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()), - discoverSchemaInput.getConnectorVersion(), - discoverSchemaInput.getConfigHash()); - return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId); - } else { - return WorkerUtils.getJobFailureOutputOrThrow( - OutputType.DISCOVER_CATALOG_ID, - messagesByType, - String.format("Discover job subprocess finished with exit code %s", exitCode)); } } catch (final WorkerException e) { ApmTraceUtils.addExceptionToTrace(e); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 867093e23e22..6798a5124cea 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -13,6 +13,7 @@ import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteMessage; @@ -79,18 +80,19 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) .map(AirbyteMessage::getSpec) .findFirst(); + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType); final int exitCode = process.exitValue(); - if (exitCode == 0) { + final String exitCodeMessage = String.format("Spec job subprocess finished with exit code %s", exitCode); + LOGGER.debug(exitCodeMessage); + if (exitCode != 0) { + return WorkerUtils.getJobOutput(OutputType.SPEC, failureReason, exitCodeMessage, true); + } else { if (spec.isEmpty()) { throw new WorkerException("integration failed to output a spec struct."); } + ConnectorJobOutput jobOutput = WorkerUtils.getJobOutput(OutputType.SPEC, failureReason, exitCodeMessage, false); - return new ConnectorJobOutput().withOutputType(OutputType.SPEC).withSpec(spec.get()); - } else { - return WorkerUtils.getJobFailureOutputOrThrow( - OutputType.SPEC, - messagesByType, - String.format("Spec job subprocess finished with exit code %s", exitCode)); + return jobOutput.withSpec(spec.get()); } } catch (final Exception e) { throw new WorkerException(String.format("Error while getting spec from image %s", config.getDockerImage()), e); From 2b4f99df963d568801a10585c8dec62cef7f30b4 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 27 Dec 2022 12:07:12 +0100 Subject: [PATCH 04/20] explicit is better than implicit --- .../java/io/airbyte/workers/WorkerUtils.java | 28 +++++------ .../general/DefaultCheckConnectionWorker.java | 46 +++++++---------- .../general/DefaultDiscoverCatalogWorker.java | 49 ++++++++----------- .../workers/general/DefaultGetSpecWorker.java | 41 ++++++---------- 4 files changed, 66 insertions(+), 98 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 0071b69e5010..42866cf92fe9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -5,7 +5,7 @@ package io.airbyte.workers; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.commons.io.IOs; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardSyncInput; @@ -15,9 +15,11 @@ import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteTraceMessage; -import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.FailureHelper.ConnectorCommand; +import io.airbyte.workers.internal.AirbyteStreamFactory; +import java.io.IOException; +import java.io.InputStream; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -122,24 +124,18 @@ private static Optional getTraceMessageFromMessagesByType(f .findFirst(); } - public static ConnectorJobOutput getJobOutput(final OutputType outputType, - final Optional failureReason, - final String defaultErrorMessage, - final boolean throwOnNoFailureReason) - throws WorkerException { + public static Map> getMessagesByType(Process process, AirbyteStreamFactory streamFactory, int timeOut) + throws IOException { + final Map> messagesByType; + try (final InputStream stdout = process.getInputStream()) { + messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) + .collect(Collectors.groupingBy(AirbyteMessage::getType)); - if (throwOnNoFailureReason && !failureReason.isPresent()) { - throw new WorkerException(defaultErrorMessage); - } - ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(outputType); - if (failureReason.isPresent()) { - return jobOutput.withFailureReason(failureReason.get()); - } else { - return jobOutput; + WorkerUtils.gentleClose(process, timeOut, TimeUnit.MINUTES); + return messagesByType; } } - public static Optional getJobFailureReasonFromMessages(final OutputType outputType, final Map> messagesByType) { Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index bb3ce585f7c1..9b8ceb2bcbe8 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -9,7 +9,6 @@ import datadog.trace.api.Trace; import io.airbyte.commons.enums.Enums; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput; @@ -28,14 +27,11 @@ import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,40 +64,36 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getConnectionConfiguration())); - - LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - - final Map> messagesByType; - try (final InputStream stdout = process.getInputStream()) { - messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) - .collect(Collectors.groupingBy(AirbyteMessage::getType)); - - WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES); - } - - final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); final int exitCode = process.exitValue(); LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); + ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION); - final Optional status = messagesByType + LineGobbler.gobble(process.getErrorStream(), LOGGER::error); + + final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); + final Optional connectionStatus = messagesByType .getOrDefault(Type.CONNECTION_STATUS, new ArrayList<>()).stream() .map(AirbyteMessage::getConnectionStatus) .findFirst(); - final String defaultErrorMessage = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode); + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); + if (failureReason.isPresent()) { + jobOutput = jobOutput.withFailureReason(failureReason.get()); + } - if (status.isEmpty()) { - LOGGER.error(defaultErrorMessage); - return WorkerUtils.getJobOutput(OutputType.CHECK_CONNECTION, failureReason, defaultErrorMessage, true); - } else { + if (connectionStatus.isPresent()) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() - .withStatus(Enums.convertTo(status.get().getStatus(), Status.class)) - .withMessage(status.get().getMessage()); + .withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class)) + .withMessage(connectionStatus.get().getMessage()); LOGGER.debug("Check connection job received output: {}", output); - LineGobbler.endSection("CHECK"); - ConnectorJobOutput jobOutput = WorkerUtils.getJobOutput(OutputType.CHECK_CONNECTION, failureReason, defaultErrorMessage, false); - return jobOutput.withCheckConnection(output); + jobOutput = jobOutput.withCheckConnection(output); + } else { + if (failureReason.isEmpty()) { + throw new WorkerException("Error checking connection status: no status nor failure reason were outputted."); + } } + return jobOutput; + } catch (final Exception e) { ApmTraceUtils.addExceptionToTrace(e); LOGGER.error("Unexpected error while checking connection: ", e); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 7f7a0435891c..fcb92e837b95 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -10,7 +10,6 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput; @@ -28,7 +27,6 @@ import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -36,8 +34,6 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,45 +70,40 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(discoverSchemaInput.getConnectionConfiguration())); + final int exitCode = process.exitValue(); + final String exitCodeMessage = String.format("Discover job subprocess finished with exit code %s", exitCode); + LOGGER.debug(exitCodeMessage); + ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Map> messagesByType; - - try (final InputStream stdout = process.getInputStream()) { - messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) - .collect(Collectors.groupingBy(AirbyteMessage::getType)); - - WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES); - } + final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); final Optional catalog = messagesByType .getOrDefault(Type.CATALOG, new ArrayList<>()).stream() .map(AirbyteMessage::getCatalog) .findFirst(); - final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); - final int exitCode = process.exitValue(); - final String exitCodeMessage = String.format("Discover job subprocess finished with exit code %s", exitCode)); - LOGGER.debug(exitCodeMessage); + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType); + if (failureReason.isPresent()) { + jobOutput = jobOutput.withFailureReason(failureReason.get()); + } - if (exitCode != 0) { - return WorkerUtils.getJobOutput(OutputType.DISCOVER_CATALOG_ID, failureReason, exitCodeMessage, true); + if (catalog.isPresent()) { + final UUID catalogId = + configRepository.writeActorCatalogFetchEvent(catalog.get(), + // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce + // it, so we check again here. + discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()), + discoverSchemaInput.getConnectorVersion(), + discoverSchemaInput.getConfigHash()); + jobOutput = jobOutput.withDiscoverCatalogId(catalogId); } else { - if (catalog.isEmpty()) { + if (failureReason.isEmpty()) { throw new WorkerException("Integration failed to output a catalog struct."); - } else { - final UUID catalogId = - configRepository.writeActorCatalogFetchEvent(catalog.get(), - // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce - // it, so we check again here. - discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()), - discoverSchemaInput.getConnectorVersion(), - discoverSchemaInput.getConfigHash()); - ConnectorJobOutput jobOutput = WorkerUtils.getJobOutput(OutputType.DISCOVER_CATALOG_ID, failureReason, exitCodeMessage, false); - return jobOutput.withDiscoverCatalogId(catalogId); } } + return jobOutput; } catch (final WorkerException e) { ApmTraceUtils.addExceptionToTrace(e); throw e; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 6798a5124cea..1a38b91a8ec4 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -9,7 +9,6 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; @@ -24,14 +23,11 @@ import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +37,6 @@ public class DefaultGetSpecWorker implements GetSpecWorker { private final IntegrationLauncher integrationLauncher; private final AirbyteStreamFactory streamFactory; - private Process process; public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher, @@ -60,20 +55,14 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, config.getDockerImage())); try { process = integrationLauncher.spec(jobRoot); + final int exitCode = process.exitValue(); + final String exitCodeMessage = String.format("Spec job subprocess finished with exit code %s", exitCode); + LOGGER.debug(exitCodeMessage); + ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Map> messagesByType; - try (final InputStream stdout = process.getInputStream()) { - messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) - .collect(Collectors.groupingBy(AirbyteMessage::getType)); - - // todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for - // this. - // retrieving spec should generally be instantaneous, but since docker images might not be pulled - // it could take a while longer depending on internet conditions as well. - WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES); - } + final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); final Optional spec = messagesByType .getOrDefault(Type.SPEC, new ArrayList<>()).stream() @@ -81,19 +70,19 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) .findFirst(); final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType); - final int exitCode = process.exitValue(); - final String exitCodeMessage = String.format("Spec job subprocess finished with exit code %s", exitCode); - LOGGER.debug(exitCodeMessage); - if (exitCode != 0) { - return WorkerUtils.getJobOutput(OutputType.SPEC, failureReason, exitCodeMessage, true); + if (failureReason.isPresent()) { + jobOutput = jobOutput.withFailureReason(failureReason.get()); + } + + if (spec.isPresent()) { + jobOutput = jobOutput.withSpec(spec.get()); } else { - if (spec.isEmpty()) { - throw new WorkerException("integration failed to output a spec struct."); + if (failureReason.isEmpty()) { + throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason."); } - ConnectorJobOutput jobOutput = WorkerUtils.getJobOutput(OutputType.SPEC, failureReason, exitCodeMessage, false); - - return jobOutput.withSpec(spec.get()); } + + return jobOutput; } catch (final Exception e) { throw new WorkerException(String.format("Error while getting spec from image %s", config.getDockerImage()), e); } From 74d4a7b4a05f9a1bb6dd7a99eca5fc805042ff1d Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 27 Dec 2022 17:27:00 +0100 Subject: [PATCH 05/20] use setters --- .../workers/general/DefaultCheckConnectionWorker.java | 8 +++----- .../io/airbyte/workers/general/DefaultGetSpecWorker.java | 8 +++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 9b8ceb2bcbe8..4a66bff2af63 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -66,7 +66,7 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa Jsons.serialize(input.getConnectionConfiguration())); final int exitCode = process.exitValue(); LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); - ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -77,16 +77,14 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa .findFirst(); final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); - if (failureReason.isPresent()) { - jobOutput = jobOutput.withFailureReason(failureReason.get()); - } + failureReason.ifPresent(jobOutput::setFailureReason); if (connectionStatus.isPresent()) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() .withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class)) .withMessage(connectionStatus.get().getMessage()); LOGGER.debug("Check connection job received output: {}", output); - jobOutput = jobOutput.withCheckConnection(output); + jobOutput.setCheckConnection(output); } else { if (failureReason.isEmpty()) { throw new WorkerException("Error checking connection status: no status nor failure reason were outputted."); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 1a38b91a8ec4..e5a5641b6c58 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -59,7 +59,7 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) final String exitCodeMessage = String.format("Spec job subprocess finished with exit code %s", exitCode); LOGGER.debug(exitCodeMessage); - ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); @@ -70,12 +70,10 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) .findFirst(); final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType); - if (failureReason.isPresent()) { - jobOutput = jobOutput.withFailureReason(failureReason.get()); - } + failureReason.ifPresent(jobOutput::setFailureReason); if (spec.isPresent()) { - jobOutput = jobOutput.withSpec(spec.get()); + jobOutput.setSpec(spec.get()); } else { if (failureReason.isEmpty()) { throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason."); From 993fa9a30c28fb15f1e0e5f410f7b62a3676708a Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 28 Dec 2022 10:47:22 +0100 Subject: [PATCH 06/20] write discover tests --- .../general/DefaultDiscoverCatalogWorker.java | 8 ++-- .../general/DefaultGetSpecWorkerTest.java | 47 +++++++++++++------ 2 files changed, 36 insertions(+), 19 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index fcb92e837b95..1d657e98a1aa 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -73,7 +73,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI final int exitCode = process.exitValue(); final String exitCodeMessage = String.format("Discover job subprocess finished with exit code %s", exitCode); LOGGER.debug(exitCodeMessage); - ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -85,9 +85,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI .findFirst(); final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType); - if (failureReason.isPresent()) { - jobOutput = jobOutput.withFailureReason(failureReason.get()); - } + failureReason.ifPresent(jobOutput::setFailureReason); if (catalog.isPresent()) { final UUID catalogId = @@ -97,7 +95,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()), discoverSchemaInput.getConnectorVersion(), discoverSchemaInput.getConfigHash()); - jobOutput = jobOutput.withDiscoverCatalogId(catalogId); + jobOutput.setDiscoverCatalogId(catalogId); } else { if (failureReason.isEmpty()) { throw new WorkerException("Integration failed to output a catalog struct."); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java index 7ddbdfc17d2c..192484cecdd2 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java @@ -78,47 +78,66 @@ void testSuccessfulRun() throws IOException, InterruptedException, WorkerExcepti } @Test - void testFailureOnInvalidSpec() throws InterruptedException { + void testFailureOnInvalidSpecAndNoFailureReason() throws InterruptedException { final String expectedSpecString = "{\"key\":\"value\"}"; when(process.getInputStream()).thenReturn(new ByteArrayInputStream(expectedSpecString.getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); - when(process.exitValue()).thenReturn(0); assertThatThrownBy(() -> worker.run(config, jobRoot)) .isInstanceOf(WorkerException.class) .getCause() .isInstanceOf(WorkerException.class) - .hasMessageContaining("integration failed to output a spec struct.") + .hasMessageContaining("Integration failed to output a spec struct and did not output a failure reason.") .hasNoCause(); } @Test - void testFailureOnNonzeroExitCode() throws InterruptedException, IOException { + void testWithInvalidSpecAndFailureReason() throws InterruptedException, WorkerException { + final String expectedSpecString = "{\"key\":\"value\"}"; + + final AirbyteMessage message = new AirbyteMessage() + .withType(Type.SPEC) + .withSpec(Jsons.deserialize(expectedSpecString, io.airbyte.protocol.models.ConnectorSpecification.class)); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + + when(process.getInputStream()) + .thenReturn(new ByteArrayInputStream((Jsons.serialize(message) + "\n" + Jsons.serialize(traceMessage)).getBytes(Charsets.UTF_8))); + when(process.waitFor(anyLong(), any())).thenReturn(true); + + final ConnectorJobOutput output = worker.run(config, jobRoot); + assertEquals(OutputType.SPEC, output.getOutputType()); + assertNull(output.getSpec()); + + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + } + + @Test + void testWithValidSpecAndFailureReason() throws InterruptedException, WorkerException, IOException { final String expectedSpecString = MoreResources.readResource("valid_spec.json"); final AirbyteMessage message = new AirbyteMessage() .withType(Type.SPEC) .withSpec(Jsons.deserialize(expectedSpecString, io.airbyte.protocol.models.ConnectorSpecification.class)); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); - when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); + when(process.getInputStream()) + .thenReturn(new ByteArrayInputStream((Jsons.serialize(message) + "\n" + Jsons.serialize(traceMessage)).getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); - when(process.exitValue()).thenReturn(1); - assertThatThrownBy(() -> worker.run(config, jobRoot)) - .isInstanceOf(WorkerException.class) - .getCause() - .isInstanceOf(WorkerException.class) - .hasMessageContaining("Spec job subprocess finished with exit code") - .hasNoCause(); + final ConnectorJobOutput output = worker.run(config, jobRoot); + assertEquals(OutputType.SPEC, output.getOutputType()); + assertEquals(output.getSpec(), Jsons.deserialize(expectedSpecString, ConnectorSpecification.class)); + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); } @Test - void testFailureOnNonzeroExitCodeWithTraceMessage() throws WorkerException, InterruptedException { + void testFailureReasonWithTraceMessageOnly() throws WorkerException, InterruptedException { final AirbyteMessage message = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); - when(process.exitValue()).thenReturn(1); final ConnectorJobOutput output = worker.run(config, jobRoot); assertEquals(OutputType.SPEC, output.getOutputType()); From 2bf5ff56eff28670885839b9f2c9a3bce4d8365b Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 28 Dec 2022 11:42:22 +0100 Subject: [PATCH 07/20] write discover tests --- .../DefaultDiscoverCatalogWorkerTest.java | 58 +++++++++++++------ 1 file changed, 40 insertions(+), 18 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index b9fe4417657d..41dedcb4206c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -47,6 +47,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.UUID; +import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -76,8 +77,10 @@ class DefaultDiscoverCatalogWorkerTest { private Path jobRoot; private IntegrationLauncher integrationLauncher; private Process process; - private AirbyteStreamFactory streamFactory; - + private AirbyteStreamFactory validCatalogStreamFactory; + private AirbyteStreamFactory emptyStreamFactory; + private AirbyteStreamFactory traceStreamFactory; + private AirbyteStreamFactory validCatalogWithTraceMessageStreamFactory; private UUID CATALOG_ID; @BeforeEach @@ -96,14 +99,21 @@ void setup() throws Exception { when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(new byte[0])); IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, MoreResources.readResource("airbyte_postgres_catalog.json")); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + + validCatalogStreamFactory = noop -> Lists.newArrayList(new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG)).stream(); + traceStreamFactory = noop -> Lists.newArrayList(traceMessage).stream(); + emptyStreamFactory = noop -> Stream.empty(); + + validCatalogWithTraceMessageStreamFactory = + noop -> Lists.newArrayList(new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG), traceMessage).stream(); - streamFactory = noop -> Lists.newArrayList(new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG)).stream(); } @SuppressWarnings("BusyWait") @Test void testDiscoverSchema() throws Exception { - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, validCatalogStreamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); assertNull(output.getFailureReason()); @@ -122,10 +132,8 @@ void testDiscoverSchema() throws Exception { @SuppressWarnings("BusyWait") @Test - void testDiscoverSchemaProcessFail() throws Exception { - when(process.exitValue()).thenReturn(1); - - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, streamFactory); + void testDiscoverSchemaProcessFailWithNoCatalogNoTraceMessage() { + final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, emptyStreamFactory); assertThrows(WorkerException.class, () -> worker.run(INPUT, jobRoot)); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { @@ -139,18 +147,32 @@ void testDiscoverSchemaProcessFail() throws Exception { @SuppressWarnings("BusyWait") @Test - void testDiscoverSchemaProcessFailWithTraceMessage() throws Exception { - final AirbyteStreamFactory traceStreamFactory = noop -> Lists.newArrayList( - AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0)).stream(); - - when(process.exitValue()).thenReturn(1); + void testDiscoverSchemaHasFailureReasonWithTraceMessage() throws Exception { final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, traceStreamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); - // assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); - // assertNull(output.getDiscoverCatalog()); - assertNotNull(output.getFailureReason()); + assertEquals(output.getOutputType(), OutputType.DISCOVER_CATALOG_ID); + assertNull(output.getDiscoverCatalogId()); + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + Assertions.assertTimeout(Duration.ofSeconds(5), () -> { + while (process.getErrorStream().available() != 0) { + Thread.sleep(50); + } + }); + + verify(process).exitValue(); + } + + @Test + void testDiscoverSchemaHasFailureReasonAndCatalogWithCatalogAndTraceMessage() throws Exception { + + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, validCatalogWithTraceMessageStreamFactory); + final ConnectorJobOutput output = worker.run(INPUT, jobRoot); + assertEquals(output.getOutputType(), OutputType.DISCOVER_CATALOG_ID); + assertNotNull(output.getDiscoverCatalogId()); final FailureReason failureReason = output.getFailureReason(); assertEquals("some error from the connector", failureReason.getExternalMessage()); @@ -168,13 +190,13 @@ void testDiscoverSchemaException() throws WorkerException { when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))) .thenThrow(new RuntimeException()); - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, validCatalogStreamFactory); assertThrows(WorkerException.class, () -> worker.run(INPUT, jobRoot)); } @Test void testCancel() throws WorkerException { - final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, streamFactory); + final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, validCatalogStreamFactory); worker.run(INPUT, jobRoot); worker.cancel(); From 1f6c31044fc0a092f7034f36bbd1c65f1467867b Mon Sep 17 00:00:00 2001 From: alafanechere Date: Wed, 28 Dec 2022 11:51:07 +0100 Subject: [PATCH 08/20] write check tests --- .../DefaultCheckConnectionWorkerTest.java | 27 +++++++++++++++---- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java index f5685c245e44..ca0d1b8dee40 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java @@ -5,6 +5,7 @@ package io.airbyte.workers.general; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; @@ -37,6 +38,7 @@ import java.io.InputStream; import java.nio.file.Files; import java.nio.file.Path; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -53,6 +55,8 @@ class DefaultCheckConnectionWorkerTest { private AirbyteStreamFactory successStreamFactory; private AirbyteStreamFactory failureStreamFactory; private AirbyteStreamFactory traceMessageStreamFactory; + private AirbyteStreamFactory traceMessageSuccessStreamFactory; + private AirbyteStreamFactory emptyStreamFactory; @BeforeEach void setup() throws IOException, WorkerException { @@ -79,6 +83,8 @@ void setup() throws IOException, WorkerException { final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); traceMessageStreamFactory = noop -> Lists.newArrayList(traceMessage).stream(); + traceMessageSuccessStreamFactory = noop -> Lists.newArrayList(successMessage, traceMessage).stream(); + emptyStreamFactory = noop -> Stream.empty(); } @Test @@ -113,16 +119,14 @@ void testFailedConnection() throws WorkerException { } @Test - void testProcessFail() { - when(process.exitValue()).thenReturn(1); + void testProcessFailWithNoFailureMessageNorStatus() { - final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, failureStreamFactory); + final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, emptyStreamFactory); assertThrows(WorkerException.class, () -> worker.run(input, jobRoot)); } @Test - void testProcessFailWithTraceMessage() throws WorkerException { - when(process.exitValue()).thenReturn(1); + void testOutputHasFailureReasonWhenTraceMessage() throws WorkerException { final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, traceMessageStreamFactory); final ConnectorJobOutput output = worker.run(input, jobRoot); @@ -134,6 +138,19 @@ void testProcessFailWithTraceMessage() throws WorkerException { assertEquals("some error from the connector", failureReason.getExternalMessage()); } + @Test + void testOutputHasStatusAndFailureReasonWhenSuccessAndTraceMessage() throws WorkerException { + + final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, traceMessageSuccessStreamFactory); + final ConnectorJobOutput output = worker.run(input, jobRoot); + + assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertNotNull(output.getCheckConnection()); + + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + } + @Test void testExceptionThrownInRun() throws WorkerException { doThrow(new RuntimeException()).when(integrationLauncher).check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDS)); From 51d8da17722b0b802d2b2eb1ea812a11b1ff6836 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 3 Jan 2023 15:27:08 +0100 Subject: [PATCH 09/20] add stderr to exception --- .../main/java/io/airbyte/workers/WorkerUtils.java | 13 +++++++++++++ .../general/DefaultCheckConnectionWorker.java | 7 ++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 42866cf92fe9..12a3816c3aec 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -18,8 +18,10 @@ import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.FailureHelper.ConnectorCommand; import io.airbyte.workers.internal.AirbyteStreamFactory; +import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; +import java.io.InputStreamReader; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -156,4 +158,15 @@ public static Map mapStreamNamesToSche } + public static String getStdErrFromErrorStream(final InputStream errorStream) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream)); + StringBuilder errorOutput = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + errorOutput.append(line); + errorOutput.append(System.lineSeparator()); + } + return errorOutput.toString(); + } + } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 4a66bff2af63..4821ac1ad447 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -87,7 +87,12 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa jobOutput.setCheckConnection(output); } else { if (failureReason.isEmpty()) { - throw new WorkerException("Error checking connection status: no status nor failure reason were outputted."); + final String stderr = WorkerUtils.getStdErrFromErrorStream(process.getErrorStream()); + if (stderr.isEmpty()) { + throw new WorkerException("Error checking connection status: no status nor failure reason were outputted."); + } else { + throw new WorkerException("Error checking connection status.\n" + stderr); + } } } return jobOutput; From 9fa3c999665ee3b06b137f671b1e947235ce9858 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 3 Jan 2023 15:34:16 +0100 Subject: [PATCH 10/20] add stderr to exception --- .../workers/general/DefaultDiscoverCatalogWorker.java | 7 ++++++- .../io/airbyte/workers/general/DefaultGetSpecWorker.java | 7 ++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 1d657e98a1aa..ab84e6a98e04 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -98,7 +98,12 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI jobOutput.setDiscoverCatalogId(catalogId); } else { if (failureReason.isEmpty()) { - throw new WorkerException("Integration failed to output a catalog struct."); + final String stderr = WorkerUtils.getStdErrFromErrorStream(process.getErrorStream()); + if (stderr.isEmpty()) { + throw new WorkerException("Integration failed to output a catalog struct."); + } else { + throw new WorkerException("Integration failed to output a catalog struct:\n" + stderr); + } } } return jobOutput; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index e5a5641b6c58..7d0227dadd1d 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -76,7 +76,12 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) jobOutput.setSpec(spec.get()); } else { if (failureReason.isEmpty()) { - throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason."); + final String stderr = WorkerUtils.getStdErrFromErrorStream(process.getErrorStream()); + if (stderr.isEmpty()) { + throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason."); + } else { + throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason:\n" + stderr); + } } } From b9586a9ff9c66c35c13eeaa8c6d4aba31afd6bd2 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Tue, 3 Jan 2023 18:50:29 +0100 Subject: [PATCH 11/20] fix exit code --- .../workers/general/DefaultCheckConnectionWorker.java | 10 ++++++---- .../workers/general/DefaultDiscoverCatalogWorker.java | 7 ++++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 4821ac1ad447..509ea4139c50 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -64,8 +64,7 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getConnectionConfiguration())); - final int exitCode = process.exitValue(); - LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -78,12 +77,13 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); failureReason.ifPresent(jobOutput::setFailureReason); - + final int exitCode = process.exitValue(); + LOGGER.info("Check connection job subprocess finished with exit code {}", exitCode); if (connectionStatus.isPresent()) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() .withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class)) .withMessage(connectionStatus.get().getMessage()); - LOGGER.debug("Check connection job received output: {}", output); + LOGGER.info("Check connection job received output: {}", output); jobOutput.setCheckConnection(output); } else { if (failureReason.isEmpty()) { @@ -95,11 +95,13 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa } } } + LineGobbler.endSection("CHECK"); return jobOutput; } catch (final Exception e) { ApmTraceUtils.addExceptionToTrace(e); LOGGER.error("Unexpected error while checking connection: ", e); + LineGobbler.endSection("CHECK"); throw new WorkerException("Unexpected error while getting checking connection.", e); } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index ab84e6a98e04..cfd2051c5a90 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -70,9 +70,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(discoverSchemaInput.getConnectionConfiguration())); - final int exitCode = process.exitValue(); - final String exitCodeMessage = String.format("Discover job subprocess finished with exit code %s", exitCode); - LOGGER.debug(exitCodeMessage); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -87,6 +85,9 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType); failureReason.ifPresent(jobOutput::setFailureReason); + final int exitCode = process.exitValue(); + LOGGER.info(String.format("Discover job subprocess finished with exit code %s", exitCode)); + if (catalog.isPresent()) { final UUID catalogId = configRepository.writeActorCatalogFetchEvent(catalog.get(), From 41dda91a66097885b944136fcea6fd2b33c0650e Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Wed, 11 Jan 2023 17:50:07 -0400 Subject: [PATCH 12/20] fix spotbugs error --- .../src/main/java/io/airbyte/workers/WorkerUtils.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 32c9cab9574a..d08f91bf8e56 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -138,7 +139,7 @@ private static Optional getTraceMessageFromMessagesByType(f .findFirst(); } - public static Map> getMessagesByType(Process process, AirbyteStreamFactory streamFactory, int timeOut) + public static Map> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut) throws IOException { final Map> messagesByType; try (final InputStream stdout = process.getInputStream()) { @@ -152,7 +153,7 @@ public static Map> getMessagesByType(Process process, public static Optional getJobFailureReasonFromMessages(final OutputType outputType, final Map> messagesByType) { - Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); + final Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); if (traceMessage.isPresent()) { final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType); return Optional.of(FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand)); @@ -171,8 +172,8 @@ public static Map mapStreamNamesToSche } public static String getStdErrFromErrorStream(final InputStream errorStream) throws IOException { - BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream)); - StringBuilder errorOutput = new StringBuilder(); + final BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream, StandardCharsets.UTF_8)); + final StringBuilder errorOutput = new StringBuilder(); String line; while ((line = reader.readLine()) != null) { errorOutput.append(line); From 995157d1c6e19fd2933daea21e1bc31d4cc225de Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Wed, 11 Jan 2023 17:58:30 -0400 Subject: [PATCH 13/20] add missing method signature update after merge --- .../workers/general/DefaultDiscoverCatalogWorkerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index a89022648f23..6b63a3e765dc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -210,7 +210,7 @@ void testDiscoverSchemaHasFailureReasonWithTraceMessage() throws Exception { void testDiscoverSchemaHasFailureReasonAndCatalogWithCatalogAndTraceMessage() throws Exception { final DefaultDiscoverCatalogWorker worker = - new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, validCatalogWithTraceMessageStreamFactory); + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, validCatalogWithTraceMessageStreamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); assertEquals(output.getOutputType(), OutputType.DISCOVER_CATALOG_ID); assertNotNull(output.getDiscoverCatalogId()); From 0673e4e89964352b9c7bd4ecfb19afb0eeafd364 Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Wed, 11 Jan 2023 18:03:01 -0400 Subject: [PATCH 14/20] fix pmd warning --- .../workers/general/DefaultGetSpecWorkerTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java index 192484cecdd2..7db18f0c77e4 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java @@ -39,6 +39,7 @@ class DefaultGetSpecWorkerTest { private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private static final String DUMMY_IMAGE_NAME = "airbyte/notarealimage:1.1"; + private static final String ERROR_MESSAGE = "some error from the connector"; private DefaultGetSpecWorker worker; private IntegrationLauncher integrationLauncher; @@ -98,7 +99,7 @@ void testWithInvalidSpecAndFailureReason() throws InterruptedException, WorkerEx final AirbyteMessage message = new AirbyteMessage() .withType(Type.SPEC) .withSpec(Jsons.deserialize(expectedSpecString, io.airbyte.protocol.models.ConnectorSpecification.class)); - final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage(ERROR_MESSAGE, 123.0); when(process.getInputStream()) .thenReturn(new ByteArrayInputStream((Jsons.serialize(message) + "\n" + Jsons.serialize(traceMessage)).getBytes(Charsets.UTF_8))); @@ -109,7 +110,7 @@ void testWithInvalidSpecAndFailureReason() throws InterruptedException, WorkerEx assertNull(output.getSpec()); final FailureReason failureReason = output.getFailureReason(); - assertEquals("some error from the connector", failureReason.getExternalMessage()); + assertEquals(ERROR_MESSAGE, failureReason.getExternalMessage()); } @Test @@ -119,7 +120,7 @@ void testWithValidSpecAndFailureReason() throws InterruptedException, WorkerExce final AirbyteMessage message = new AirbyteMessage() .withType(Type.SPEC) .withSpec(Jsons.deserialize(expectedSpecString, io.airbyte.protocol.models.ConnectorSpecification.class)); - final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage(ERROR_MESSAGE, 123.0); when(process.getInputStream()) .thenReturn(new ByteArrayInputStream((Jsons.serialize(message) + "\n" + Jsons.serialize(traceMessage)).getBytes(Charsets.UTF_8))); @@ -129,12 +130,12 @@ void testWithValidSpecAndFailureReason() throws InterruptedException, WorkerExce assertEquals(OutputType.SPEC, output.getOutputType()); assertEquals(output.getSpec(), Jsons.deserialize(expectedSpecString, ConnectorSpecification.class)); final FailureReason failureReason = output.getFailureReason(); - assertEquals("some error from the connector", failureReason.getExternalMessage()); + assertEquals(ERROR_MESSAGE, failureReason.getExternalMessage()); } @Test void testFailureReasonWithTraceMessageOnly() throws WorkerException, InterruptedException { - final AirbyteMessage message = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + final AirbyteMessage message = AirbyteMessageUtils.createErrorMessage(ERROR_MESSAGE, 123.0); when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); @@ -145,7 +146,7 @@ void testFailureReasonWithTraceMessageOnly() throws WorkerException, Interrupted assertNotNull(output.getFailureReason()); final FailureReason failureReason = output.getFailureReason(); - assertEquals("some error from the connector", failureReason.getExternalMessage()); + assertEquals(ERROR_MESSAGE, failureReason.getExternalMessage()); } } From 1f2138e51f6f0bf2ffb056c7dc768f1150646ae8 Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Wed, 11 Jan 2023 18:32:33 -0400 Subject: [PATCH 15/20] return check job output if present, regardless of failure status --- .../main/java/io/airbyte/server/handlers/SchedulerHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 8e7221178858..d2270ccf172b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -436,7 +436,7 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse Date: Thu, 12 Jan 2023 18:18:46 +0100 Subject: [PATCH 16/20] introduce utils throwWorkerException --- .../java/io/airbyte/workers/WorkerUtils.java | 10 ++++++++++ .../general/DefaultCheckConnectionWorker.java | 13 ++++--------- .../general/DefaultDiscoverCatalogWorker.java | 12 ++---------- .../workers/general/DefaultGetSpecWorker.java | 18 ++++++------------ 4 files changed, 22 insertions(+), 31 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index d08f91bf8e56..6a5deb6a6de6 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -17,6 +17,7 @@ import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair; import io.airbyte.protocol.models.AirbyteTraceMessage; +import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.FailureHelper.ConnectorCommand; import io.airbyte.workers.internal.AirbyteStreamFactory; @@ -182,4 +183,13 @@ public static String getStdErrFromErrorStream(final InputStream errorStream) thr return errorOutput.toString(); } + public static void throwWorkerException(final String errorMessage, final Process process) + throws WorkerException, IOException { + final String stderr = getStdErrFromErrorStream(process.getErrorStream()); + if (stderr.isEmpty()) { + throw new WorkerException(errorMessage); + } else { + throw new WorkerException(errorMessage + ": \n" + stderr); + } + } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 7fac6eb5c95b..4d345853957c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -97,23 +97,18 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); failureReason.ifPresent(jobOutput::setFailureReason); + final int exitCode = process.exitValue(); LOGGER.info("Check connection job subprocess finished with exit code {}", exitCode); + if (connectionStatus.isPresent()) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() .withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class)) .withMessage(connectionStatus.get().getMessage()); LOGGER.info("Check connection job received output: {}", output); jobOutput.setCheckConnection(output); - } else { - if (failureReason.isEmpty()) { - final String stderr = WorkerUtils.getStdErrFromErrorStream(process.getErrorStream()); - if (stderr.isEmpty()) { - throw new WorkerException("Error checking connection status: no status nor failure reason were outputted."); - } else { - throw new WorkerException("Error checking connection status.\n" + stderr); - } - } + } else if (failureReason.isEmpty()) { + WorkerUtils.throwWorkerException("Error checking connection status: no status nor failure reason were outputted", process); } LineGobbler.endSection("CHECK"); return jobOutput; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index d6c0da833e1b..4d701f0b6d81 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -78,7 +78,6 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI Jsons.serialize(discoverSchemaInput.getConnectionConfiguration())); final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); - LineGobbler.gobble(process.getErrorStream(), LOGGER::error); final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); @@ -109,15 +108,8 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI discoverSchemaInput.getConnectorVersion(), discoverSchemaInput.getConfigHash()); jobOutput.setDiscoverCatalogId(catalogId); - } else { - if (failureReason.isEmpty()) { - final String stderr = WorkerUtils.getStdErrFromErrorStream(process.getErrorStream()); - if (stderr.isEmpty()) { - throw new WorkerException("Integration failed to output a catalog struct."); - } else { - throw new WorkerException("Integration failed to output a catalog struct:\n" + stderr); - } - } + } else if (failureReason.isEmpty()){ + WorkerUtils.throwWorkerException("Integration failed to output a catalog struct", process); } return jobOutput; } catch (final WorkerException e) { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 7d0227dadd1d..637df9f1171d 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -55,9 +55,7 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) ApmTraceUtils.addTagsToTrace(Map.of(JOB_ROOT_KEY, jobRoot, DOCKER_IMAGE_KEY, config.getDockerImage())); try { process = integrationLauncher.spec(jobRoot); - final int exitCode = process.exitValue(); - final String exitCodeMessage = String.format("Spec job subprocess finished with exit code %s", exitCode); - LOGGER.debug(exitCodeMessage); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); @@ -72,17 +70,13 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType); failureReason.ifPresent(jobOutput::setFailureReason); + final int exitCode = process.exitValue(); + LOGGER.info( String.format("Spec job subprocess finished with exit code %s", exitCode)); + if (spec.isPresent()) { jobOutput.setSpec(spec.get()); - } else { - if (failureReason.isEmpty()) { - final String stderr = WorkerUtils.getStdErrFromErrorStream(process.getErrorStream()); - if (stderr.isEmpty()) { - throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason."); - } else { - throw new WorkerException("Integration failed to output a spec struct and did not output a failure reason:\n" + stderr); - } - } + } else if (failureReason.isEmpty()) { + WorkerUtils.throwWorkerException("Integration failed to output a spec struct and did not output a failure reason.", process); } return jobOutput; From db2f9f49040995481d51d8da9cf31251b9c498d0 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Thu, 12 Jan 2023 18:32:24 +0100 Subject: [PATCH 17/20] log warning on non 0 status code --- .../airbyte/workers/general/DefaultCheckConnectionWorker.java | 4 +++- .../airbyte/workers/general/DefaultDiscoverCatalogWorker.java | 4 +++- .../java/io/airbyte/workers/general/DefaultGetSpecWorker.java | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 4d345853957c..e3f2cdb99710 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -99,7 +99,9 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa failureReason.ifPresent(jobOutput::setFailureReason); final int exitCode = process.exitValue(); - LOGGER.info("Check connection job subprocess finished with exit code {}", exitCode); + if (exitCode != 0) { + LOGGER.warn("Check connection job subprocess finished with exit code {}", exitCode); + } if (connectionStatus.isPresent()) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 4d701f0b6d81..3f690709219f 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -97,7 +97,9 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI failureReason.ifPresent(jobOutput::setFailureReason); final int exitCode = process.exitValue(); - LOGGER.info(String.format("Discover job subprocess finished with exit code %s", exitCode)); + if (exitCode != 0) { + LOGGER.warn("Discover job subprocess finished with exit codee {}", exitCode); + } if (catalog.isPresent()) { final UUID catalogId = diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 637df9f1171d..d51efc9df152 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -71,7 +71,9 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) failureReason.ifPresent(jobOutput::setFailureReason); final int exitCode = process.exitValue(); - LOGGER.info( String.format("Spec job subprocess finished with exit code %s", exitCode)); + if (exitCode != 0) { + LOGGER.warn("Spec job subprocess finished with exit code {}", exitCode); + } if (spec.isPresent()) { jobOutput.setSpec(spec.get()); From f14fed380f3944929a34d5187e7fb3f2f290bc66 Mon Sep 17 00:00:00 2001 From: alafanechere Date: Thu, 12 Jan 2023 19:06:28 +0100 Subject: [PATCH 18/20] format --- .../main/java/io/airbyte/workers/WorkerUtils.java | 13 +++++++------ .../general/DefaultDiscoverCatalogWorker.java | 2 +- .../workers/general/DefaultGetSpecWorker.java | 1 - 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 6a5deb6a6de6..2c305c5dfb20 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -185,11 +185,12 @@ public static String getStdErrFromErrorStream(final InputStream errorStream) thr public static void throwWorkerException(final String errorMessage, final Process process) throws WorkerException, IOException { - final String stderr = getStdErrFromErrorStream(process.getErrorStream()); - if (stderr.isEmpty()) { - throw new WorkerException(errorMessage); - } else { - throw new WorkerException(errorMessage + ": \n" + stderr); - } + final String stderr = getStdErrFromErrorStream(process.getErrorStream()); + if (stderr.isEmpty()) { + throw new WorkerException(errorMessage); + } else { + throw new WorkerException(errorMessage + ": \n" + stderr); + } } + } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 3f690709219f..07cb56660074 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -110,7 +110,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI discoverSchemaInput.getConnectorVersion(), discoverSchemaInput.getConfigHash()); jobOutput.setDiscoverCatalogId(catalogId); - } else if (failureReason.isEmpty()){ + } else if (failureReason.isEmpty()) { WorkerUtils.throwWorkerException("Integration failed to output a catalog struct", process); } return jobOutput; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index d51efc9df152..c624b5211192 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -56,7 +56,6 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) try { process = integrationLauncher.spec(jobRoot); - final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); From f0ebcdfb384a7b3b03ac7c9465ad10023fe831ca Mon Sep 17 00:00:00 2001 From: alafanechere Date: Thu, 12 Jan 2023 20:22:11 +0100 Subject: [PATCH 19/20] consistent error message --- .../airbyte/workers/general/DefaultDiscoverCatalogWorker.java | 2 +- .../java/io/airbyte/workers/general/DefaultGetSpecWorker.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 07cb56660074..883dcaed85e6 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -111,7 +111,7 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI discoverSchemaInput.getConfigHash()); jobOutput.setDiscoverCatalogId(catalogId); } else if (failureReason.isEmpty()) { - WorkerUtils.throwWorkerException("Integration failed to output a catalog struct", process); + WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process); } return jobOutput; } catch (final WorkerException e) { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index c624b5211192..09d85b38907a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -77,7 +77,7 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) if (spec.isPresent()) { jobOutput.setSpec(spec.get()); } else if (failureReason.isEmpty()) { - WorkerUtils.throwWorkerException("Integration failed to output a spec struct and did not output a failure reason.", process); + WorkerUtils.throwWorkerException("Integration failed to output a spec struct and did not output a failure reason", process); } return jobOutput; From 2a0856cfb085952973fcc3bfd9bd5052e98dc47a Mon Sep 17 00:00:00 2001 From: Pedro Lopez Date: Thu, 12 Jan 2023 17:12:00 -0400 Subject: [PATCH 20/20] fix failing test --- .../io/airbyte/workers/general/DefaultGetSpecWorkerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java index 7db18f0c77e4..7d878ef66ce9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java @@ -88,7 +88,7 @@ void testFailureOnInvalidSpecAndNoFailureReason() throws InterruptedException { .isInstanceOf(WorkerException.class) .getCause() .isInstanceOf(WorkerException.class) - .hasMessageContaining("Integration failed to output a spec struct and did not output a failure reason.") + .hasMessageContaining("Integration failed to output a spec struct and did not output a failure reason") .hasNoCause(); }