diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java index 32aeb8787e2a..b14aa55faf3e 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/JobCreationAndStatusUpdateActivityImpl.java @@ -75,6 +75,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) { final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId()); final List streamsToReset = streamResetPersistence.getStreamResets(input.getConnectionId()); + log.info("Found the following streams to reset for connection {}: {}", input.getConnectionId(), streamsToReset); if (!streamsToReset.isEmpty()) { final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId()); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index 1f67539fec57..9f4248b4124a 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -67,7 +67,6 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; @@ -185,7 +184,7 @@ public void setup() { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that a successful workflow retries and waits") public void runSuccess() throws InterruptedException { @@ -228,7 +227,7 @@ public void runSuccess() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test workflow does not wait to run after a failure") public void retryAfterFail() throws InterruptedException { @@ -269,7 +268,7 @@ public void retryAfterFail() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test workflow which receives a manual run signal stops waiting") public void manualRun() throws InterruptedException { @@ -290,7 +289,7 @@ public void manualRun() throws InterruptedException { startWorkflowAndWaitUntilReady(workflow, input); testEnv.sleep(Duration.ofMinutes(1L)); // any value here, just so it's started workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any value here, just so it's past the empty workflow run + Thread.sleep(500); final Queue events = testStateListener.events(testId); @@ -317,7 +316,7 @@ public void manualRun() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test workflow which receives an update signal stops waiting, doesn't run, and doesn't update the job status") public void updatedSignalReceived() throws InterruptedException { @@ -338,7 +337,7 @@ public void updatedSignalReceived() throws InterruptedException { startWorkflowAndWaitUntilReady(workflow, input); testEnv.sleep(Duration.ofSeconds(30L)); workflow.connectionUpdated(); - testEnv.sleep(Duration.ofSeconds(20L)); + Thread.sleep(500); final Queue events = testStateListener.events(testId); @@ -365,7 +364,7 @@ public void updatedSignalReceived() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that cancelling a non-running workflow doesn't do anything") public void cancelNonRunning() throws InterruptedException { @@ -408,7 +407,7 @@ public void cancelNonRunning() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that the sync is properly deleted") public void deleteSync() throws InterruptedException { @@ -429,7 +428,7 @@ public void deleteSync() throws InterruptedException { startWorkflowAndWaitUntilReady(workflow, input); testEnv.sleep(Duration.ofSeconds(30L)); workflow.deleteConnection(); - testEnv.sleep(Duration.ofMinutes(20L)); + Thread.sleep(500); final Queue events = testStateListener.events(testId); @@ -457,7 +456,7 @@ public void deleteSync() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that fresh workflow cleans the job state") public void testStartFromCleanJobState() throws InterruptedException { @@ -488,7 +487,7 @@ public void setup() { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test workflow which receives a manual sync while running a scheduled sync does nothing") public void manualRun() throws InterruptedException { @@ -527,7 +526,6 @@ public void manualRun() throws InterruptedException { } - @Disabled @Test @Timeout(value = 10, unit = TimeUnit.SECONDS) @@ -559,10 +557,7 @@ public void cancelRunning() throws InterruptedException { workflow.cancelJob(); - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. - testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1)); + Thread.sleep(500); final Queue eventQueue = testStateListener.events(testId); final List events = new ArrayList<>(eventQueue); @@ -611,10 +606,7 @@ public void deleteRunning() throws InterruptedException { workflow.deleteConnection(); - // TODO - // For some reason this transiently fails if it is below the runtime. - // However, this should be reported almost immediately. I think this is a bug. - testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1)); + Thread.sleep(500); final Queue eventQueue = testStateListener.events(testId); final List events = new ArrayList<>(eventQueue); @@ -638,7 +630,7 @@ public void deleteRunning() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that resetting a non-running workflow starts a reset job") public void resetStart() throws InterruptedException { @@ -659,7 +651,7 @@ public void resetStart() throws InterruptedException { startWorkflowAndWaitUntilReady(workflow, input); testEnv.sleep(Duration.ofMinutes(5L)); workflow.resetConnection(); - testEnv.sleep(Duration.ofMinutes(15L)); + Thread.sleep(500); final Queue events = testStateListener.events(testId); @@ -694,7 +686,7 @@ public void resetCancelRunningWorkflow() throws InterruptedException { workflow.submitManualSync(); testEnv.sleep(Duration.ofSeconds(30L)); workflow.resetConnection(); - testEnv.sleep(Duration.ofMinutes(15L)); + Thread.sleep(500); final Queue eventQueue = testStateListener.events(testId); final List events = new ArrayList<>(eventQueue); @@ -717,7 +709,6 @@ public void resetCancelRunningWorkflow() throws InterruptedException { @Timeout(value = 60, unit = TimeUnit.SECONDS) @DisplayName("Test that cancelling a reset deletes streamsToReset from stream_resets table") - @Disabled public void cancelResetRemovesStreamsToReset() throws InterruptedException { final UUID connectionId = UUID.randomUUID(); final UUID testId = UUID.randomUUID(); @@ -738,7 +729,7 @@ public void cancelResetRemovesStreamsToReset() throws InterruptedException { testEnv.sleep(Duration.ofSeconds(30L)); workflow.cancelJob(); - testEnv.sleep(Duration.ofMinutes(15L)); + Thread.sleep(500); Mockito.verify(mStreamResetActivity).deleteStreamResetRecordsForJob(new DeleteStreamResetRecordsForJobInput(connectionId, JOB_ID)); } @@ -823,7 +814,7 @@ public void setup() { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that auto disable activity is touched during failure") public void testAutoDisableOnFailure() throws InterruptedException { @@ -851,7 +842,7 @@ public void testAutoDisableOnFailure() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()).attemptFailureWithAttemptNumber(Mockito.any()); Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()).jobFailure(Mockito.any()); @@ -860,7 +851,7 @@ public void testAutoDisableOnFailure() throws InterruptedException { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that auto disable activity is not touched during job success") public void testNoAutoDisableOnSuccess() throws InterruptedException { @@ -888,7 +879,7 @@ public void testNoAutoDisableOnSuccess() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verifyNoInteractions(mAutoDisableConnectionActivity); } @@ -918,7 +909,7 @@ public void setup() { } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that Source CHECK failures are recorded") public void testSourceCheckFailuresRecorded() throws InterruptedException { @@ -949,14 +940,14 @@ public void testSourceCheckFailuresRecorded() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.SOURCE, FailureType.CONFIG_ERROR))); } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that Destination CHECK failures are recorded") public void testDestinationCheckFailuresRecorded() throws InterruptedException { @@ -988,14 +979,14 @@ public void testDestinationCheckFailuresRecorded() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR))); } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that reset workflows do not CHECK the source") public void testSourceCheckSkippedWhenReset() throws InterruptedException { @@ -1028,14 +1019,14 @@ public void testSourceCheckSkippedWhenReset() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR))); } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that source and destination failures are recorded") public void testSourceAndDestinationFailuresRecorded() throws InterruptedException { @@ -1062,7 +1053,7 @@ public void testSourceAndDestinationFailuresRecorded() throws InterruptedExcepti testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.SOURCE))); @@ -1071,7 +1062,7 @@ public void testSourceAndDestinationFailuresRecorded() throws InterruptedExcepti } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that normalization failure is recorded") public void testNormalizationFailure() throws InterruptedException { @@ -1098,14 +1089,14 @@ public void testNormalizationFailure() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.NORMALIZATION))); } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that dbt failure is recorded") public void testDbtFailureRecorded() throws InterruptedException { @@ -1132,14 +1123,14 @@ public void testDbtFailureRecorded() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.DBT))); } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that persistence failure is recorded") public void testPersistenceFailureRecorded() throws InterruptedException { @@ -1166,14 +1157,14 @@ public void testPersistenceFailureRecorded() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.PERSISTENCE))); } @Test - @Timeout(value = 2, + @Timeout(value = 10, unit = TimeUnit.SECONDS) @DisplayName("Test that replication worker failure is recorded") public void testReplicationFailureRecorded() throws InterruptedException { @@ -1200,7 +1191,7 @@ public void testReplicationFailureRecorded() throws InterruptedException { testEnv.sleep(Duration.ofMinutes(1)); workflow.submitManualSync(); - testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run + Thread.sleep(500); // any time after no-waiting manual run Mockito.verify(mJobCreationAndStatusUpdateActivity) .attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.REPLICATION))); @@ -1293,7 +1284,7 @@ void testCanRetryFailedActivity() throws InterruptedException { // Sleep test env for half of restart delay, so that we know we are in the middle of the delay testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2)); workflow.retryFailedActivity(); - testEnv.sleep(Duration.ofSeconds(30L)); + Thread.sleep(500); // any time after no-waiting manual run final Queue events = testStateListener.events(testId);