Skip to content

Commit

Permalink
Fix timer resurrection check (#4499)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt committed Sep 20, 2021
1 parent 8b8d8d8 commit 0398bf6
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 9 deletions.
2 changes: 1 addition & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ const (
// KeyName: history.resurrectionCheckMinDelay
// Value type: Duration
// Default value: 24*time.Hour
// Allowed filters: N/A
// Allowed filters: DomainName
ResurrectionCheckMinDelay
// QueueProcessorEnableSplit is indicates whether processing queue split policy should be enabled
// KeyName: history.queueProcessorEnableSplit
Expand Down
4 changes: 2 additions & 2 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ type Config struct {
TaskRedispatchIntervalJitterCoefficient dynamicconfig.FloatPropertyFn
StandbyTaskReReplicationContextTimeout dynamicconfig.DurationPropertyFnWithDomainIDFilter
EnableDropStuckTaskByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFn
ResurrectionCheckMinDelay dynamicconfig.DurationPropertyFnWithDomainFilter

// QueueProcessor settings
QueueProcessorEnableSplit dynamicconfig.BoolPropertyFn
Expand Down Expand Up @@ -382,7 +382,7 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, storeType string, isA
TaskRedispatchIntervalJitterCoefficient: dc.GetFloat64Property(dynamicconfig.TaskRedispatchIntervalJitterCoefficient, 0.15),
StandbyTaskReReplicationContextTimeout: dc.GetDurationPropertyFilteredByDomainID(dynamicconfig.StandbyTaskReReplicationContextTimeout, 3*time.Minute),
EnableDropStuckTaskByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.EnableDropStuckTaskByDomainID, false),
ResurrectionCheckMinDelay: dc.GetDurationProperty(dynamicconfig.ResurrectionCheckMinDelay, 24*time.Hour),
ResurrectionCheckMinDelay: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.ResurrectionCheckMinDelay, 24*time.Hour),

QueueProcessorEnableSplit: dc.GetBoolProperty(dynamicconfig.QueueProcessorEnableSplit, false),
QueueProcessorSplitMaxLevel: dc.GetIntProperty(dynamicconfig.QueueProcessorSplitMaxLevel, 2), // 3 levels, start from 0
Expand Down
18 changes: 15 additions & 3 deletions service/history/task/timer_active_task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (t *timerActiveTaskExecutor) executeUserTimerTimeoutTask(

timerSequence := execution.NewTimerSequence(mutableState)
referenceTime := t.shard.GetTimeSource().Now()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
updateMutableState := false

// initialized when a timer with delay >= resurrectionCheckMinDelay
Expand Down Expand Up @@ -237,7 +237,7 @@ func (t *timerActiveTaskExecutor) executeActivityTimeoutTask(

timerSequence := execution.NewTimerSequence(mutableState)
referenceTime := t.shard.GetTimeSource().Now()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay()
resurrectionCheckMinDelay := t.config.ResurrectionCheckMinDelay(mutableState.GetDomainEntry().GetInfo().Name)
updateMutableState := false
scheduleDecision := false

Expand Down Expand Up @@ -702,6 +702,12 @@ func (t *timerActiveTaskExecutor) getResurrectedTimer(

// 2. scan history from minTimerStartedID and see if any
// TimerFiredEvent or TimerCancelledEvent matches pending timer
// NOTE: since we can't read from middle of an events batch,
// history returned by persistence layer won't actually start
// from minTimerStartedID, but start from the batch whose nodeID is
// larger than minTimerStartedID.
// This is ok since the event types we are interested in must in batches
// later than the timer started events.
resurrectedTimer := make(map[string]struct{})
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
Expand Down Expand Up @@ -747,6 +753,12 @@ func (t *timerActiveTaskExecutor) getResurrectedActivity(

// 2. scan history from minActivityScheduledID and see if any
// activity termination events matches pending activity
// NOTE: since we can't read from middle of an events batch,
// history returned by persistence layer won't actually start
// from minActivityScheduledID, but start from the batch whose nodeID is
// larger than minActivityScheduledID.
// This is ok since the event types we are interested in must in batches
// later than the activity scheduled events.
resurrectedActivity := make(map[int64]struct{})
branchToken, err := mutableState.GetCurrentBranchToken()
if err != nil {
Expand Down Expand Up @@ -797,7 +809,7 @@ func (t *timerActiveTaskExecutor) getHistoryPaginationFn(
branchToken,
firstEventID,
nextEventID,
nil,
token,
execution.NDCDefaultPageSize,
common.IntPtr(t.shard.GetShardID()),
)
Expand Down
21 changes: 18 additions & 3 deletions service/history/task/timer_active_task_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package task

import (
"bytes"
"testing"
"time"

Expand Down Expand Up @@ -366,10 +367,17 @@ func (s *timerActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Resurrected()
TaskStatus: execution.TimerTaskStatusNone,
}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
nextPageToken := []byte{1, 2, 3}
s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.MatchedBy(func(req *persistence.ReadHistoryBranchRequest) bool {
return req.MinEventID == startEvent1.GetEventID() && req.NextPageToken == nil
})).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*types.HistoryEvent{startEvent1, startEvent2, firedEvent1},
HistoryEvents: []*types.HistoryEvent{startEvent1, startEvent2},
NextPageToken: nextPageToken,
}, nil).Once()
s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.MatchedBy(func(req *persistence.ReadHistoryBranchRequest) bool {
return req.MinEventID == startEvent1.GetEventID() && bytes.Equal(req.NextPageToken, nextPageToken)
})).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*types.HistoryEvent{firedEvent1},
NextPageToken: nil,
}, nil).Once()
// only timer2 should be fired
Expand All @@ -387,7 +395,7 @@ func (s *timerActiveTaskExecutorSuite) TestProcessUserTimerTimeout_Resurrected()
return len(req.UpdateWorkflowMutation.DeleteTimerInfos) == 2
})).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()

