Skip to content

Commit

Permalink
Fix flaky connection manager workflow test (#14271)
Browse files Browse the repository at this point in the history
* try thread sleep instead of test env, and run 100 times

* replace testEnv.sleep with Thread.sleep in several tests

* replace RepeatedTest with Test

* replace testEnv.sleep with Thread.sleep after signals are executed

* run each test 100 times to see if any are flaky

* add log

* change repetitions to 100 to avoid out of memory

* format

* swap repeated test for normal test
  • Loading branch information
lmossman authored Jun 30, 2022
1 parent 8784481 commit 29ce34f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public JobCreationOutput createNewJob(final JobCreationInput input) {

final StandardSync standardSync = configRepository.getStandardSync(input.getConnectionId());
final List<StreamDescriptor> streamsToReset = streamResetPersistence.getStreamResets(input.getConnectionId());
log.info("Found the following streams to reset for connection {}: {}", input.getConnectionId(), streamsToReset);

if (!streamsToReset.isEmpty()) {
final DestinationConnection destination = configRepository.getDestinationConnection(standardSync.getDestinationId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -185,7 +184,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that a successful workflow retries and waits")
public void runSuccess() throws InterruptedException {
Expand Down Expand Up @@ -228,7 +227,7 @@ public void runSuccess() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow does not wait to run after a failure")
public void retryAfterFail() throws InterruptedException {
Expand Down Expand Up @@ -269,7 +268,7 @@ public void retryAfterFail() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow which receives a manual run signal stops waiting")
public void manualRun() throws InterruptedException {
Expand All @@ -290,7 +289,7 @@ public void manualRun() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofMinutes(1L)); // any value here, just so it's started
workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any value here, just so it's past the empty workflow run
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand All @@ -317,7 +316,7 @@ public void manualRun() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow which receives an update signal stops waiting, doesn't run, and doesn't update the job status")
public void updatedSignalReceived() throws InterruptedException {
Expand All @@ -338,7 +337,7 @@ public void updatedSignalReceived() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofSeconds(30L));
workflow.connectionUpdated();
testEnv.sleep(Duration.ofSeconds(20L));
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand All @@ -365,7 +364,7 @@ public void updatedSignalReceived() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that cancelling a non-running workflow doesn't do anything")
public void cancelNonRunning() throws InterruptedException {
Expand Down Expand Up @@ -408,7 +407,7 @@ public void cancelNonRunning() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that the sync is properly deleted")
public void deleteSync() throws InterruptedException {
Expand All @@ -429,7 +428,7 @@ public void deleteSync() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofSeconds(30L));
workflow.deleteConnection();
testEnv.sleep(Duration.ofMinutes(20L));
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand Down Expand Up @@ -457,7 +456,7 @@ public void deleteSync() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that fresh workflow cleans the job state")
public void testStartFromCleanJobState() throws InterruptedException {
Expand Down Expand Up @@ -488,7 +487,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test workflow which receives a manual sync while running a scheduled sync does nothing")
public void manualRun() throws InterruptedException {
Expand Down Expand Up @@ -527,7 +526,6 @@ public void manualRun() throws InterruptedException {

}

@Disabled
@Test
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
Expand Down Expand Up @@ -559,10 +557,7 @@ public void cancelRunning() throws InterruptedException {

workflow.cancelJob();

// TODO
// For some reason this transiently fails if it is below the runtime.
// However, this should be reported almost immediately. I think this is a bug.
testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1));
Thread.sleep(500);

final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
Expand Down Expand Up @@ -611,10 +606,7 @@ public void deleteRunning() throws InterruptedException {

workflow.deleteConnection();

// TODO
// For some reason this transiently fails if it is below the runtime.
// However, this should be reported almost immediately. I think this is a bug.
testEnv.sleep(Duration.ofMinutes(SleepingSyncWorkflow.RUN_TIME.toMinutes() + 1));
Thread.sleep(500);

final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
Expand All @@ -638,7 +630,7 @@ public void deleteRunning() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that resetting a non-running workflow starts a reset job")
public void resetStart() throws InterruptedException {
Expand All @@ -659,7 +651,7 @@ public void resetStart() throws InterruptedException {
startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofMinutes(5L));
workflow.resetConnection();
testEnv.sleep(Duration.ofMinutes(15L));
Thread.sleep(500);

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand Down Expand Up @@ -694,7 +686,7 @@ public void resetCancelRunningWorkflow() throws InterruptedException {
workflow.submitManualSync();
testEnv.sleep(Duration.ofSeconds(30L));
workflow.resetConnection();
testEnv.sleep(Duration.ofMinutes(15L));
Thread.sleep(500);

final Queue<ChangedStateEvent> eventQueue = testStateListener.events(testId);
final List<ChangedStateEvent> events = new ArrayList<>(eventQueue);
Expand All @@ -717,7 +709,6 @@ public void resetCancelRunningWorkflow() throws InterruptedException {
@Timeout(value = 60,
unit = TimeUnit.SECONDS)
@DisplayName("Test that cancelling a reset deletes streamsToReset from stream_resets table")
@Disabled
public void cancelResetRemovesStreamsToReset() throws InterruptedException {
final UUID connectionId = UUID.randomUUID();
final UUID testId = UUID.randomUUID();
Expand All @@ -738,7 +729,7 @@ public void cancelResetRemovesStreamsToReset() throws InterruptedException {

testEnv.sleep(Duration.ofSeconds(30L));
workflow.cancelJob();
testEnv.sleep(Duration.ofMinutes(15L));
Thread.sleep(500);

Mockito.verify(mStreamResetActivity).deleteStreamResetRecordsForJob(new DeleteStreamResetRecordsForJobInput(connectionId, JOB_ID));
}
Expand Down Expand Up @@ -823,7 +814,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that auto disable activity is touched during failure")
public void testAutoDisableOnFailure() throws InterruptedException {
Expand Down Expand Up @@ -851,7 +842,7 @@ public void testAutoDisableOnFailure() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()).attemptFailureWithAttemptNumber(Mockito.any());
Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce()).jobFailure(Mockito.any());
Expand All @@ -860,7 +851,7 @@ public void testAutoDisableOnFailure() throws InterruptedException {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that auto disable activity is not touched during job success")
public void testNoAutoDisableOnSuccess() throws InterruptedException {
Expand Down Expand Up @@ -888,7 +879,7 @@ public void testNoAutoDisableOnSuccess() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run
Mockito.verifyNoInteractions(mAutoDisableConnectionActivity);
}

Expand Down Expand Up @@ -918,7 +909,7 @@ public void setup() {
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that Source CHECK failures are recorded")
public void testSourceCheckFailuresRecorded() throws InterruptedException {
Expand Down Expand Up @@ -949,14 +940,14 @@ public void testSourceCheckFailuresRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.SOURCE, FailureType.CONFIG_ERROR)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that Destination CHECK failures are recorded")
public void testDestinationCheckFailuresRecorded() throws InterruptedException {
Expand Down Expand Up @@ -988,14 +979,14 @@ public void testDestinationCheckFailuresRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that reset workflows do not CHECK the source")
public void testSourceCheckSkippedWhenReset() throws InterruptedException {
Expand Down Expand Up @@ -1028,14 +1019,14 @@ public void testSourceCheckSkippedWhenReset() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity, atLeastOnce())
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOriginWithType(FailureOrigin.DESTINATION, FailureType.CONFIG_ERROR)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that source and destination failures are recorded")
public void testSourceAndDestinationFailuresRecorded() throws InterruptedException {
Expand All @@ -1062,7 +1053,7 @@ public void testSourceAndDestinationFailuresRecorded() throws InterruptedExcepti
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.SOURCE)));
Expand All @@ -1071,7 +1062,7 @@ public void testSourceAndDestinationFailuresRecorded() throws InterruptedExcepti
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that normalization failure is recorded")
public void testNormalizationFailure() throws InterruptedException {
Expand All @@ -1098,14 +1089,14 @@ public void testNormalizationFailure() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.NORMALIZATION)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that dbt failure is recorded")
public void testDbtFailureRecorded() throws InterruptedException {
Expand All @@ -1132,14 +1123,14 @@ public void testDbtFailureRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.DBT)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that persistence failure is recorded")
public void testPersistenceFailureRecorded() throws InterruptedException {
Expand All @@ -1166,14 +1157,14 @@ public void testPersistenceFailureRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.PERSISTENCE)));
}

@Test
@Timeout(value = 2,
@Timeout(value = 10,
unit = TimeUnit.SECONDS)
@DisplayName("Test that replication worker failure is recorded")
public void testReplicationFailureRecorded() throws InterruptedException {
Expand All @@ -1200,7 +1191,7 @@ public void testReplicationFailureRecorded() throws InterruptedException {
testEnv.sleep(Duration.ofMinutes(1));

workflow.submitManualSync();
testEnv.sleep(Duration.ofMinutes(1L)); // any time after no-waiting manual run
Thread.sleep(500); // any time after no-waiting manual run

Mockito.verify(mJobCreationAndStatusUpdateActivity)
.attemptFailureWithAttemptNumber(Mockito.argThat(new HasFailureFromOrigin(FailureOrigin.REPLICATION)));
Expand Down Expand Up @@ -1293,7 +1284,7 @@ void testCanRetryFailedActivity() throws InterruptedException {
// Sleep test env for half of restart delay, so that we know we are in the middle of the delay
testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2));
workflow.retryFailedActivity();
testEnv.sleep(Duration.ofSeconds(30L));
Thread.sleep(500); // any time after no-waiting manual run

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Expand Down

0 comments on commit 29ce34f

Please sign in to comment.