Skip to content

Commit

Permalink
Fix startTime in workflow task refresher (#4488)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Sep 13, 2021
1 parent 9e99272 commit 3e3bb67
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 9 deletions.
7 changes: 5 additions & 2 deletions service/history/execution/mutable_state_task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var emptyTasks = []persistence.Task{}
type (
// MutableStateTaskRefresher refreshes workflow transfer and timer tasks
MutableStateTaskRefresher interface {
RefreshTasks(ctx context.Context, mutableState MutableState) error
RefreshTasks(ctx context.Context, startTime time.Time, mutableState MutableState) error
}

mutableStateTaskRefresherImpl struct {
Expand Down Expand Up @@ -76,6 +76,7 @@ func NewMutableStateTaskRefresher(

func (r *mutableStateTaskRefresherImpl) RefreshTasks(
ctx context.Context,
startTime time.Time,
mutableState MutableState,
) error {

Expand All @@ -88,6 +89,7 @@ func (r *mutableStateTaskRefresherImpl) RefreshTasks(

if err := r.refreshTasksForWorkflowStart(
ctx,
startTime,
mutableState,
taskGenerator,
); err != nil {
Expand Down Expand Up @@ -173,6 +175,7 @@ func (r *mutableStateTaskRefresherImpl) RefreshTasks(

func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowStart(
ctx context.Context,
startTime time.Time,
mutableState MutableState,
taskGenerator MutableStateTaskGenerator,
) error {
Expand All @@ -183,7 +186,7 @@ func (r *mutableStateTaskRefresherImpl) refreshTasksForWorkflowStart(
}

if err := taskGenerator.GenerateWorkflowStartTasks(
time.Unix(0, startEvent.GetTimestamp()),
startTime,
startEvent,
); err != nil {
return err
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion service/history/execution/state_rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (r *stateRebuilderImpl) Rebuild(
}

// refresh tasks to be generated
if err := r.taskRefresher.RefreshTasks(ctx, rebuiltMutableState); err != nil {
if err := r.taskRefresher.RefreshTasks(ctx, now, rebuiltMutableState); err != nil {
return nil, 0, err
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/execution/state_rebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (s *stateRebuilderSuite) TestRebuild() {
1234,
s.mockClusterMetadata,
), nil).AnyTimes()
s.mockTaskRefresher.EXPECT().RefreshTasks(gomock.Any(), gomock.Any()).Return(nil).Times(1)
s.mockTaskRefresher.EXPECT().RefreshTasks(gomock.Any(), now, gomock.Any()).Return(nil).Times(1)

rebuildMutableState, rebuiltHistorySize, err := s.nDCStateRebuilder.Rebuild(
context.Background(),
Expand Down
2 changes: 1 addition & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3276,7 +3276,7 @@ func (e *historyEngineImpl) RefreshWorkflowTasks(
e.shard.GetShardID(),
)

err = mutableStateTaskRefresher.RefreshTasks(ctx, mutableState)
err = mutableStateTaskRefresher.RefreshTasks(ctx, mutableState.GetExecutionInfo().StartTimestamp, mutableState)
if err != nil {
return err
}
Expand Down

0 comments on commit 3e3bb67

Please sign in to comment.