Skip to content

Commit

Permalink
Fix bug on state based replication code path (#6727)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
1. Fix verify replication task sync state error
2. Fix context merge new run task logic
## Why?
<!-- Tell your future self why have you made these changes -->

## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
unit test and test on test clusters
## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
no risk, feature not enabled
## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
n/a
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
no
  • Loading branch information
xwduan authored Oct 30, 2024
1 parent 571b0ba commit a9f3a89
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 11 deletions.
5 changes: 3 additions & 2 deletions service/history/ndc/workflow_state_replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,8 +429,8 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
versionedTransition *repication.VersionedTransitionArtifact,
sourceClusterName string,
) error {
snapshot := versionedTransition.GetSyncWorkflowStateSnapshotAttributes().State
if snapshot == nil {
attribute := versionedTransition.GetSyncWorkflowStateSnapshotAttributes()
if attribute == nil || attribute.State == nil {
var versionHistories *history.VersionHistories
if localMutableState != nil {
versionHistories = localMutableState.GetExecutionInfo().VersionHistories
Expand All @@ -444,6 +444,7 @@ func (r *WorkflowStateReplicatorImpl) applySnapshot(
versionHistories,
)
}
snapshot := attribute.State
if localMutableState == nil {
return r.applySnapshotWhenWorkflowNotExist(ctx, namespaceID, workflowID, runID, wfCtx, releaseFn, snapshot, sourceClusterName, versionedTransition.NewRunInfo, true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error {
e.NamespaceID,
e.WorkflowID,
e.RunID,
e.ReplicationTask().VersionedTransition,
nil,
nil,
)
default:
Expand All @@ -137,6 +137,9 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error {
}

transitionHistory := ms.GetExecutionInfo().TransitionHistory
if len(transitionHistory) == 0 {
return nil
}
err = workflow.TransitionHistoryStalenessCheck(transitionHistory, e.ReplicationTask().VersionedTransition)

// case 1: VersionedTransition is up-to-date on current mutable state
Expand All @@ -155,7 +158,7 @@ func (e *ExecutableVerifyVersionedTransitionTask) Execute() error {
e.NamespaceID,
e.WorkflowID,
e.RunID,
e.ReplicationTask().VersionedTransition,
transitionHistory[len(transitionHistory)-1],
ms.GetExecutionInfo().VersionHistories,
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,11 +332,12 @@ func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_CurrentBranch

mu := workflow.NewMockMutableState(s.controller)
mu.EXPECT().GetNextEventID().Return(taskNextEvent).AnyTimes()
transitionHistory := []*persistencepb.VersionedTransition{
{NamespaceFailoverVersion: 1, TransitionCount: 3},
{NamespaceFailoverVersion: 3, TransitionCount: 6},
}
mu.EXPECT().GetExecutionInfo().Return(&persistencepb.WorkflowExecutionInfo{
TransitionHistory: []*persistencepb.VersionedTransition{
{NamespaceFailoverVersion: 1, TransitionCount: 3},
{NamespaceFailoverVersion: 3, TransitionCount: 6},
},
TransitionHistory: transitionHistory,
}).AnyTimes()

s.mockGetMutableState(s.namespaceID, s.workflowID, s.runID, mu, nil)
Expand All @@ -353,6 +354,7 @@ func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_CurrentBranch

err := task.Execute()
s.IsType(&serviceerrors.SyncState{}, err)
s.Equal(transitionHistory[1], err.(*serviceerrors.SyncState).VersionedTransition)
}

func (s *executableVerifyVersionedTransitionTaskSuite) TestExecute_NonCurrentBranch_VerifySuccess() {
Expand Down
18 changes: 15 additions & 3 deletions service/history/workflow/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,11 +741,23 @@ func (c *ContextImpl) mergeUpdateWithNewReplicationTasks(
// so that they can be applied transactionally in the standby cluster.
// TODO: this logic should be more generic so that the first replication task
// in the new run doesn't have to be HistoryReplicationTask
newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0].(*tasks.HistoryReplicationTask)
var newRunBranchToken []byte
var newRunID string
newRunTask := newWorkflowSnapshot.Tasks[tasks.CategoryReplication][0]
delete(newWorkflowSnapshot.Tasks, tasks.CategoryReplication)

newRunBranchToken := newRunTask.BranchToken
newRunID := newRunTask.RunID
switch task := newRunTask.(type) {
case *tasks.HistoryReplicationTask:
// Handle HistoryReplicationTask specifically
newRunBranchToken = task.BranchToken
newRunID = task.RunID
case *tasks.SyncVersionedTransitionTask:
// Handle SyncVersionedTransitionTask specifically
newRunID = task.RunID
default:
// Handle unexpected types or log an error if this case is not expected
return serviceerror.NewInternal(fmt.Sprintf("unexpected replication task type for new run task %T", newRunTask))
}
taskUpdated := false

updateTask := func(task interface{}) bool {
Expand Down

0 comments on commit a9f3a89

Please sign in to comment.