Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix flaky connection manager workflow test #14271

Merged
merged 9 commits into from
Jun 30, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) {

final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId());
final List<StreamDescriptor> streamsToReset = streamResetPersistence.getStreamResets(input.getConnectionId());
log.info("Found the following streams to reset for connection {}: {}", input.getConnectionId(), streamsToReset);

if (!streamsToReset.isEmpty()) {
final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -185,7 +184,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that a successful workflow retries and waits")
public void runSuccess() throws InterruptedException {
Expand Down Expand Up @@ -228,7 +227,7 @@ public void runSuccess() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow does not wait to run after a failure")
public void retryAfterFail() throws InterruptedException {
Expand Down Expand Up @@ -269,7 +268,7 @@ public void retryAfterFail() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow which receives a manual run signal stops waiting")
public void manualRun() throws InterruptedException {
Expand All @@ -290,7 +289,7 @@ public void manualRun() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofMinutes(1L)); // any value here, just so it's started
workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any value here, just so it's past the empty workflow run
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand All @@ -317,7 +316,7 @@ public void manualRun() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow which receives an update signal stops waiting, doesn't run, and doesn't update the job status")
public void updatedSignalReceived() throws InterruptedException {
Expand All @@ -338,7 +337,7 @@ public void updatedSignalReceived() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofSeconds(30L));
workflow.connectionUpdated();
testEnv.sleep(Duration.ofSeconds(20L));
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand All @@ -365,7 +364,7 @@ public void updatedSignalReceived() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that cancelling a non-running workflow doesn't do anything")
public void cancelNonRunning() throws InterruptedException {
Expand Down Expand Up @@ -408,7 +407,7 @@ public void cancelNonRunning() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that the sync is properly deleted")
public void deleteSync() throws InterruptedException {
Expand All @@ -429,7 +428,7 @@ public void deleteSync() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofSeconds(30L));
workflow.deleteConnection();
testEnv.sleep(Duration.ofMinutes(20L));
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand Down Expand Up @@ -457,7 +456,7 @@ public void deleteSync() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that fresh workflow cleans the job state")
public void testStartFromCleanJobState() throws InterruptedException {
Expand Down Expand Up @@ -488,7 +487,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow which receives a manual sync while running a scheduled sync does nothing")
public void manualRun() throws InterruptedException {
Expand Down Expand Up @@ -527,7 +526,6 @@ public void manualRun() throws InterruptedException {

}

@Disabled
@Test
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
Expand Down Expand Up @@ -559,10 +557,7 @@ public void cancelRunning() throws InterruptedException {

workflow.cancelJob();

// TODO
// For some reason this transiently fails if it is below the runtime.
// However, this should be reported almost immediately. I think this is a bug.
testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1));
Thread.sleep(500);

final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
Expand Down Expand Up @@ -611,10 +606,7 @@ public void deleteRunning() throws InterruptedException {

workflow.deleteConnection();

// TODO
// For some reason this transiently fails if it is below the runtime.
// However, this should be reported almost immediately. I think this is a bug.
testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1));
Thread.sleep(500);

final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
Expand All @@ -638,7 +630,7 @@ public void deleteRunning() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that resetting a non-running workflow starts a reset job")
public void resetStart() throws InterruptedException {
Expand All @@ -659,7 +651,7 @@ public void resetStart() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofMinutes(5L));
workflow.resetConnection();
testEnv.sleep(Duration.ofMinutes(15L));
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand Down Expand Up @@ -694,7 +686,7 @@ public void resetCancelRunningWorkflow() throws InterruptedException {
workflow.submitManualSync();
testEnv.sleep(Duration.ofSeconds(30L));
workflow.resetConnection();
testEnv.sleep(Duration.ofMinutes(15L));
Thread.sleep(500);

final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
Expand All @@ -717,7 +709,6 @@ public void resetCancelRunningWorkflow() throws InterruptedException {
@Timeout(value = 60,
unit = TimeUnit.SECONDS)
@DisplayName("Test that cancelling a reset deletes streamsToReset from stream_resets table")
@Disabled
public void cancelResetRemovesStreamsToReset() throws InterruptedException {
final UUID connectionId = UUID.randomUUID();
final UUID testId = UUID.randomUUID();
Expand All @@ -738,7 +729,7 @@ public void cancelResetRemovesStreamsToReset() throws InterruptedException {

testEnv.sleep(Duration.ofSeconds(30L));
workflow.cancelJob();
testEnv.sleep(Duration.ofMinutes(15L));
Thread.sleep(500);

Mockito.verify(mStreamResetActivity).deleteStreamResetRecordsForJob(new DeleteStreamResetRecordsForJobInput(connectionId, JOB_ID));
}
Expand Down Expand Up @@ -823,7 +814,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that auto disable activity is touched during failure")
public void testAutoDisableOnFailure() throws InterruptedException {
Expand Down Expand Up @@ -851,7 +842,7 @@ public void testAutoDisableOnFailure() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()).attemptFailureWithAttemptNumber(Mockito.any());
Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()).jobFailure(Mockito.any());
Expand All @@ -860,7 +851,7 @@ public void testAutoDisableOnFailure() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that auto disable activity is not touched during job success")
public void testNoAutoDisableOnSuccess() throws InterruptedException {
Expand Down Expand Up @@ -888,7 +879,7 @@ public void testNoAutoDisableOnSuccess() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run
Mockito.verifyNoInteractions(mAutoDisableConnectionActivity);
}

Expand Down Expand Up @@ -918,7 +909,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that Source CHECK failures are recorded")
public void testSourceCheckFailuresRecorded() throws InterruptedException {
Expand Down Expand Up @@ -949,14 +940,14 @@ public void testSourceCheckFailuresRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.SOURCE, FailureType.CONFIG_ERROR)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that Destination CHECK failures are recorded")
public void testDestinationCheckFailuresRecorded() throws InterruptedException {
Expand Down Expand Up @@ -988,14 +979,14 @@ public void testDestinationCheckFailuresRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that reset workflows do not CHECK the source")
public void testSourceCheckSkippedWhenReset() throws InterruptedException {
Expand Down Expand Up @@ -1028,14 +1019,14 @@ public void testSourceCheckSkippedWhenReset() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce())
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that source and destination failures are recorded")
public void testSourceAndDestinationFailuresRecorded() throws InterruptedException {
Expand All @@ -1062,7 +1053,7 @@ public void testSourceAndDestinationFailuresRecorded() throws InterruptedExcepti
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.SOURCE)));
Expand All @@ -1071,7 +1062,7 @@ public void testSourceAndDestinationFailuresRecorded() throws InterruptedExcepti
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that normalization failure is recorded")
public void testNormalizationFailure() throws InterruptedException {
Expand All @@ -1098,14 +1089,14 @@ public void testNormalizationFailure() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.NORMALIZATION)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that dbt failure is recorded")
public void testDbtFailureRecorded() throws InterruptedException {
Expand All @@ -1132,14 +1123,14 @@ public void testDbtFailureRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.DBT)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that persistence failure is recorded")
public void testPersistenceFailureRecorded() throws InterruptedException {
Expand All @@ -1166,14 +1157,14 @@ public void testPersistenceFailureRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.PERSISTENCE)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that replication worker failure is recorded")
public void testReplicationFailureRecorded() throws InterruptedException {
Expand All @@ -1200,7 +1191,7 @@ public void testReplicationFailureRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.REPLICATION)));
Expand Down Expand Up @@ -1293,7 +1284,7 @@ void testCanRetryFailedActivity() throws InterruptedException {
// Sleep test env for half of restart delay, so that we know we are in the middle of the delay
testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2));
workflow.retryFailedActivity();
testEnv.sleep(Duration.ofSeconds(30L));
Thread.sleep(500); // short real-time wait after sending the retry signal

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand Down