From a9f3a89d912211481f9101961090ad01e19adb5f Mon Sep 17 00:00:00 2001 From: Will Duan Date: Wed, 30 Oct 2024 10:36:55 -0700 Subject: [PATCH] Fix bug on state based replication code path (#6727) ## What changed? 1. Fix verify replication task sync state error 2. Fix context merge new run task logic ## Why? ## How did you test it? unit test and test on test clusters ## Potential risks no risk, feature not enabled ## Documentation n/a ## Is hotfix candidate? no --- .../history/ndc/workflow_state_replicator.go | 5 +++-- ...cutable_verify_versioned_transition_task.go | 7 +++++-- ...le_verify_versioned_transition_task_test.go | 10 ++++++---- service/history/workflow/context.go | 18 +++++++++++++++--- 4 files changed, 29 insertions(+), 11 deletions(-) diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 72e736209f9..17b2cc83817 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -429,8 +429,8 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( versionedTransition *repication.VersionedTransitionArtifact, sourceClusterName string, ) error { - snapshot := versionedTransition.GetSyncWorkflowStateSnapshotAttributes().State - if snapshot == nil { + attribute := versionedTransition.GetSyncWorkflowStateSnapshotAttributes() + if attribute == nil || attribute.State == nil { var versionHistories *history.VersionHistories if localMutableState != nil { versionHistories = localMutableState.GetExecutionInfo().VersionHistories @@ -444,6 +444,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( versionHistories, ) } + snapshot := attribute.State if localMutableState == nil { return r.applySnapshotWhenWorkflowNotExist(ctx, namespaceID, workflowID, runID, wfCtx, releaseFn, snapshot, sourceClusterName, versionedTransition.NewRunInfo, true) } diff --git a/service/history/replication/executable_verify_versioned_transition_task.go b/service/history/replication/executable_verify_versioned_transition_task.go index b737fe47831..848567d5bae 100644 --- a/service/history/replication/executable_verify_versioned_transition_task.go +++ b/service/history/replication/executable_verify_versioned_transition_task.go @@ -128,7 +128,7 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error { e.NamespaceID, e.WorkflowID, e.RunID, - e.ReplicationTask().VersionedTransition, + nil, nil, ) default: @@ -137,6 +137,9 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error { } transitionHistory := ms.GetExecutionInfo().TransitionHistory + if len(transitionHistory) == 0 { + return nil + } err = workflow.TransitionHistoryStalenessCheck(transitionHistory, e.ReplicationTask().VersionedTransition) // case 1: VersionedTransition is up-to-date on current mutable state @@ -155,7 +158,7 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error { e.NamespaceID, e.WorkflowID, e.RunID, - e.ReplicationTask().VersionedTransition, + transitionHistory[len(transitionHistory)-1], ms.GetExecutionInfo().VersionHistories, ) } diff --git a/service/history/replication/executable_verify_versioned_transition_task_test.go b/service/history/replication/executable_verify_versioned_transition_task_test.go index cbaec6dc2e7..2913481a8a9 100644 --- a/service/history/replication/executable_verify_versioned_transition_task_test.go +++ b/service/history/replication/executable_verify_versioned_transition_task_test.go @@ -332,11 +332,12 @@ func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_CurrentBranch mu := workflow.NewMockMutableState(s.controller) mu.EXPECT().GetNextEventID().Return(taskNextEvent).AnyTimes() + transitionHistory := []*persistencepb.VersionedTransition{ + {NamespaceFailoverVersion: 1, TransitionCount: 3}, + {NamespaceFailoverVersion: 3, TransitionCount: 6}, + } mu.EXPECT().GetExecutionInfo().Return(&persistencepb.WorkflowExecutionInfo{ - TransitionHistory: []*persistencepb.VersionedTransition{ - {NamespaceFailoverVersion: 1, TransitionCount: 3}, - {NamespaceFailoverVersion: 3, TransitionCount: 6}, - }, + TransitionHistory: transitionHistory, }).AnyTimes() s.mockGetMutableState(s.namespaceID, s.workflowID, s.runID, mu, nil) @@ -353,6 +354,7 @@ func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_CurrentBranch err := task.Execute() s.IsType(&serviceerrors.SyncState{}, err) + s.Equal(transitionHistory[1], err.(*serviceerrors.SyncState).VersionedTransition) } func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_NonCurrentBranch_VerifySuccess() { diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 9f8ef775ed9..1150c56a679 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -741,11 +741,23 @@ func (c *ContextImpl) mergeUpdateWithNewReplicationTasks( // so that they can be applied transactionally in the standby cluster. // TODO: this logic should be more generic so that the first replication task // in the new run doesn't have to be HistoryReplicationTask - newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0].(*tasks.HistoryReplicationTask) + var newRunBranchToken []byte + var newRunID string + newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0] delete(newWorkflowSnapshot.Tasks, tasks.CategoryReplication) - newRunBranchToken := newRunTask.BranchToken - newRunID := newRunTask.RunID + switch task := newRunTask.(type) { + case *tasks.HistoryReplicationTask: + // Handle HistoryReplicationTask specifically + newRunBranchToken = task.BranchToken + newRunID = task.RunID + case *tasks.SyncVersionedTransitionTask: + // Handle SyncVersionedTransitionTask specifically + newRunID = task.RunID + default: + // Handle unexpected types or log an error if this case is not expected + return serviceerror.NewInternal(fmt.Sprintf("unexpected replication task type for new run task %T", newRunTask)) + } taskUpdated := false updateTask := func(task interface{}) bool {