diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java index 947e2edbdee6..2c305c5dfb20 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/WorkerUtils.java @@ -5,7 +5,7 @@ package io.airbyte.workers; import com.fasterxml.jackson.databind.JsonNode; -import io.airbyte.config.ConnectorJobOutput; +import io.airbyte.commons.io.IOs; import io.airbyte.config.ConnectorJobOutput.OutputType; import io.airbyte.config.FailureReason; import io.airbyte.config.StandardSyncInput; @@ -20,6 +20,12 @@ import io.airbyte.workers.exception.WorkerException; import io.airbyte.workers.helper.FailureHelper; import io.airbyte.workers.helper.FailureHelper.ConnectorCommand; +import io.airbyte.workers.internal.AirbyteStreamFactory; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; @@ -111,6 +117,14 @@ public static WorkerDestinationConfig syncToWorkerDestinationConfig(final Standa .withState(sync.getState()); } + private static ConnectorCommand getConnectorCommandFromOutputType(final OutputType outputType) { + return switch (outputType) { + case SPEC -> ConnectorCommand.SPEC; + case CHECK_CONNECTION -> ConnectorCommand.CHECK; + case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER; + }; + } + public static Optional getMostRecentConfigControlMessage(final Map> messagesByType) { return messagesByType.getOrDefault(Type.CONTROL, new ArrayList<>()).stream() .map(AirbyteMessage::getControl) @@ -119,28 +133,35 @@ public static Optional getMostRecentConfig .reduce((first, second) -> second); } - public static ConnectorJobOutput getJobFailureOutputOrThrow(final OutputType outputType, - final Map> messagesByType, - final String defaultErrorMessage) - throws WorkerException { - final Optional traceMessage = - messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream() - .map(AirbyteMessage::getTrace) - .filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR) - .findFirst(); + private static Optional getTraceMessageFromMessagesByType(final Map> messagesByType) { + return messagesByType.getOrDefault(Type.TRACE, new ArrayList<>()).stream() + .map(AirbyteMessage::getTrace) + .filter(trace -> trace.getType() == AirbyteTraceMessage.Type.ERROR) + .findFirst(); + } + + public static Map> getMessagesByType(final Process process, final AirbyteStreamFactory streamFactory, final int timeOut) + throws IOException { + final Map> messagesByType; + try (final InputStream stdout = process.getInputStream()) { + messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) + .collect(Collectors.groupingBy(AirbyteMessage::getType)); + + WorkerUtils.gentleClose(process, timeOut, TimeUnit.MINUTES); + return messagesByType; + } + } + public static Optional getJobFailureReasonFromMessages(final OutputType outputType, + final Map> messagesByType) { + final Optional traceMessage = getTraceMessageFromMessagesByType(messagesByType); if (traceMessage.isPresent()) { - final ConnectorCommand connectorCommand = switch (outputType) { - case SPEC -> ConnectorCommand.SPEC; - case CHECK_CONNECTION -> ConnectorCommand.CHECK; - case DISCOVER_CATALOG_ID -> ConnectorCommand.DISCOVER; - }; - - final FailureReason failureReason = FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand); - return new ConnectorJobOutput().withOutputType(outputType).withFailureReason(failureReason); + final ConnectorCommand connectorCommand = getConnectorCommandFromOutputType(outputType); + return Optional.of(FailureHelper.connectorCommandFailure(traceMessage.get(), null, null, connectorCommand)); + } else { + return Optional.empty(); } - throw new WorkerException(defaultErrorMessage); } public static Map mapStreamNamesToSchemas(final StandardSyncInput syncInput) { @@ -151,4 +172,25 @@ public static Map mapStreamNamesToSche } + public static String getStdErrFromErrorStream(final InputStream errorStream) throws IOException { + final BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream, StandardCharsets.UTF_8)); + final StringBuilder errorOutput = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + errorOutput.append(line); + errorOutput.append(System.lineSeparator()); + } + return errorOutput.toString(); + } + + public static void throwWorkerException(final String errorMessage, final Process process) + throws WorkerException, IOException { + final String stderr = getStdErrFromErrorStream(process.getErrorStream()); + if (stderr.isEmpty()) { + throw new WorkerException(errorMessage); + } else { + throw new WorkerException(errorMessage + ": \n" + stderr); + } + } + } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java index 21c4039981cd..e3f2cdb99710 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultCheckConnectionWorker.java @@ -9,11 +9,11 @@ import datadog.trace.api.Trace; import io.airbyte.commons.enums.Enums; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardCheckConnectionInput; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardCheckConnectionOutput.Status; @@ -29,14 +29,11 @@ import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,18 +70,12 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getConnectionConfiguration())); - LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - - final Map> messagesByType; - try (final InputStream stdout = process.getInputStream()) { - messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) - .collect(Collectors.groupingBy(AirbyteMessage::getType)); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION); - WorkerUtils.gentleClose(process, 1, TimeUnit.MINUTES); - } + LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final int exitCode = process.exitValue(); - final Optional status = messagesByType + final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); + final Optional connectionStatus = messagesByType .getOrDefault(Type.CONNECTION_STATUS, new ArrayList<>()).stream() .map(AirbyteMessage::getConnectionStatus) .findFirst(); @@ -104,25 +95,30 @@ public ConnectorJobOutput run(final StandardCheckConnectionInput input, final Pa }); } - if (status.isPresent() && exitCode == 0) { + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.CHECK_CONNECTION, messagesByType); + failureReason.ifPresent(jobOutput::setFailureReason); + + final int exitCode = process.exitValue(); + if (exitCode != 0) { + LOGGER.warn("Check connection job subprocess finished with exit code {}", exitCode); + } + + if (connectionStatus.isPresent()) { final StandardCheckConnectionOutput output = new StandardCheckConnectionOutput() - .withStatus(Enums.convertTo(status.get().getStatus(), Status.class)) - .withMessage(status.get().getMessage()); - - LOGGER.debug("Check connection job subprocess finished with exit code {}", exitCode); - LOGGER.debug("Check connection job received output: {}", output); - LineGobbler.endSection("CHECK"); - return new ConnectorJobOutput().withOutputType(OutputType.CHECK_CONNECTION).withCheckConnection(output); - } else { - final String message = String.format("Error checking connection, status: %s, exit code: %d", status, exitCode); - LOGGER.error(message); - - return WorkerUtils.getJobFailureOutputOrThrow(OutputType.CHECK_CONNECTION, messagesByType, message); + .withStatus(Enums.convertTo(connectionStatus.get().getStatus(), Status.class)) + .withMessage(connectionStatus.get().getMessage()); + LOGGER.info("Check connection job received output: {}", output); + jobOutput.setCheckConnection(output); + } else if (failureReason.isEmpty()) { + WorkerUtils.throwWorkerException("Error checking connection status: no status nor failure reason were outputted", process); } + LineGobbler.endSection("CHECK"); + return jobOutput; } catch (final Exception e) { ApmTraceUtils.addExceptionToTrace(e); LOGGER.error("Unexpected error while checking connection: ", e); + LineGobbler.endSection("CHECK"); throw new WorkerException("Unexpected error while getting checking connection.", e); } } diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java index 34eefa7e390d..883dcaed85e6 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorker.java @@ -10,11 +10,11 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.commons.json.Jsons; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.StandardDiscoverCatalogInput; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; @@ -29,7 +29,6 @@ import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.HashMap; @@ -37,8 +36,6 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -80,16 +77,10 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(discoverSchemaInput.getConnectionConfiguration())); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Map> messagesByType; - - try (final InputStream stdout = process.getInputStream()) { - messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) - .collect(Collectors.groupingBy(AirbyteMessage::getType)); - - WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES); - } + final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); final Optional catalog = messagesByType .getOrDefault(Type.CATALOG, new ArrayList<>()).stream() @@ -102,12 +93,15 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI UUID.fromString(discoverSchemaInput.getSourceId()), configMessage.getConfig())); + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.DISCOVER_CATALOG_ID, messagesByType); + failureReason.ifPresent(jobOutput::setFailureReason); + final int exitCode = process.exitValue(); - if (exitCode == 0) { - if (catalog.isEmpty()) { - throw new WorkerException("Integration failed to output a catalog struct."); - } + if (exitCode != 0) { + LOGGER.warn("Discover job subprocess finished with exit codee {}", exitCode); + } + if (catalog.isPresent()) { final UUID catalogId = configRepository.writeActorCatalogFetchEvent(catalog.get(), // NOTE: sourceId is marked required in the OpenAPI config but the code generator doesn't enforce @@ -115,13 +109,11 @@ public ConnectorJobOutput run(final StandardDiscoverCatalogInput discoverSchemaI discoverSchemaInput.getSourceId() == null ? null : UUID.fromString(discoverSchemaInput.getSourceId()), discoverSchemaInput.getConnectorVersion(), discoverSchemaInput.getConfigHash()); - return new ConnectorJobOutput().withOutputType(OutputType.DISCOVER_CATALOG_ID).withDiscoverCatalogId(catalogId); - } else { - return WorkerUtils.getJobFailureOutputOrThrow( - OutputType.DISCOVER_CATALOG_ID, - messagesByType, - String.format("Discover job subprocess finished with exit code %s", exitCode)); + jobOutput.setDiscoverCatalogId(catalogId); + } else if (failureReason.isEmpty()) { + WorkerUtils.throwWorkerException("Integration failed to output a catalog struct and did not output a failure reason", process); } + return jobOutput; } catch (final WorkerException e) { ApmTraceUtils.addExceptionToTrace(e); throw e; diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java index 867093e23e22..09d85b38907a 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultGetSpecWorker.java @@ -9,10 +9,10 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME; import datadog.trace.api.Trace; -import io.airbyte.commons.io.IOs; import io.airbyte.commons.io.LineGobbler; import io.airbyte.config.ConnectorJobOutput; import io.airbyte.config.ConnectorJobOutput.OutputType; +import io.airbyte.config.FailureReason; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.protocol.models.AirbyteMessage; @@ -23,14 +23,11 @@ import io.airbyte.workers.internal.AirbyteStreamFactory; import io.airbyte.workers.internal.DefaultAirbyteStreamFactory; import io.airbyte.workers.process.IntegrationLauncher; -import java.io.InputStream; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +37,6 @@ public class DefaultGetSpecWorker implements GetSpecWorker { private final IntegrationLauncher integrationLauncher; private final AirbyteStreamFactory streamFactory; - private Process process; public DefaultGetSpecWorker(final IntegrationLauncher integrationLauncher, @@ -60,38 +56,31 @@ public ConnectorJobOutput run(final JobGetSpecConfig config, final Path jobRoot) try { process = integrationLauncher.spec(jobRoot); + final ConnectorJobOutput jobOutput = new ConnectorJobOutput().withOutputType(OutputType.SPEC); LineGobbler.gobble(process.getErrorStream(), LOGGER::error); - final Map> messagesByType; - try (final InputStream stdout = process.getInputStream()) { - messagesByType = streamFactory.create(IOs.newBufferedReader(stdout)) - .collect(Collectors.groupingBy(AirbyteMessage::getType)); - - // todo (cgardens) - let's pre-fetch the images outside of the worker so we don't need account for - // this. - // retrieving spec should generally be instantaneous, but since docker images might not be pulled - // it could take a while longer depending on internet conditions as well. - WorkerUtils.gentleClose(process, 30, TimeUnit.MINUTES); - } + final Map> messagesByType = WorkerUtils.getMessagesByType(process, streamFactory, 30); final Optional spec = messagesByType .getOrDefault(Type.SPEC, new ArrayList<>()).stream() .map(AirbyteMessage::getSpec) .findFirst(); + final Optional failureReason = WorkerUtils.getJobFailureReasonFromMessages(OutputType.SPEC, messagesByType); + failureReason.ifPresent(jobOutput::setFailureReason); + final int exitCode = process.exitValue(); - if (exitCode == 0) { - if (spec.isEmpty()) { - throw new WorkerException("integration failed to output a spec struct."); - } - - return new ConnectorJobOutput().withOutputType(OutputType.SPEC).withSpec(spec.get()); - } else { - return WorkerUtils.getJobFailureOutputOrThrow( - OutputType.SPEC, - messagesByType, - String.format("Spec job subprocess finished with exit code %s", exitCode)); + if (exitCode != 0) { + LOGGER.warn("Spec job subprocess finished with exit code {}", exitCode); } + + if (spec.isPresent()) { + jobOutput.setSpec(spec.get()); + } else if (failureReason.isEmpty()) { + WorkerUtils.throwWorkerException("Integration failed to output a spec struct and did not output a failure reason", process); + } + + return jobOutput; } catch (final Exception e) { throw new WorkerException(String.format("Error while getting spec from image %s", config.getDockerImage()), e); } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 8e7221178858..d2270ccf172b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -436,7 +436,7 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse Lists.newArrayList(traceMessage).stream(); + traceMessageSuccessStreamFactory = noop -> Lists.newArrayList(successMessage, traceMessage).stream(); + emptyStreamFactory = noop -> Stream.empty(); final AirbyteMessage configMessage1 = AirbyteMessageUtils.createConfigControlMessage(new Config().withAdditionalProperty("apiKey", "123"), 1D); final AirbyteMessage configMessage2 = AirbyteMessageUtils.createConfigControlMessage(CONNECTOR_CONFIG, 2D); @@ -186,16 +192,14 @@ void testFailedConnection() throws WorkerException { } @Test - void testProcessFail() { - when(process.exitValue()).thenReturn(1); + void testProcessFailWithNoFailureMessageNorStatus() { - final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, connectorConfigUpdater, failureStreamFactory); + final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, connectorConfigUpdater, emptyStreamFactory); assertThrows(WorkerException.class, () -> worker.run(input, jobRoot)); } @Test - void testProcessFailWithTraceMessage() throws WorkerException { - when(process.exitValue()).thenReturn(1); + void testOutputHasFailureReasonWhenTraceMessage() throws WorkerException { final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, connectorConfigUpdater, traceMessageStreamFactory); @@ -208,6 +212,20 @@ void testProcessFailWithTraceMessage() throws WorkerException { assertEquals("some error from the connector", failureReason.getExternalMessage()); } + @Test + void testOutputHasStatusAndFailureReasonWhenSuccessAndTraceMessage() throws WorkerException { + + final DefaultCheckConnectionWorker worker = + new DefaultCheckConnectionWorker(integrationLauncher, connectorConfigUpdater, traceMessageSuccessStreamFactory); + final ConnectorJobOutput output = worker.run(input, jobRoot); + + assertEquals(output.getOutputType(), OutputType.CHECK_CONNECTION); + assertNotNull(output.getCheckConnection()); + + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + } + @Test void testExceptionThrownInRun() throws WorkerException { doThrow(new RuntimeException()).when(integrationLauncher).check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDS)); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index 67b76d951043..6b63a3e765dc 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -50,6 +50,7 @@ import java.nio.file.Path; import java.time.Duration; import java.util.UUID; +import java.util.stream.Stream; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -80,6 +81,10 @@ class DefaultDiscoverCatalogWorkerTest { private Path jobRoot; private IntegrationLauncher integrationLauncher; private Process process; + private AirbyteStreamFactory validCatalogStreamFactory; + private AirbyteStreamFactory emptyStreamFactory; + private AirbyteStreamFactory traceStreamFactory; + private AirbyteStreamFactory validCatalogWithTraceMessageStreamFactory; private AirbyteStreamFactory streamFactory; private ConnectorConfigUpdater connectorConfigUpdater; @@ -102,15 +107,22 @@ void setup() throws Exception { when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(new byte[0])); IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, MoreResources.readResource("airbyte_postgres_catalog.json")); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + + validCatalogStreamFactory = noop -> Lists.newArrayList(new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG)).stream(); + traceStreamFactory = noop -> Lists.newArrayList(traceMessage).stream(); + emptyStreamFactory = noop -> Stream.empty(); + + validCatalogWithTraceMessageStreamFactory = + noop -> Lists.newArrayList(new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG), traceMessage).stream(); - streamFactory = noop -> Lists.newArrayList(new AirbyteMessage().withType(Type.CATALOG).withCatalog(CATALOG)).stream(); } @SuppressWarnings("BusyWait") @Test void testDiscoverSchema() throws Exception { final DefaultDiscoverCatalogWorker worker = - new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, streamFactory); + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, validCatalogStreamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); assertNull(output.getFailureReason()); @@ -159,11 +171,9 @@ void testDiscoverSchemaWithConfigUpdate() throws Exception { @SuppressWarnings("BusyWait") @Test - void testDiscoverSchemaProcessFail() throws Exception { - when(process.exitValue()).thenReturn(1); - + void testDiscoverSchemaProcessFailWithNoCatalogNoTraceMessage() { final DefaultDiscoverCatalogWorker worker = - new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, streamFactory); + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, emptyStreamFactory); assertThrows(WorkerException.class, () -> worker.run(INPUT, jobRoot)); Assertions.assertTimeout(Duration.ofSeconds(5), () -> { @@ -177,19 +187,33 @@ void testDiscoverSchemaProcessFail() throws Exception { @SuppressWarnings("BusyWait") @Test - void testDiscoverSchemaProcessFailWithTraceMessage() throws Exception { - final AirbyteStreamFactory traceStreamFactory = noop -> Lists.newArrayList( - AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0)).stream(); - - when(process.exitValue()).thenReturn(1); + void testDiscoverSchemaHasFailureReasonWithTraceMessage() throws Exception { final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, traceStreamFactory); final ConnectorJobOutput output = worker.run(INPUT, jobRoot); - // assertEquals(OutputType.DISCOVER_CATALOG, output.getOutputType()); - // assertNull(output.getDiscoverCatalog()); - assertNotNull(output.getFailureReason()); + assertEquals(output.getOutputType(), OutputType.DISCOVER_CATALOG_ID); + assertNull(output.getDiscoverCatalogId()); + final FailureReason failureReason = output.getFailureReason(); + assertEquals("some error from the connector", failureReason.getExternalMessage()); + Assertions.assertTimeout(Duration.ofSeconds(5), () -> { + while (process.getErrorStream().available() != 0) { + Thread.sleep(50); + } + }); + + verify(process).exitValue(); + } + + @Test + void testDiscoverSchemaHasFailureReasonAndCatalogWithCatalogAndTraceMessage() throws Exception { + + final DefaultDiscoverCatalogWorker worker = + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, validCatalogWithTraceMessageStreamFactory); + final ConnectorJobOutput output = worker.run(INPUT, jobRoot); + assertEquals(output.getOutputType(), OutputType.DISCOVER_CATALOG_ID); + assertNotNull(output.getDiscoverCatalogId()); final FailureReason failureReason = output.getFailureReason(); assertEquals("some error from the connector", failureReason.getExternalMessage()); @@ -208,14 +232,14 @@ void testDiscoverSchemaException() throws WorkerException { .thenThrow(new RuntimeException()); final DefaultDiscoverCatalogWorker worker = - new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, streamFactory); + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, validCatalogStreamFactory); assertThrows(WorkerException.class, () -> worker.run(INPUT, jobRoot)); } @Test void testCancel() throws WorkerException { final DefaultDiscoverCatalogWorker worker = - new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, streamFactory); + new DefaultDiscoverCatalogWorker(mConfigRepository, integrationLauncher, connectorConfigUpdater, validCatalogStreamFactory); worker.run(INPUT, jobRoot); worker.cancel(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java index 7ddbdfc17d2c..7d878ef66ce9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java @@ -39,6 +39,7 @@ class DefaultGetSpecWorkerTest { private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests"); private static final String DUMMY_IMAGE_NAME = "airbyte/notarealimage:1.1"; + private static final String ERROR_MESSAGE = "some error from the connector"; private DefaultGetSpecWorker worker; private IntegrationLauncher integrationLauncher; @@ -78,47 +79,66 @@ void testSuccessfulRun() throws IOException, InterruptedException, WorkerExcepti } @Test - void testFailureOnInvalidSpec() throws InterruptedException { + void testFailureOnInvalidSpecAndNoFailureReason() throws InterruptedException { final String expectedSpecString = "{\"key\":\"value\"}"; when(process.getInputStream()).thenReturn(new ByteArrayInputStream(expectedSpecString.getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); - when(process.exitValue()).thenReturn(0); assertThatThrownBy(() -> worker.run(config, jobRoot)) .isInstanceOf(WorkerException.class) .getCause() .isInstanceOf(WorkerException.class) - .hasMessageContaining("integration failed to output a spec struct.") + .hasMessageContaining("Integration failed to output a spec struct and did not output a failure reason") .hasNoCause(); } @Test - void testFailureOnNonzeroExitCode() throws InterruptedException, IOException { + void testWithInvalidSpecAndFailureReason() throws InterruptedException, WorkerException { + final String expectedSpecString = "{\"key\":\"value\"}"; + + final AirbyteMessage message = new AirbyteMessage() + .withType(Type.SPEC) + .withSpec(Jsons.deserialize(expectedSpecString, io.airbyte.protocol.models.ConnectorSpecification.class)); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage(ERROR_MESSAGE, 123.0); + + when(process.getInputStream()) + .thenReturn(new ByteArrayInputStream((Jsons.serialize(message) + "\n" + Jsons.serialize(traceMessage)).getBytes(Charsets.UTF_8))); + when(process.waitFor(anyLong(), any())).thenReturn(true); + + final ConnectorJobOutput output = worker.run(config, jobRoot); + assertEquals(OutputType.SPEC, output.getOutputType()); + assertNull(output.getSpec()); + + final FailureReason failureReason = output.getFailureReason(); + assertEquals(ERROR_MESSAGE, failureReason.getExternalMessage()); + } + + @Test + void testWithValidSpecAndFailureReason() throws InterruptedException, WorkerException, IOException { final String expectedSpecString = MoreResources.readResource("valid_spec.json"); final AirbyteMessage message = new AirbyteMessage() .withType(Type.SPEC) .withSpec(Jsons.deserialize(expectedSpecString, io.airbyte.protocol.models.ConnectorSpecification.class)); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage(ERROR_MESSAGE, 123.0); - when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); + when(process.getInputStream()) + .thenReturn(new ByteArrayInputStream((Jsons.serialize(message) + "\n" + Jsons.serialize(traceMessage)).getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); - when(process.exitValue()).thenReturn(1); - assertThatThrownBy(() -> worker.run(config, jobRoot)) - .isInstanceOf(WorkerException.class) - .getCause() - .isInstanceOf(WorkerException.class) - .hasMessageContaining("Spec job subprocess finished with exit code") - .hasNoCause(); + final ConnectorJobOutput output = worker.run(config, jobRoot); + assertEquals(OutputType.SPEC, output.getOutputType()); + assertEquals(output.getSpec(), Jsons.deserialize(expectedSpecString, ConnectorSpecification.class)); + final FailureReason failureReason = output.getFailureReason(); + assertEquals(ERROR_MESSAGE, failureReason.getExternalMessage()); } @Test - void testFailureOnNonzeroExitCodeWithTraceMessage() throws WorkerException, InterruptedException { - final AirbyteMessage message = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); + void testFailureReasonWithTraceMessageOnly() throws WorkerException, InterruptedException { + final AirbyteMessage message = AirbyteMessageUtils.createErrorMessage(ERROR_MESSAGE, 123.0); when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); - when(process.exitValue()).thenReturn(1); final ConnectorJobOutput output = worker.run(config, jobRoot); assertEquals(OutputType.SPEC, output.getOutputType()); @@ -126,7 +146,7 @@ void testFailureOnNonzeroExitCodeWithTraceMessage() throws WorkerException, Inte assertNotNull(output.getFailureReason()); final FailureReason failureReason = output.getFailureReason(); - assertEquals("some error from the connector", failureReason.getExternalMessage()); + assertEquals(ERROR_MESSAGE, failureReason.getExternalMessage()); } }