s.timerActiveTaskExecutor.config.ResurrectionCheckMinDelay = dynamicconfig.GetDurationPropertyFn(timerTimeout2 - timerTimeout1)
s.timerActiveTaskExecutor.config.ResurrectionCheckMinDelay = dynamicconfig.GetDurationPropertyFnFilteredByDomain(timerTimeout2 - timerTimeout1)
s.timeSource.Update(s.now.Add(timerTimeout2))
err = s.timerActiveTaskExecutor.Execute(timerTask, true)
s.NoError(err)
Expand Down Expand Up @@ -1043,8 +1051,15 @@ func (s *timerActiveTaskExecutorSuite) TestProcessActivityTimeout_Resurrected()
TaskList: tasklist,
}
s.mockExecutionMgr.On("GetWorkflowExecution", mock.Anything, mock.Anything).Return(&persistence.GetWorkflowExecutionResponse{State: persistenceMutableState}, nil)
nextPageToken := []byte{1, 2, 3}
s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.MatchedBy(func(req *persistence.ReadHistoryBranchRequest) bool {
return req.MinEventID == scheduledEvent1.GetEventID() && req.NextPageToken == nil
})).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*types.HistoryEvent{scheduledEvent1, scheduledEvent2, startedEvent1, completeEvent1},
NextPageToken: nextPageToken,
}, nil).Once()
s.mockHistoryV2Mgr.On("ReadHistoryBranch", mock.Anything, mock.MatchedBy(func(req *persistence.ReadHistoryBranchRequest) bool {
return req.MinEventID == scheduledEvent1.GetEventID() && bytes.Equal(req.NextPageToken, nextPageToken)
})).Return(&persistence.ReadHistoryBranchResponse{
HistoryEvents: []*types.HistoryEvent{scheduledEvent1, scheduledEvent2, startedEvent1, completeEvent1},
NextPageToken: nil,
Expand All @@ -1064,7 +1079,7 @@ func (s *timerActiveTaskExecutorSuite) TestProcessActivityTimeout_Resurrected()
return len(req.UpdateWorkflowMutation.DeleteActivityInfos) == 2
})).Return(&persistence.UpdateWorkflowExecutionResponse{MutableStateUpdateSessionStats: &persistence.MutableStateUpdateSessionStats{}}, nil).Once()

s.timerActiveTaskExecutor.config.ResurrectionCheckMinDelay = dynamicconfig.GetDurationPropertyFn(timerTimeout2 - timerTimeout1)
s.timerActiveTaskExecutor.config.ResurrectionCheckMinDelay = dynamicconfig.GetDurationPropertyFnFilteredByDomain(timerTimeout2 - timerTimeout1)
s.timeSource.Update(s.now.Add(timerTimeout2))
err = s.timerActiveTaskExecutor.Execute(timerTask, true)
s.NoError(err)
Expand Down

0 comments on commit 0398bf6

Please sign in to comment.