From c61b8c2161c1f4ba0ef124155aa2a8b66f62c9cf Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Wed, 9 Feb 2022 14:06:18 -0800 Subject: [PATCH] Do not set cancel as true in a reset (#10228) --- .../scheduling/ConnectionManagerWorkflowImpl.java | 7 ++++--- .../temporal/scheduling/state/WorkflowState.java | 10 ++++++++++ .../state/listener/WorkflowStateChangedListener.java | 3 ++- .../scheduling/ConnectionManagerWorkflowTest.java | 4 ++++ 4 files changed, 20 insertions(+), 4 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java index 83af030fccf1..2bcc17b85464 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowImpl.java @@ -187,7 +187,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr // When cancelling a reset, we endure that the next workflow won't be a reset. // We are using a specific workflow state for that, this makes the set of the fact that we are going // to continue as a reset testable. - if (workflowState.isResetConnection() && !workflowState.isCancelled()) { + if (workflowState.isCancelledForReset()) { workflowState.setContinueAsReset(true); connectionUpdaterInput.setJobId(null); connectionUpdaterInput.setAttemptNumber(1); @@ -205,7 +205,7 @@ public void run(final ConnectionUpdaterInput connectionUpdaterInput) throws Retr final ConnectionDeletionInput connectionDeletionInput = new ConnectionDeletionInput(connectionUpdaterInput.getConnectionId()); connectionDeletionActivity.deleteConnection(connectionDeletionInput); return; - } else if (workflowState.isCancelled()) { + } else if (workflowState.isCancelled() || workflowState.isCancelledForReset()) { jobCreationAndStatusUpdateActivity.jobCancelled(new JobCancelledInput( maybeJobId.get(), maybeAttemptId.get(), @@ -301,7 +301,8 @@ public void connectionUpdated() { public void resetConnection() { workflowState.setResetConnection(true); if (workflowState.isRunning()) { - cancelJob(); + workflowState.setCancelledForReset(true); + syncWorkflowCancellationScope.cancel(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java index a80a7bbec58a..2769d302b210 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/WorkflowState.java @@ -30,6 +30,7 @@ public WorkflowState(final UUID id, final WorkflowStateChangedListener stateChan private boolean failed = false; private boolean resetConnection = false; private boolean continueAsReset = false; + private boolean cancelledForReset = false; public void setRunning(final boolean running) { final ChangedStateEvent event = new ChangedStateEvent( @@ -95,6 +96,14 @@ public void setContinueAsReset(final boolean continueAsReset) { this.continueAsReset = continueAsReset; } + public void setCancelledForReset(final boolean cancelledForReset) { + final ChangedStateEvent event = new ChangedStateEvent( + StateField.CANCELLED_FOR_RESET, + cancelledForReset); + stateChangedListener.addEvent(id, event); + this.cancelledForReset = cancelledForReset; + } + public void reset() { this.setRunning(false); this.setDeleted(false); @@ -104,6 +113,7 @@ public void reset() { this.setFailed(false); this.setResetConnection(false); this.setContinueAsReset(false); + this.setCancelledForReset(false); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java index 99d4675c3542..b6ebdd6fed73 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/state/listener/WorkflowStateChangedListener.java @@ -30,7 +30,8 @@ enum StateField { UPDATED, FAILED, RESET, - CONTINUE_AS_RESET + CONTINUE_AS_RESET, + CANCELLED_FOR_RESET } @Value diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java index ed00cdd74bda..84deda18ef5e 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionManagerWorkflowTest.java @@ -508,6 +508,10 @@ public void resetCancelRunningWorkflow() { final Queue events = testStateListener.events(testId); + Assertions.assertThat(events) + .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.CANCELLED_FOR_RESET && changedStateEvent.isValue()) + .hasSizeGreaterThanOrEqualTo(1); + Assertions.assertThat(events) .filteredOn(changedStateEvent -> changedStateEvent.getField() == StateField.RESET && changedStateEvent.isValue()) .hasSizeGreaterThanOrEqualTo(1);