Skip to content

Commit

Permalink
Order delete workflow execution steps and document the order (#2466)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Feb 5, 2022
1 parent e642020 commit 6e7489a
Showing 1 changed file with 42 additions and 24 deletions.
66 changes: 42 additions & 24 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,11 +787,26 @@ func (s *ContextImpl) DeleteWorkflowExecution(
newTaskVersion int64,
closeTime *time.Time,
) error {
// DeleteWorkflowExecution is a 4-step process (the order is very important and should not be changed):
// 1. Add visibility delete task, i.e. schedule visibility record delete,
// 2. Delete current workflow execution pointer,
// 3. Delete workflow mutable state,
// 4. Delete history branch.

// This function is called from task processor and should not be called directly.
// It may fail at any step and task processor will retry. All steps are idempotent.

// If the process fails after step 1, the workflow execution becomes invisible, but mutable state is still there and the task can be safely retried.
// Step 2 doesn't affect mutable state either and doesn't block retry.
// After step 3, the task can't be retried because mutable state is gone, and this might leave the history branch in the DB.
// The history branch won't be accessible (because mutable state is deleted), and a special garbage collection workflow will delete it eventually.
// Step 4 shouldn't be done earlier because, if this function fails after it, the workflow execution will be accessible but won't have history (inconsistent state).

if err := s.errorByState(); err != nil {
return err
}

// do not try to get namespace cache within shard lock
// Do not get namespace cache within shard lock.
namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(key.NamespaceID))
if err != nil {
return err
Expand All @@ -800,6 +815,30 @@ func (s *ContextImpl) DeleteWorkflowExecution(
s.wLock()
defer s.wUnlock()

// Step 1. Delete visibility.
addTasksRequest := &persistence.AddTasksRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,

TransferTasks: nil,
TimerTasks: nil,
ReplicationTasks: nil,
VisibilityTasks: []tasks.Task{&tasks.DeleteExecutionVisibilityTask{
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
Version: newTaskVersion,
CloseTime: closeTime,
}},
}
err = s.addTasksLocked(addTasksRequest, namespaceEntry)
if err != nil {
return err
}

// Step 2. Delete current workflow execution pointer.
delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
Expand All @@ -814,6 +853,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
return err
}

// Step 3. Delete workflow mutable state.
delRequest := &persistence.DeleteWorkflowExecutionRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
Expand All @@ -828,6 +868,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
return err
}

// Step 4. Delete history branch.
if branchToken != nil {
delHistoryRequest := &persistence.DeleteHistoryBranchRequest{
BranchToken: branchToken,
Expand All @@ -842,29 +883,6 @@ func (s *ContextImpl) DeleteWorkflowExecution(
}
}

// Delete visibility
addTasksRequest := &persistence.AddTasksRequest{
ShardID: s.shardID,
NamespaceID: key.NamespaceID,
WorkflowID: key.WorkflowID,
RunID: key.RunID,

TransferTasks: nil,
TimerTasks: nil,
ReplicationTasks: nil,
VisibilityTasks: []tasks.Task{&tasks.DeleteExecutionVisibilityTask{
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
Version: newTaskVersion,
CloseTime: closeTime,
}},
}
err = s.addTasksLocked(addTasksRequest, namespaceEntry)
if err != nil {
return err
}

return nil
}

Expand Down

0 comments on commit 6e7489a

Please sign in to comment.