From 3e3bb672385a8f96c7e7984f5886edb51d7749c1 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 13 Sep 2021 12:37:01 -0700 Subject: [PATCH] Fix startTime in workflow task refresher (#4488) --- .../history/execution/mutable_state_task_refresher.go | 7 +++++-- .../execution/mutable_state_task_refresher_mock.go | 9 +++++---- service/history/execution/state_rebuilder.go | 2 +- service/history/execution/state_rebuilder_test.go | 2 +- service/history/historyEngine.go | 2 +- 5 files changed, 13 insertions(+), 9 deletions(-) diff --git a/service/history/execution/mutable_state_task_refresher.go b/service/history/execution/mutable_state_task_refresher.go index 034b9e4f649..271eebda293 100644 --- a/service/history/execution/mutable_state_task_refresher.go +++ b/service/history/execution/mutable_state_task_refresher.go @@ -41,7 +41,7 @@ var emptyTasks = []persistence.Task{} type ( // MutableStateTaskRefresher refreshes workflow transfer and timer tasks MutableStateTaskRefresher interface { - RefreshTasks(ctx context.Context, mutableState MutableState) error + RefreshTasks(ctx context.Context, startTime time.Time, mutableState MutableState) error } mutableStateTaskRefresherImpl struct { @@ -76,6 +76,7 @@ func NewMutableStateTaskRefresher( func (r *mutableStateTaskRefresherImpl) RefreshTasks( ctx context.Context, + startTime time.Time, mutableState MutableState, ) error { @@ -88,6 +89,7 @@ func (r *mutableStateTaskRefresherImpl) RefreshTasks( if err := r.refreshTasksForWorkflowStart( ctx, + startTime, mutableState, taskGenerator, ); err != nil { @@ -173,6 +175,7 @@ func (r *mutableStateTaskRefresherImpl) RefreshTasks( func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowStart( ctx context.Context, + startTime time.Time, mutableState MutableState, taskGenerator MutableStateTaskGenerator, ) error { @@ -183,7 +186,7 @@ func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowStart( } if err := taskGenerator.GenerateWorkflowStartTasks( - time.Unix(0, startEvent.GetTimestamp()), + startTime, startEvent, ); err != nil { return err diff --git a/service/history/execution/mutable_state_task_refresher_mock.go b/service/history/execution/mutable_state_task_refresher_mock.go index 0f5256885d0..bca4e011b49 100644 --- a/service/history/execution/mutable_state_task_refresher_mock.go +++ b/service/history/execution/mutable_state_task_refresher_mock.go @@ -29,6 +29,7 @@ package execution import ( context "context" reflect "reflect" + time "time" gomock "github.com/golang/mock/gomock" ) @@ -57,15 +58,15 @@ func (m *MockMutableStateTaskRefresher) EXPECT() *MockMutableStateTaskRefresherM } // RefreshTasks mocks base method -func (m *MockMutableStateTaskRefresher) RefreshTasks(ctx context.Context, mutableState MutableState) error { +func (m *MockMutableStateTaskRefresher) RefreshTasks(ctx context.Context, startTime time.Time, mutableState MutableState) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "RefreshTasks", ctx, mutableState) + ret := m.ctrl.Call(m, "RefreshTasks", ctx, startTime, mutableState) ret0, _ := ret[0].(error) return ret0 } // RefreshTasks indicates an expected call of RefreshTasks -func (mr *MockMutableStateTaskRefresherMockRecorder) RefreshTasks(ctx, mutableState interface{}) *gomock.Call { +func (mr *MockMutableStateTaskRefresherMockRecorder) RefreshTasks(ctx, startTime, mutableState interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshTasks", reflect.TypeOf((*MockMutableStateTaskRefresher)(nil).RefreshTasks), ctx, mutableState) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RefreshTasks", reflect.TypeOf((*MockMutableStateTaskRefresher)(nil).RefreshTasks), ctx, startTime, mutableState) } diff --git a/service/history/execution/state_rebuilder.go b/service/history/execution/state_rebuilder.go index d8f6dfe212e..da91949ad6c 100644 --- a/service/history/execution/state_rebuilder.go +++ b/service/history/execution/state_rebuilder.go @@ -185,7 +185,7 @@ func (r *stateRebuilderImpl) Rebuild( } // refresh tasks to be generated - if err := r.taskRefresher.RefreshTasks(ctx, rebuiltMutableState); err != nil { + if err := r.taskRefresher.RefreshTasks(ctx, now, rebuiltMutableState); err != nil { return nil, 0, err } diff --git a/service/history/execution/state_rebuilder_test.go b/service/history/execution/state_rebuilder_test.go index 7e08a940acd..028c8808c4a 100644 --- a/service/history/execution/state_rebuilder_test.go +++ b/service/history/execution/state_rebuilder_test.go @@ -304,7 +304,7 @@ func (s *stateRebuilderSuite) TestRebuild() { 1234, s.mockClusterMetadata, ), nil).AnyTimes() - s.mockTaskRefresher.EXPECT().RefreshTasks(gomock.Any(), gomock.Any()).Return(nil).Times(1) + s.mockTaskRefresher.EXPECT().RefreshTasks(gomock.Any(), now, gomock.Any()).Return(nil).Times(1) rebuildMutableState, rebuiltHistorySize, err := s.nDCStateRebuilder.Rebuild( context.Background(), diff --git a/service/history/historyEngine.go b/service/history/historyEngine.go index 7a530f46f58..df9f5c668e4 100644 --- a/service/history/historyEngine.go +++ b/service/history/historyEngine.go @@ -3276,7 +3276,7 @@ func (e *historyEngineImpl) RefreshWorkflowTasks( e.shard.GetShardID(), ) - err = mutableStateTaskRefresher.RefreshTasks(ctx, mutableState) + err = mutableStateTaskRefresher.RefreshTasks(ctx, mutableState.GetExecutionInfo().StartTimestamp, mutableState) if err != nil { return err }