Skip to content

Commit

Permalink
Fix tombstone tracking for state based replication (#6863)
Browse files Browse the repository at this point in the history
## What changed?
<!-- Describe what has changed in this PR -->
Fix tombstone tracking for state based replication
## Why?
<!-- Tell your future self why have you made these changes -->
bug fix
## How did you test it?
<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
uni test
## Potential risks
<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
no risk
## Documentation
<!-- Have you made sure this change doesn't falsify anything currently
stated in `docs/`? If significant
new behavior is added, have you described that in `docs/`? -->
n/a
## Is hotfix candidate?
<!-- Is this PR a hotfix candidate or does it require a notification to
be sent to the broader community? (Yes/No) -->
no
  • Loading branch information
xwduan authored Nov 21, 2024
1 parent 6b23acf commit 6934dcd
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 46 deletions.
2 changes: 1 addition & 1 deletion service/history/replication/sync_state_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *SyncStateRetrieverImpl) getSyncStateResult(
}
tombstoneBatch := mutableState.GetExecutionInfo().SubStateMachineTombstoneBatches
if len(tombstoneBatch) == 0 {
return true
return false
}
if workflow.CompareVersionedTransition(tombstoneBatch[0].VersionedTransition, targetCurrentVersionedTransition) <= 0 {
return true
Expand Down
125 changes: 81 additions & 44 deletions service/history/replication/sync_state_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,55 +206,92 @@ func (s *syncWorkflowStateSuite) TestSyncWorkflowState_ReturnMutation() {
}

func (s *syncWorkflowStateSuite) TestSyncWorkflowState_ReturnSnapshot() {
mu := workflow.NewMockMutableState(s.controller)
s.workflowConsistencyChecker.EXPECT().GetWorkflowLeaseWithConsistencyCheck(gomock.Any(), nil, gomock.Any(), definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.execution.WorkflowId,
RunID: s.execution.RunId,
}, locks.PriorityLow).Return(
api.NewWorkflowLease(nil, func(err error) {}, mu), nil)
versionHistories := &history.VersionHistories{
CurrentVersionHistoryIndex: 0,
Histories: []*history.VersionHistory{
{
BranchToken: []byte("branchToken1"),
Items: []*history.VersionHistoryItem{
{EventId: 1, Version: 10},
{EventId: 2, Version: 13},
},
testCases := []struct {
name string
infoFn func() (*history.VersionHistories, []*persistencespb.VersionedTransition, []*persistencespb.StateMachineTombstoneBatch)
}{
{
name: "tombstone batch is empty",
infoFn: func() (*history.VersionHistories, []*persistencespb.VersionedTransition, []*persistencespb.StateMachineTombstoneBatch) {
versionHistories := &history.VersionHistories{
CurrentVersionHistoryIndex: 0,
Histories: []*history.VersionHistory{
{
BranchToken: []byte("branchToken1"),
Items: []*history.VersionHistoryItem{
{EventId: 1, Version: 10},
{EventId: 2, Version: 13},
},
},
},
}
return versionHistories, []*persistencespb.VersionedTransition{
{NamespaceFailoverVersion: 1, TransitionCount: 12},
{NamespaceFailoverVersion: 2, TransitionCount: 15},
}, nil
},
},
}
executionInfo := &persistencespb.WorkflowExecutionInfo{
TransitionHistory: []*persistencespb.VersionedTransition{
{NamespaceFailoverVersion: 1, TransitionCount: 12},
{NamespaceFailoverVersion: 2, TransitionCount: 15},
},
SubStateMachineTombstoneBatches: []*persistencespb.StateMachineTombstoneBatch{
{
VersionedTransition: &persistencespb.VersionedTransition{NamespaceFailoverVersion: 1, TransitionCount: 12},
{
name: "tombstone batch is not empty",
infoFn: func() (*history.VersionHistories, []*persistencespb.VersionedTransition, []*persistencespb.StateMachineTombstoneBatch) {
versionHistories := &history.VersionHistories{
CurrentVersionHistoryIndex: 0,
Histories: []*history.VersionHistory{
{
BranchToken: []byte("branchToken1"),
Items: []*history.VersionHistoryItem{
{EventId: 1, Version: 10},
{EventId: 2, Version: 13},
},
},
},
}
return versionHistories, []*persistencespb.VersionedTransition{
{NamespaceFailoverVersion: 1, TransitionCount: 12},
{NamespaceFailoverVersion: 2, TransitionCount: 15},
}, []*persistencespb.StateMachineTombstoneBatch{
{
VersionedTransition: &persistencespb.VersionedTransition{NamespaceFailoverVersion: 1, TransitionCount: 12},
},
}
},
},
VersionHistories: versionHistories,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
mu.EXPECT().CloneToProto().Return(&persistencespb.WorkflowMutableState{
ExecutionInfo: executionInfo,
})
result, err := s.syncStateRetriever.GetSyncWorkflowStateArtifact(
context.Background(),
s.namespaceID,
s.execution,
&persistencespb.VersionedTransition{
NamespaceFailoverVersion: 1,
TransitionCount: 13,
},
versionHistories)
s.NoError(err)
s.NotNil(result)
s.NotNil(result.VersionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes())
s.Nil(result.VersionedTransitionArtifact.EventBatches)
s.Nil(result.VersionedTransitionArtifact.NewRunInfo)
for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
mu := workflow.NewMockMutableState(s.controller)
s.workflowConsistencyChecker.EXPECT().GetWorkflowLeaseWithConsistencyCheck(gomock.Any(), nil, gomock.Any(), definition.WorkflowKey{
NamespaceID: s.namespaceID,
WorkflowID: s.execution.WorkflowId,
RunID: s.execution.RunId,
}, locks.PriorityLow).Return(
api.NewWorkflowLease(nil, func(err error) {}, mu), nil)
versionHistories, transitions, tombstoneBatches := tc.infoFn()
executionInfo := &persistencespb.WorkflowExecutionInfo{
TransitionHistory: transitions,
SubStateMachineTombstoneBatches: tombstoneBatches,
VersionHistories: versionHistories,
}
mu.EXPECT().GetExecutionInfo().Return(executionInfo).AnyTimes()
mu.EXPECT().CloneToProto().Return(&persistencespb.WorkflowMutableState{
ExecutionInfo: executionInfo,
})
result, err := s.syncStateRetriever.GetSyncWorkflowStateArtifact(
context.Background(),
s.namespaceID,
s.execution,
&persistencespb.VersionedTransition{
NamespaceFailoverVersion: 1,
TransitionCount: 13,
},
versionHistories)
s.NoError(err)
s.NotNil(result)
s.NotNil(result.VersionedTransitionArtifact.GetSyncWorkflowStateSnapshotAttributes())
s.Nil(result.VersionedTransitionArtifact.EventBatches)
s.Nil(result.VersionedTransitionArtifact.NewRunInfo)
})
}
}

func (s *syncWorkflowStateSuite) TestSyncWorkflowState_NoVersionTransitionProvided_ReturnSnapshot() {
Expand Down
6 changes: 5 additions & 1 deletion service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -5586,6 +5586,7 @@ func (ms *MutableStateImpl) closeTransactionHandleUnknownVersionedTransition() {
// We are in unknown versioned transition state, clear the transition history.
ms.executionInfo.TransitionHistory = nil
ms.executionInfo.SubStateMachineTombstoneBatches = nil
ms.totalTombstones = 0

for _, activityInfo := range ms.updateActivityInfos {
activityInfo.LastUpdateVersionedTransition = nil
Expand Down Expand Up @@ -5693,7 +5694,10 @@ func (ms *MutableStateImpl) closeTransactionTrackTombstones(
VersionedTransition: ms.executionInfo.TransitionHistory[len(ms.executionInfo.TransitionHistory)-1],
StateMachineTombstones: tombstones,
}
ms.executionInfo.SubStateMachineTombstoneBatches = append(ms.executionInfo.SubStateMachineTombstoneBatches, tombstoneBatch)
// As an optimization, we only track the first empty tombstone batch. So we can know the start point of the tombstone batch
if len(tombstones) > 0 || len(ms.executionInfo.SubStateMachineTombstoneBatches) == 0 {
ms.executionInfo.SubStateMachineTombstoneBatches = append(ms.executionInfo.SubStateMachineTombstoneBatches, tombstoneBatch)
}

ms.totalTombstones += len(tombstones)
ms.capTombstoneCount()
Expand Down
82 changes: 82 additions & 0 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3331,6 +3331,88 @@ func tombstoneExists(
return false
}

func (s *mutableStateSuite) TestCloseTransactionTrackTombstones_CapIfLargerThanLimit() {
dbState := s.buildWorkflowMutableState()

mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
s.NoError(err)
mutableState.executionInfo.SubStateMachineTombstoneBatches = []*persistencespb.StateMachineTombstoneBatch{
{
VersionedTransition: &persistencespb.VersionedTransition{
NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
TransitionCount: 1,
},
},
}

transitionHistory := mutableState.GetExecutionInfo().TransitionHistory
currentVersionedTransition := transitionHistory[len(transitionHistory)-1]
newVersionedTranstion := common.CloneProto(currentVersionedTransition)
newVersionedTranstion.TransitionCount += 1
signalMap := mutableState.GetPendingSignalExternalInfos()
for i := 0; i < s.mockConfig.MutableStateTombstoneCountLimit(); i++ {
signalMap[int64(76+i)] = &persistencespb.SignalInfo{

Version: s.namespaceEntry.FailoverVersion(),
InitiatedEventId: int64(76 + i),
InitiatedEventBatchId: 17,
RequestId: uuid.New(),
}
}

_, err = mutableState.StartTransaction(s.namespaceEntry)
s.NoError(err)
var initiatedEventId int64
for initiatedEventId = range signalMap {
_, err := mutableState.AddSignalExternalWorkflowExecutionFailedEvent(
initiatedEventId,
s.namespaceEntry.Name(),
s.namespaceEntry.ID(),
uuid.New(),
uuid.New(),
"",
enumspb.SIGNAL_EXTERNAL_WORKFLOW_EXECUTION_FAILED_CAUSE_EXTERNAL_WORKFLOW_EXECUTION_NOT_FOUND,
)
s.NoError(err)
}

_, _, err = mutableState.CloseTransactionAsMutation(TransactionPolicyActive)
s.NoError(err)

tombstoneBatches := mutableState.GetExecutionInfo().SubStateMachineTombstoneBatches
s.Len(tombstoneBatches, 0)
}

func (s *mutableStateSuite) TestCloseTransactionTrackTombstones_OnlyTrackFirstEmpty() {
dbState := s.buildWorkflowMutableState()

mutableState, err := NewMutableStateFromDB(s.mockShard, s.mockEventsCache, s.logger, s.namespaceEntry, dbState, 123)
s.NoError(err)
mutableState.executionInfo.SubStateMachineTombstoneBatches = []*persistencespb.StateMachineTombstoneBatch{
{
VersionedTransition: &persistencespb.VersionedTransition{
NamespaceFailoverVersion: s.namespaceEntry.FailoverVersion(),
TransitionCount: 1,
},
},
}

transitionHistory := mutableState.GetExecutionInfo().TransitionHistory
currentVersionedTransition := transitionHistory[len(transitionHistory)-1]
newVersionedTranstion := common.CloneProto(currentVersionedTransition)
newVersionedTranstion.TransitionCount += 1

_, err = mutableState.StartTransaction(s.namespaceEntry)
s.NoError(err)

_, _, err = mutableState.CloseTransactionAsMutation(TransactionPolicyActive)
s.NoError(err)

tombstoneBatches := mutableState.GetExecutionInfo().SubStateMachineTombstoneBatches
s.Len(tombstoneBatches, 1)
s.Equal(int64(1), tombstoneBatches[0].VersionedTransition.TransitionCount)
}

func (s *mutableStateSuite) TestExecutionInfoClone() {
newInstance := reflect.New(reflect.TypeOf(s.mutableState.executionInfo).Elem()).Interface()
clone, ok := newInstance.(*persistencespb.WorkflowExecutionInfo)
Expand Down

0 comments on commit 6934dcd

Please sign in to comment.