diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index e59d39bcc4c6..4e9ad5ff9ec5 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -64,7 +64,10 @@ public IntegrationRunner(Source source) { } @VisibleForTesting - IntegrationRunner(IntegrationCliParser cliParser, Consumer outputRecordCollector, Destination destination, Source source) { + IntegrationRunner(IntegrationCliParser cliParser, + Consumer outputRecordCollector, + Destination destination, + Source source) { Preconditions.checkState(destination != null ^ source != null, "can only pass in a destination or a source"); this.cliParser = cliParser; this.outputRecordCollector = outputRecordCollector; @@ -97,6 +100,7 @@ public void run(String[] args) throws Exception { // todo (cgardens) - it is incongruous that that read and write return airbyte message (the // envelope) while the other commands return what goes inside it. case READ -> { + final JsonNode config = parseConfig(parsed.getConfigPath()); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java index 03fe3aa81a6b..6a30b0d5bf32 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultReplicationWorker.java @@ -151,9 +151,12 @@ public ReplicationOutput run(StandardSyncInput syncInput, Path jobRoot) throws W } final ReplicationStatus outputStatus; + // First check if the process was cancelled. Cancellation takes precedence over failures. if (cancelled.get()) { outputStatus = ReplicationStatus.CANCELLED; - } else if (hasFailed.get()) { + } + // if the process was not cancelled but still failed, then it's an actual failure + else if (hasFailed.get()) { outputStatus = ReplicationStatus.FAILED; } else { outputStatus = ReplicationStatus.COMPLETED; diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java index 6b3631cfdda6..195f8ec3bf26 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestination.java @@ -110,7 +110,7 @@ public void notifyEndOfStream() throws IOException { } @Override - public void close() throws IOException { + public void close() throws Exception { if (destinationProcess == null) { return; } @@ -122,9 +122,9 @@ public void close() throws IOException { LOGGER.debug("Closing destination process"); WorkerUtils.gentleClose(destinationProcess, 10, TimeUnit.HOURS); if (destinationProcess.isAlive() || destinationProcess.exitValue() != 0) { - LOGGER.warn( - "Destination process might not have shut down correctly. destination process alive: {}, destination process exit value: {}. This warning is normal if the job was cancelled.", - destinationProcess.isAlive(), destinationProcess.exitValue()); + String message = + destinationProcess.isAlive() ? "Destination has not terminated " : "Destination process exit with code " + destinationProcess.exitValue(); + throw new WorkerException(message + ". This warning is normal if the job was cancelled."); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java index 4998b6bb56c3..cc6a0fb919e7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSource.java @@ -33,6 +33,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.workers.WorkerConstants; +import io.airbyte.workers.WorkerException; import io.airbyte.workers.WorkerUtils; import io.airbyte.workers.process.IntegrationLauncher; import java.nio.file.Path; @@ -129,9 +130,8 @@ public void close() throws Exception { FORCED_SHUTDOWN_DURATION); if (sourceProcess.isAlive() || sourceProcess.exitValue() != 0) { - LOGGER.warn( - "Source process might not have shut down correctly. source process alive: {}, source process exit value: {}. This warning is normal if the job was cancelled.", - sourceProcess.isAlive(), sourceProcess.exitValue()); + String message = sourceProcess.isAlive() ? "Source has not terminated " : "Source process exit with code " + sourceProcess.exitValue(); + throw new WorkerException(message + ". This warning is normal if the job was cancelled."); } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java index 1ecb849ea29d..d84f2f4cd19b 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultReplicationWorkerTest.java @@ -217,7 +217,43 @@ void testCancellation() throws InterruptedException { @Test void testPopulatesOutputOnSuccess() throws WorkerException { - testPopulatesOutput(); + final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L)); + when(sourceMessageTracker.getRecordCount()).thenReturn(12L); + when(sourceMessageTracker.getBytesCount()).thenReturn(100L); + when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState))); + + final ReplicationWorker worker = new DefaultReplicationWorker( + JOB_ID, + JOB_ATTEMPT, + source, + mapper, + destination, + sourceMessageTracker, + destinationMessageTracker); + + final ReplicationOutput actual = worker.run(syncInput, jobRoot); + final ReplicationOutput replicationOutput = new ReplicationOutput() + .withReplicationAttemptSummary(new ReplicationAttemptSummary() + .withRecordsSynced(12L) + .withBytesSynced(100L) + .withStatus(ReplicationStatus.COMPLETED)) + .withOutputCatalog(syncInput.getCatalog()) + .withState(new State().withState(expectedState)); + + // good enough to verify that times are present. + assertNotNull(actual.getReplicationAttemptSummary().getStartTime()); + assertNotNull(actual.getReplicationAttemptSummary().getEndTime()); + + // verify output object matches declared json schema spec. + final Set validate = new JsonSchemaValidator() + .validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual)); + assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ",")); + + // remove times so we can do the rest of the object <> object comparison. + actual.getReplicationAttemptSummary().withStartTime(null); + actual.getReplicationAttemptSummary().withEndTime(null); + + assertEquals(replicationOutput, actual); } @Test @@ -295,44 +331,4 @@ void testDoesNotPopulateOnIrrecoverableFailure() { assertThrows(WorkerException.class, () -> worker.run(syncInput, jobRoot)); } - private void testPopulatesOutput() throws WorkerException { - final JsonNode expectedState = Jsons.jsonNode(ImmutableMap.of("updated_at", 10L)); - when(sourceMessageTracker.getRecordCount()).thenReturn(12L); - when(sourceMessageTracker.getBytesCount()).thenReturn(100L); - when(destinationMessageTracker.getOutputState()).thenReturn(Optional.of(new State().withState(expectedState))); - - final ReplicationWorker worker = new DefaultReplicationWorker( - JOB_ID, - JOB_ATTEMPT, - source, - mapper, - destination, - sourceMessageTracker, - destinationMessageTracker); - - final ReplicationOutput actual = worker.run(syncInput, jobRoot); - final ReplicationOutput replicationOutput = new ReplicationOutput() - .withReplicationAttemptSummary(new ReplicationAttemptSummary() - .withRecordsSynced(12L) - .withBytesSynced(100L) - .withStatus(ReplicationStatus.COMPLETED)) - .withOutputCatalog(syncInput.getCatalog()) - .withState(new State().withState(expectedState)); - - // good enough to verify that times are present. - assertNotNull(actual.getReplicationAttemptSummary().getStartTime()); - assertNotNull(actual.getReplicationAttemptSummary().getEndTime()); - - // verify output object matches declared json schema spec. - final Set validate = new JsonSchemaValidator() - .validate(Jsons.jsonNode(Jsons.jsonNode(JsonSchemaValidator.getSchema(ConfigSchema.REPLICATION_OUTPUT.getFile()))), Jsons.jsonNode(actual)); - assertTrue(validate.isEmpty(), "Validation errors: " + Strings.join(validate, ",")); - - // remove times so we can do the rest of the object <> object comparison. - actual.getReplicationAttemptSummary().withStartTime(null); - actual.getReplicationAttemptSummary().withEndTime(null); - - assertEquals(replicationOutput, actual); - } - } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java index 65e8bf1341ab..72c793b9bef3 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteDestinationTest.java @@ -154,4 +154,14 @@ public void testCloseNotifiesLifecycle() throws Exception { verify(outputStream).close(); } + @Test + public void testNonzeroExitCodeThrowsException() throws Exception { + final AirbyteDestination destination = new DefaultAirbyteDestination(integrationLauncher); + destination.start(DESTINATION_CONFIG, jobRoot); + + when(process.isAlive()).thenReturn(false); + when(process.exitValue()).thenReturn(1); + Assertions.assertThrows(WorkerException.class, destination::close); + } + } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java index f69dd70a6ea5..f7890e77b2a9 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java @@ -147,4 +147,14 @@ public void testSuccessfulLifecycle() throws Exception { verify(process).exitValue(); } + @Test + public void testNonzeroExitCodeThrows() throws Exception { + final AirbyteSource tap = new DefaultAirbyteSource(integrationLauncher, streamFactory, heartbeatMonitor); + tap.start(SOURCE_CONFIG, jobRoot); + + when(process.exitValue()).thenReturn(1); + + Assertions.assertThrows(WorkerException.class, tap::close); + } + }