Skip to content

Commit

Permalink
update tests to handle new restart behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
lmossman committed Jun 15, 2022
1 parent 5f8613b commit 802fda9
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public class ConnectionManagerWorkflowImpl implements ConnectionManagerWorkflow
private static final String CHECK_BEFORE_SYNC_TAG = "check_before_sync";
private static final int CHECK_BEFORE_SYNC_CURRENT_VERSION = 1;

private static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds());
static final Duration WORKFLOW_FAILURE_RESTART_DELAY = Duration.ofSeconds(new EnvConfigs().getWorkflowFailureRestartDelaySeconds());

private WorkflowState workflowState = new WorkflowState(UUID.randomUUID(), new NoopStateListener());

Expand Down Expand Up @@ -491,7 +491,7 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INP
try {
return mapper.apply(input);
} catch (final Exception e) {
log.error("[ACTIVITY-RETRY-FAILURE] Connection " + connectionId +
log.error("[ACTIVITY-FAILURE] Connection " + connectionId +
" failed to run an activity. Connection manager workflow will be restarted after a delay of " +
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds() + " seconds.", e);
// TODO (https://github.com/airbytehq/airbyte/issues/13773) add tracking/notification
Expand All @@ -512,8 +512,7 @@ private <INPUT, OUTPUT> OUTPUT runMandatoryActivityWithOutput(final Function<INP
return runMandatoryActivityWithOutput(mapper, input);
}

log.info("Finished wait for connection {}, restarting connection manager workflow",
WORKFLOW_FAILURE_RESTART_DELAY.getSeconds());
log.info("Finished wait for connection {}, restarting connection manager workflow", connectionId);

final ConnectionUpdaterInput newWorkflowInput = ConnectionManagerUtils.buildStartWorkflowInput(connectionId);
// this ensures that the new workflow will still perform a reset if an activity failed while
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
import io.airbyte.workers.temporal.scheduling.testsyncworkflow.SyncWorkflowFailingWithHearbeatTimeoutException;
import io.airbyte.workers.temporal.scheduling.testsyncworkflow.SyncWorkflowWithActivityFailureException;
import io.airbyte.workers.temporal.sync.SyncWorkflow;
import io.temporal.api.enums.v1.WorkflowExecutionStatus;
import io.temporal.api.filter.v1.WorkflowExecutionFilter;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsRequest;
import io.temporal.api.workflowservice.v1.ListClosedWorkflowExecutionsResponse;
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.failure.ApplicationFailure;
Expand All @@ -57,7 +61,6 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.assertj.core.api.Assertions;
Expand Down Expand Up @@ -88,6 +91,7 @@ public class ConnectionManagerWorkflowTest {
private static final int ATTEMPT_ID = 1;

private static final Duration SCHEDULE_WAIT = Duration.ofMinutes(20L);
private static final String WORKFLOW_ID = "workflow-id";

private final ConfigFetchActivity mConfigFetchActivity =
Mockito.mock(ConfigFetchActivity.class, Mockito.withSettings().withoutAnnotations());
Expand Down Expand Up @@ -1221,15 +1225,15 @@ public void testReplicationFailureRecorded() throws InterruptedException {
}

@Nested
@DisplayName("Test that the workflow are properly getting stuck")
class StuckWorkflow {
@DisplayName("Test that the workflow is properly restarted after activity failures.")
class FailedActivityWorkflow {

@BeforeEach
public void setup() {
setupSpecificChildWorkflow(SleepingSyncWorkflow.class);
}

public static Stream<Arguments> getSetupFailingFailingActivityBeforeRun() {
public static Stream<Arguments> getSetupFailingActivity() {
return Stream.of(
Arguments.of(new Thread(() -> Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any()))
.thenThrow(ApplicationFailure.newNonRetryableFailure("", "")))),
Expand All @@ -1243,8 +1247,8 @@ public static Stream<Arguments> getSetupFailingFailingActivityBeforeRun() {
}

@ParameterizedTest
@MethodSource("getSetupFailingFailingActivityBeforeRun")
void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException {
@MethodSource("getSetupFailingActivity")
void testWorkflowRestarted(final Thread mockSetup) throws InterruptedException {
mockSetup.run();
Mockito.when(mConfigFetchActivity.getTimeToWait(Mockito.any())).thenReturn(new ScheduleRetrieverOutput(
Duration.ZERO));
Expand All @@ -1266,21 +1270,22 @@ void testGetStuckBeforeRun(final Thread mockSetup) throws InterruptedException {
.build();

startWorkflowAndWaitUntilReady(workflow, input);
testEnv.sleep(Duration.ofMinutes(2L));

// Sleep test env for restart delay, plus a small buffer to ensure that the workflow executed the
// logic after the delay
testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.plus(Duration.ofSeconds(10)));

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Assertions.assertThat(events)
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RUNNING && changedStateEvent.isValue())
.isEmpty();

Assertions.assertThat(events)
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue())
.hasSize(1);
assertWorkflowWasContinuedAsNew();
}

@Test
void testCanGetUnstuck() throws InterruptedException {
void testCanRetryFailedActivity() throws InterruptedException {
Mockito.when(mJobCreationAndStatusUpdateActivity.createNewJob(Mockito.any()))
.thenThrow(ApplicationFailure.newNonRetryableFailure("", ""))
.thenReturn(new JobCreationOutput(1l));
Expand All @@ -1305,76 +1310,18 @@ void testCanGetUnstuck() throws InterruptedException {

startWorkflowAndWaitUntilReady(workflow, input);

testEnv.sleep(Duration.ofSeconds(80L));
// Sleep test env for half of restart delay, so that we know we are in the middle of the delay
testEnv.sleep(ConnectionManagerWorkflowImpl.WORKFLOW_FAILURE_RESTART_DELAY.dividedBy(2));
workflow.retryFailedActivity();
testEnv.sleep(Duration.ofSeconds(30L));

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Assertions.assertThat(events)
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue())
.hasSizeGreaterThanOrEqualTo(1);

Assertions.assertThat(events)
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RETRY_FAILED_ACTIVITY && changedStateEvent.isValue())
.hasSize(1);
}

public static Stream<Arguments> getSetupFailingFailingActivityAfterRun() {
return Stream.of(
Arguments.of((Consumer<ConnectionManagerWorkflow>) ((ConnectionManagerWorkflow workflow) -> System.out.println("do Nothing")),
new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", ""))
.when(mJobCreationAndStatusUpdateActivity).jobSuccessWithAttemptNumber(Mockito.any(JobSuccessInputWithAttemptNumber.class)))),
Arguments.of((Consumer<ConnectionManagerWorkflow>) ((ConnectionManagerWorkflow workflow) -> workflow.cancelJob()),
new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", ""))
.when(mJobCreationAndStatusUpdateActivity).jobCancelledWithAttemptNumber(Mockito.any(JobCancelledInputWithAttemptNumber.class)))),
Arguments.of((Consumer<ConnectionManagerWorkflow>) ((ConnectionManagerWorkflow workflow) -> workflow.deleteConnection()),
new Thread(() -> Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", ""))
.when(mConnectionDeletionActivity).deleteConnection(Mockito.any()))));
}

@ParameterizedTest
@MethodSource("getSetupFailingFailingActivityAfterRun")
void testGetStuckAfterRun(final Consumer<ConnectionManagerWorkflow> signalSender, final Thread mockSetup) throws InterruptedException {
mockSetup.run();

final UUID testId = UUID.randomUUID();
final TestStateListener testStateListener = new TestStateListener();
final WorkflowState workflowState = new WorkflowState(testId, testStateListener);

final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder()
.connectionId(UUID.randomUUID())
.jobId(null)
.attemptId(null)
.fromFailure(false)
.attemptNumber(1)
.workflowState(workflowState)
.resetConnection(false)
.fromJobResetFailure(false)
.build();

startWorkflowAndWaitUntilReady(workflow, input);

// wait for workflow to initialize
testEnv.sleep(Duration.ofSeconds(5));
workflow.submitManualSync();

// wait for workflow to initialize
testEnv.sleep(Duration.ofSeconds(5));
signalSender.accept(workflow);

// TODO
// For some reason this transiently fails if it is below the runtime.
// However, this should be reported almost immediately. I think this is a bug.
testEnv.sleep(Duration.ofSeconds(SleepingSyncWorkflow.RUN_TIME.toSeconds() + 2));

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Assertions.assertThat(events)
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue())
.hasSizeGreaterThanOrEqualTo(1);
}

}

@Nested
Expand Down Expand Up @@ -1449,53 +1396,6 @@ public void failedResetContinueAsReset() throws InterruptedException {

}

@RepeatedTest(10)
@Timeout(value = 2,
unit = TimeUnit.SECONDS)
@DisplayName("Test that we are getting stuck if the report of a failure happen")
void testGetStuckAfterRun() throws InterruptedException {
Mockito.doThrow(ApplicationFailure.newNonRetryableFailure("", ""))
.when(mJobCreationAndStatusUpdateActivity).attemptFailureWithAttemptNumber(Mockito.any());

Mockito.when(mConfigFetchActivity.getMaxAttempt())
.thenReturn(new GetMaxAttemptOutput(3));

final UUID testId = UUID.randomUUID();
final TestStateListener testStateListener = new TestStateListener();
final WorkflowState workflowState = new WorkflowState(testId, testStateListener);

final ConnectionUpdaterInput input = ConnectionUpdaterInput.builder()
.connectionId(UUID.randomUUID())
.jobId(null)
.attemptId(null)
.fromFailure(false)
.attemptNumber(1)
.workflowState(workflowState)
.resetConnection(false)
.fromJobResetFailure(false)
.build();

startWorkflowAndWaitUntilReady(workflow, input);

// wait for workflow to initialize
testEnv.sleep(Duration.ofSeconds(5));
workflow.submitManualSync();

// wait for workflow to initialize
testEnv.sleep(Duration.ofSeconds(5));

// TODO
// For some reason this transiently fails if it is below the runtime.
// However, this should be reported almost immediately. I think this is a bug.
testEnv.sleep(Duration.ofSeconds(SleepingSyncWorkflow.RUN_TIME.toSeconds() + 2));

final Queue<ChangedStateEvent> events = testStateListener.events(testId);

Assertions.assertThat(events)
.filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.QUARANTINED && changedStateEvent.isValue())
.hasSize(1);
}

