diff --git a/service/history/ndc/workflow_state_replicator.go b/service/history/ndc/workflow_state_replicator.go index 72e736209f9..17b2cc83817 100644 --- a/service/history/ndc/workflow_state_replicator.go +++ b/service/history/ndc/workflow_state_replicator.go @@ -429,8 +429,8 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( versionedTransition *repication.VersionedTransitionArtifact, sourceClusterName string, ) error { - snapshot := versionedTransition.GetSyncWorkflowStateSnapshotAttributes().State - if snapshot == nil { + attribute := versionedTransition.GetSyncWorkflowStateSnapshotAttributes() + if attribute == nil || attribute.State == nil { var versionHistories *history.VersionHistories if localMutableState != nil { versionHistories = localMutableState.GetExecutionInfo().VersionHistories @@ -444,6 +444,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot( versionHistories, ) } + snapshot := attribute.State if localMutableState == nil { return r.applySnapshotWhenWorkflowNotExist(ctx, namespaceID, workflowID, runID, wfCtx, releaseFn, snapshot, sourceClusterName, versionedTransition.NewRunInfo, true) } diff --git a/service/history/replication/executable_verify_versioned_transition_task.go b/service/history/replication/executable_verify_versioned_transition_task.go index b737fe47831..848567d5bae 100644 --- a/service/history/replication/executable_verify_versioned_transition_task.go +++ b/service/history/replication/executable_verify_versioned_transition_task.go @@ -128,7 +128,7 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error { e.NamespaceID, e.WorkflowID, e.RunID, - e.ReplicationTask().VersionedTransition, + nil, nil, ) default: @@ -137,6 +137,9 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error { } transitionHistory := ms.GetExecutionInfo().TransitionHistory + if len(transitionHistory) == 0 { + return nil + } err = workflow.TransitionHistoryStalenessCheck(transitionHistory, e.ReplicationTask().VersionedTransition) // case 1: VersionedTransition is up-to-date on current mutable state @@ -155,7 +158,7 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error { e.NamespaceID, e.WorkflowID, e.RunID, - e.ReplicationTask().VersionedTransition, + transitionHistory[len(transitionHistory)-1], ms.GetExecutionInfo().VersionHistories, ) } diff --git a/service/history/replication/executable_verify_versioned_transition_task_test.go b/service/history/replication/executable_verify_versioned_transition_task_test.go index cbaec6dc2e7..2913481a8a9 100644 --- a/service/history/replication/executable_verify_versioned_transition_task_test.go +++ b/service/history/replication/executable_verify_versioned_transition_task_test.go @@ -332,11 +332,12 @@ func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_CurrentBranch mu := workflow.NewMockMutableState(s.controller) mu.EXPECT().GetNextEventID().Return(taskNextEvent).AnyTimes() + transitionHistory := []*persistencepb.VersionedTransition{ + {NamespaceFailoverVersion: 1, TransitionCount: 3}, + {NamespaceFailoverVersion: 3, TransitionCount: 6}, + } mu.EXPECT().GetExecutionInfo().Return(&persistencepb.WorkflowExecutionInfo{ - TransitionHistory: []*persistencepb.VersionedTransition{ - {NamespaceFailoverVersion: 1, TransitionCount: 3}, - {NamespaceFailoverVersion: 3, TransitionCount: 6}, - }, + TransitionHistory: transitionHistory, }).AnyTimes() s.mockGetMutableState(s.namespaceID, s.workflowID, s.runID, mu, nil) @@ -353,6 +354,7 @@ func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_CurrentBranch err := task.Execute() s.IsType(&serviceerrors.SyncState{}, err) + s.Equal(transitionHistory[1], err.(*serviceerrors.SyncState).VersionedTransition) } func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_NonCurrentBranch_VerifySuccess() { diff --git a/service/history/workflow/context.go b/service/history/workflow/context.go index 9f8ef775ed9..1150c56a679 100644 --- a/service/history/workflow/context.go +++ b/service/history/workflow/context.go @@ -741,11 +741,23 @@ func (c *ContextImpl) mergeUpdateWithNewReplicationTasks( // so that they can be applied transactionally in the standby cluster. // TODO: this logic should be more generic so that the first replication task // in the new run doesn't have to be HistoryReplicationTask - newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0].(*tasks.HistoryReplicationTask) + var newRunBranchToken []byte + var newRunID string + newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0] delete(newWorkflowSnapshot.Tasks, tasks.CategoryReplication) - newRunBranchToken := newRunTask.BranchToken - newRunID := newRunTask.RunID + switch task := newRunTask.(type) { + case *tasks.HistoryReplicationTask: + // Handle HistoryReplicationTask specifically + newRunBranchToken = task.BranchToken + newRunID = task.RunID + case *tasks.SyncVersionedTransitionTask: + // Handle SyncVersionedTransitionTask specifically + newRunID = task.RunID + default: + // Handle unexpected types or log an error if this case is not expected + return serviceerror.NewInternal(fmt.Sprintf("unexpected replication task type for new run task %T", newRunTask)) + } taskUpdated := false updateTask := func(task interface{}) bool {