Repair temporal state when performing manual actions #12289
 * @throws DeletedWorkflowException if the workflow was deleted, according to the workflow state
 * @throws UnreachableWorkflowException if the workflow is unreachable
 */
private ConnectionManagerWorkflow getConnectionManagerWorkflow(final UUID connectionId)
This method and the following method are the main changes in this PR. They give the rest of this class a way to retrieve the connection manager workflow while forcing callers to handle the deleted case and the unreachable case through exceptions. The second method, repairAndRetrieveConnectionManagerWorkflow, handles the unreachable case automatically by restarting the temporal workflow.
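To make the split between the two methods concrete, here is a minimal sketch of the control flow. It is not the PR's code: ToyTemporalClient, ToyWorkflow, ToyStatus and the restarts counter are hypothetical stand-ins, and the real restart logic is replaced by flipping an in-memory status.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory model; only the exception-driven control flow mirrors the PR.
final class RepairSketch {

  static class DeletedWorkflowException extends Exception {
    DeletedWorkflowException(final String message) {
      super(message);
    }
  }

  static class UnreachableWorkflowException extends Exception {
    UnreachableWorkflowException(final String message) {
      super(message);
    }
  }

  // Simplified statuses (assumption): a deleted connection leaves a completed workflow behind.
  enum ToyStatus { RUNNING, COMPLETED_AFTER_DELETE, TERMINATED }

  static class ToyWorkflow {
    final UUID connectionId;

    ToyWorkflow(final UUID connectionId) {
      this.connectionId = connectionId;
    }
  }

  static class ToyTemporalClient {
    private final Map<UUID, ToyStatus> statuses = new HashMap<>();
    int restarts = 0; // counts automatic repairs, for illustration only

    void setStatus(final UUID connectionId, final ToyStatus status) {
      statuses.put(connectionId, status);
    }

    // Plain retrieval: both bad states surface as checked exceptions.
    ToyWorkflow getConnectionManagerWorkflow(final UUID connectionId)
        throws DeletedWorkflowException, UnreachableWorkflowException {
      final ToyStatus status = statuses.get(connectionId);
      if (status == null || status == ToyStatus.TERMINATED) {
        throw new UnreachableWorkflowException("workflow for " + connectionId + " is unreachable");
      }
      if (status == ToyStatus.COMPLETED_AFTER_DELETE) {
        throw new DeletedWorkflowException("workflow for " + connectionId + " was deleted");
      }
      return new ToyWorkflow(connectionId);
    }

    // Repairing retrieval: the unreachable case is handled here, so callers only see the deleted case.
    ToyWorkflow repairAndRetrieveConnectionManagerWorkflow(final UUID connectionId)
        throws DeletedWorkflowException {
      try {
        return getConnectionManagerWorkflow(connectionId);
      } catch (final UnreachableWorkflowException e) {
        restarts++;
        statuses.put(connectionId, ToyStatus.RUNNING); // stands in for restarting the workflow
        return new ToyWorkflow(connectionId);
      }
    }
  }
}
```

The point of the shape is that the compiler forces every caller of the repairing method to decide what to do about a deleted workflow, while the unreachable case never leaks out of it.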
Nice! I like this approach a lot. Made a couple of cleanup suggestions, but otherwise it looks good!
final ConnectionManagerWorkflow connectionManagerWorkflow;
try {
  connectionManagerWorkflow = repairAndRetrieveConnectionManagerWorkflow(connectionId);
} catch (final DeletedWorkflowException e) {
  log.error("Can't cancel a deleted workflow");
  return new ManualOperationResult(
      Optional.of(e.getMessage()),
      Optional.empty());
}
This block of code is repeated several times in this class (apart from the contents of the log message). Can we DRY it?
I think this is a hard piece of logic to DRY, because this logic means that this method should return a ManualOperationResult in the case of a DeletedWorkflowException. So if I try to move this logic into another method, that method will need to somehow indicate to this method that it should return a ManualOperationResult in that case, or return a ConnectionManagerWorkflow in the normal case, and this method will need to handle both cases with a conditional. So I think either way, this will be somewhat ugly if we try to DRY it.
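For reference, one possible way around the "two return shapes" problem is to pass the rest of the method in as a function, so the helper owns the early return. This is only a sketch of that trade-off, not what the PR does: withWorkflow, WorkflowRetriever, and the field names on ManualOperationResult are hypothetical, and the types are simplified stand-ins.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;

// Hypothetical stand-in types; only the helper's shape is the point here.
final class DrySketch {

  static class DeletedWorkflowException extends Exception {
    DeletedWorkflowException(final String message) {
      super(message);
    }
  }

  interface ConnectionManagerWorkflow {}

  // Field names are assumptions; the diff only shows a two-Optional constructor.
  static final class ManualOperationResult {
    final Optional<String> failingReason;
    final Optional<Long> jobId;

    ManualOperationResult(final Optional<String> failingReason, final Optional<Long> jobId) {
      this.failingReason = failingReason;
      this.jobId = jobId;
    }
  }

  // Retrieval step that may report a deleted workflow.
  @FunctionalInterface
  interface WorkflowRetriever {
    ConnectionManagerWorkflow retrieve(UUID connectionId) throws DeletedWorkflowException;
  }

  // Runs the operation on the retrieved workflow, or short-circuits with a failed result
  // when the workflow was deleted. Only the log message differs between call sites.
  static ManualOperationResult withWorkflow(final UUID connectionId,
                                            final WorkflowRetriever retriever,
                                            final String deletedLogMessage,
                                            final Function<ConnectionManagerWorkflow, ManualOperationResult> operation) {
    final ConnectionManagerWorkflow workflow;
    try {
      workflow = retriever.retrieve(connectionId);
    } catch (final DeletedWorkflowException e) {
      System.err.println(deletedLogMessage); // stands in for log.error(...)
      return new ManualOperationResult(Optional.of(e.getMessage()), Optional.empty());
    }
    return operation.apply(workflow);
  }
}
```

The cost is the one described above in a different form: each manual operation body becomes a lambda, which can be harder to read than the repeated try/catch it replaces.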
package io.airbyte.workers.temporal.exception;

public class DeletedWorkflowException extends Exception {
It may be helpful to add a Javadoc comment to this exception and the other one, explaining what they mean and why they might occur.
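As a rough illustration of that suggestion, the Javadoc could look something like the sketch below. The wording is drawn from the discussion in this PR rather than from the merged code, the constructors are assumptions, and the outer ExceptionDocSketch class exists only to keep the example in one file.

```java
// Hypothetical wrapper so both exceptions fit in a single self-contained example.
final class ExceptionDocSketch {

  /**
   * Thrown when the connection manager workflow belongs to a deleted connection.
   * In that case the workflow has completed: its state can still be read, but it
   * is no longer running and cannot receive signals.
   */
  static class DeletedWorkflowException extends Exception {
    DeletedWorkflowException(final String message) {
      super(message);
    }
  }

  /**
   * Thrown when the connection manager workflow cannot be reached at all, for
   * example because it was terminated and its state cannot be queried. This is
   * not expected in normal operation; callers may repair it by restarting the workflow.
   */
  static class UnreachableWorkflowException extends Exception {
    UnreachableWorkflowException(final String message, final Throwable cause) {
      super(message, cause);
    }
  }
}
```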
}

final ConnectionManagerWorkflow connectionManagerWorkflow =
    getExistingWorkflow(ConnectionManagerWorkflow.class, getConnectionManagerName(connectionId));
if (workflowState.isDeleted()) {
I don't understand how this block works. It means that for most deleted workflows, we will consider them Unreachable, because the workflows are supposed to be terminated.
@benmoriceau One thing I discovered while investigating this issue is that when we delete a connection, the temporal workflow is actually not terminated. The temporal workflow in the delete case has status Completed, which is a separate status from Terminated. For these deleted workflows, we can actually still retrieve the workflow and read values from its workflowState, but we cannot call any signal methods on the workflow because it is not actively running.
I think the reason the temporal workflow is Completed and not Terminated in the deletion case is because we just return in that case:
Lines 105 to 109 in f816946
if (workflowState.isDeleted()) {
  log.info("Workflow deletion was requested. Calling deleteConnection activity before terminating the workflow.");
  deleteConnectionBeforeTerminatingTheWorkflow();
  return;
}
And I think when the run method returns, temporal marks the workflow as Completed.
So my new understanding is that the "unreachable" case is never really expected, which is why we repair the workflow in that case.
I see, and for a Terminated workflow, I guess we can't query the state.
Yep that's correct
This looks fine to me with this assumption. I have one more concern: since we are starting the workflow asynchronously, I am wondering if we should use https://www.javadoc.io/static/io.temporal/temporal-sdk/1.0.3/io/temporal/client/WorkflowClient.html#newSignalWithStartRequest-- and https://www.javadoc.io/static/io.temporal/temporal-sdk/1.0.3/io/temporal/client/WorkflowClient.html#signalWithStart-io.temporal.client.BatchRequest- to ensure that there won't be any race condition between the workflow becoming accessible and the signal being sent after a restart.
@benmoriceau to make sure I understand correctly, you are suggesting that in these cases where we restart the workflow before sending a signal, we should instead submit the signal with the start request in the same
@lmossman yes, it is exactly that. Temporal will ensure that the workflow is reachable before submitting the signal to it.
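The race being discussed can be illustrated with a toy in-memory model. It does not call Temporal at all (the real calls are the newSignalWithStartRequest and signalWithStart methods linked above, which need a running Temporal server); ToyService, the workflow id, and the signal names are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of "start, then signal" versus a single combined signal-with-start request.
final class SignalWithStartSketch {

  static class ToyService {
    // Running workflows, each with the list of signals it has received.
    private final Map<String, List<String>> running = new HashMap<>();

    // Starts the workflow if it is not already running.
    synchronized void start(final String workflowId) {
      running.putIfAbsent(workflowId, new ArrayList<>());
    }

    // A plain signal is rejected if the workflow is not running yet.
    synchronized void signal(final String workflowId, final String signalName) {
      final List<String> inbox = running.get(workflowId);
      if (inbox == null) {
        throw new IllegalStateException("workflow " + workflowId + " is not running");
      }
      inbox.add(signalName);
    }

    // Combined request: start if needed and deliver the signal in one atomic step,
    // so there is no window in which the signal can arrive before the workflow exists.
    synchronized void signalWithStart(final String workflowId, final String signalName) {
      running.computeIfAbsent(workflowId, id -> new ArrayList<>()).add(signalName);
    }

    // Returns a copy of the signals delivered so far (empty if the workflow is not running).
    synchronized List<String> signalsFor(final String workflowId) {
      final List<String> inbox = running.get(workflowId);
      return inbox == null ? new ArrayList<>() : new ArrayList<>(inbox);
    }
  }
}
```

With separate start and signal calls, a signal sent before the asynchronous start has taken effect hits the rejected path; the combined call removes that ordering dependency.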
LGTM, I have approved it.
* Repair temporal state when performing manual actions
* refactor temporal client and fix tests
* add unreachable workflow exception
* format
* test repeated deletion
* add acceptance tests for automatic workflow repair
* rename and DRY up manual operation methods in SchedulerHandler
* refactor temporal client to batch signal and start requests together in repair case
* add comment
* remove main method
* fix job id fetching
* only overwrite workflowState if reset flags are true on input
* fix test
* fix cancel endpoint
* Clean job state before creating new jobs in connection manager workflow (#12589)
  * first working iteration of cleaning job state on first workflow run
  * second iteration, with tests
  * undo local testing changes
  * move method
  * add comment explaining placement of clean job state logic
  * change connection_workflow failure origin value to platform
  * remove cast from new query
  * create static var for non terminal job statuses
  * change failure origin value to airbyte_platform
  * tweak external message wording
  * remove unused variable
  * reword external message
  * fix merge conflict
  * remove log lines
  * move cleaning job state to beginning of workflow
  * do not clean job state if there is already a job id for this workflow, and add test
  * see if sleeping fixes test on CI
  * add repeated test annotation to protect from flakiness
  * fail jobs before creating new ones to protect from quarantined state
  * update external message for cleaning job state error
What
Resolves #10931
Resolves #11216
Resolves #11213
Resolves #12160
The main goal of this PR is to fix our interactions with the connection manager workflow in the TemporalClient. After some investigation, I found that our TemporalClient was not correctly handling the various states that a workflow can be in.
Namely, if a connection and its workflow have been deleted, the workflow and its state are still retrievable using the WorkflowClient, but signal methods cannot be executed on the workflow because it is in state "Completed" (this was the root cause of this issue). Separately, if a workflow is terminated, then its state cannot be retrieved at all.
This PR makes both of these cases more explicit through new exceptions, and updates the various manual operation methods to handle these exceptions and attempt to automatically repair workflows that are in bad states.
How
By refactoring the TemporalClient and using the new repairAndRetrieveConnectionManagerWorkflow() method, this PR makes each manual operation correctly handle the deleted case, and automatically repair the workflow in the "unexpected state" case.
The plan is to follow this up with another PR to address the start from a clean state ticket, so that when these automatic workflow repairs are performed, they do not cause any weird issues with job state.
I plan to wait to merge this PR until that second PR has been reviewed and merged into this one, at which point they will be merged to master together, because we should really add both at the same time to be safe.
This PR also renames some classes and methods to be more consistent, and adds more TemporalClient tests.
Recommended reading order