@ParameterizedTest
@MethodSource("io.airbyte.workers.temporal.scheduling.ConnectionManagerWorkflowTest#getMaxAttemptForResetRetry")
public void failedResetContinueAttemptAsReset(final int maxAttempt) throws InterruptedException {
Expand Down Expand Up @@ -1623,6 +1523,7 @@ private <T extends SyncWorkflow> void setupSpecificChildWorkflow(final Class<T>
ConnectionManagerWorkflow.class,
WorkflowOptions.newBuilder()
.setTaskQueue(TemporalJobType.CONNECTION_UPDATER.name())
.setWorkflowId(WORKFLOW_ID)
.build());
}

Expand Down Expand Up @@ -1687,4 +1588,18 @@ private void runRetryResetWaitsAfterJobFailureTest() throws InterruptedException
.isFalse();
}

private void assertWorkflowWasContinuedAsNew() {
final ListClosedWorkflowExecutionsRequest request = ListClosedWorkflowExecutionsRequest.newBuilder()
.setNamespace(testEnv.getNamespace())
.setExecutionFilter(WorkflowExecutionFilter.newBuilder().setWorkflowId(WORKFLOW_ID))
.build();
final ListClosedWorkflowExecutionsResponse listResponse = testEnv
.getWorkflowService()
.blockingStub()
.listClosedWorkflowExecutions(request);
Assertions.assertThat(listResponse.getExecutionsCount()).isGreaterThanOrEqualTo(1);
Assertions.assertThat(listResponse.getExecutionsList().get(0).getStatus())
.isEqualTo(WorkflowExecutionStatus.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW);
}

}

0 comments on commit 802fda9

Please sign in to comment.