diff --git a/service/history/shard/context_impl.go b/service/history/shard/context_impl.go index 157e264388a..1a7a76fe23e 100644 --- a/service/history/shard/context_impl.go +++ b/service/history/shard/context_impl.go @@ -787,11 +787,26 @@ func (s *ContextImpl) DeleteWorkflowExecution( newTaskVersion int64, closeTime *time.Time, ) error { + // DeleteWorkflowExecution is a 4-step process (order is very important and should not be changed): + // 1. Add visibility delete task, i.e. schedule visibility record delete, + // 2. Delete current workflow execution pointer, + // 3. Delete workflow mutable state, + // 4. Delete history branch. + + // This function is called from task processor and should not be called directly. + // It may fail at any step and task processor will retry. All steps are idempotent. + + // If process fails after step 1 then workflow execution becomes invisible but mutable state is still there and task can be safely retried. + // Step 2 doesn't affect mutable state either and doesn't block retry. + // After step 3 task can't be retried because mutable state is gone and this might leave history branch in DB. + // The history branch won't be accessible (because mutable state is deleted) and special garbage collection workflow will delete it eventually. + // Step 4 shouldn't be done earlier because if this func fails after it, workflow execution will be accessible but won't have history (inconsistent state). + if err := s.errorByState(); err != nil { return err } - // do not try to get namespace cache within shard lock + // Do not get namespace cache within shard lock. namespaceEntry, err := s.GetNamespaceRegistry().GetNamespaceByID(namespace.ID(key.NamespaceID)) if err != nil { return err @@ -800,6 +815,30 @@ func (s *ContextImpl) DeleteWorkflowExecution( s.wLock() defer s.wUnlock() + // Step 1. Delete visibility. 
+ addTasksRequest := &persistence.AddTasksRequest{ + ShardID: s.shardID, + NamespaceID: key.NamespaceID, + WorkflowID: key.WorkflowID, + RunID: key.RunID, + + TransferTasks: nil, + TimerTasks: nil, + ReplicationTasks: nil, + VisibilityTasks: []tasks.Task{&tasks.DeleteExecutionVisibilityTask{ + // TaskID is set by addTasksLocked + WorkflowKey: key, + VisibilityTimestamp: s.timeSource.Now(), + Version: newTaskVersion, + CloseTime: closeTime, + }}, + } + err = s.addTasksLocked(addTasksRequest, namespaceEntry) + if err != nil { + return err + } + + // Step 2. Delete current workflow execution pointer. delCurRequest := &persistence.DeleteCurrentWorkflowExecutionRequest{ ShardID: s.shardID, NamespaceID: key.NamespaceID, @@ -814,6 +853,7 @@ func (s *ContextImpl) DeleteWorkflowExecution( return err } + // Step 3. Delete workflow mutable state. delRequest := &persistence.DeleteWorkflowExecutionRequest{ ShardID: s.shardID, NamespaceID: key.NamespaceID, @@ -828,6 +868,7 @@ func (s *ContextImpl) DeleteWorkflowExecution( return err } + // Step 4. Delete history branch. if branchToken != nil { delHistoryRequest := &persistence.DeleteHistoryBranchRequest{ BranchToken: branchToken, @@ -842,29 +883,6 @@ func (s *ContextImpl) DeleteWorkflowExecution( } } - // Delete visibility - addTasksRequest := &persistence.AddTasksRequest{ - ShardID: s.shardID, - NamespaceID: key.NamespaceID, - WorkflowID: key.WorkflowID, - RunID: key.RunID, - - TransferTasks: nil, - TimerTasks: nil, - ReplicationTasks: nil, - VisibilityTasks: []tasks.Task{&tasks.DeleteExecutionVisibilityTask{ - // TaskID is set by addTasksLocked - WorkflowKey: key, - VisibilityTimestamp: s.timeSource.Now(), - Version: newTaskVersion, - CloseTime: closeTime, - }}, - } - err = s.addTasksLocked(addTasksRequest, namespaceEntry) - if err != nil { - return err - } - return nil }