Skip to content

Commit

Permalink
Use GetCloseTime for generate close event task (#4797)
Browse files Browse the repository at this point in the history
<!-- Describe what has changed in this PR -->
**What changed?**
Use GetCloseTime for generate close event task

<!-- Tell your future self why have you made these changes -->
**Why?**
As we store the closed time in mutable state, we can get the close time
from mutable state instead of reading it from history event.

<!-- How have you verified this change? Tested locally? Added a unit
test? Checked in staging env? -->
**How did you test it?**


<!-- Assuming the worst case, what can be broken when deploying this
change to production? -->
**Potential risks**


<!-- Is this PR a hotfix candidate or require that a notification be
sent to the broader community? (Yes/No) -->
**Is hotfix candidate?**
  • Loading branch information
yux0 authored Aug 24, 2023
1 parent 4430add commit 5795ad2
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 39 deletions.
12 changes: 6 additions & 6 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -2789,7 +2789,7 @@ func (ms *MutableStateImpl) AddCompletedWorkflowEvent(
}
// TODO merge active & passive task generation
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand Down Expand Up @@ -2834,7 +2834,7 @@ func (ms *MutableStateImpl) AddFailWorkflowEvent(
}
// TODO merge active & passive task generation
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand Down Expand Up @@ -2878,7 +2878,7 @@ func (ms *MutableStateImpl) AddTimeoutWorkflowEvent(
}
// TODO merge active & passive task generation
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand Down Expand Up @@ -2959,7 +2959,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionCanceledEvent(
}
// TODO merge active & passive task generation
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand Down Expand Up @@ -3509,7 +3509,7 @@ func (ms *MutableStateImpl) AddWorkflowExecutionTerminatedEvent(
}
// TODO merge active & passive task generation
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
deleteAfterTerminate,
); err != nil {
return nil, err
Expand Down Expand Up @@ -3752,7 +3752,7 @@ func (ms *MutableStateImpl) AddContinueAsNewEvent(
}
// TODO merge active & passive task generation
if err := ms.taskGenerator.GenerateWorkflowCloseTasks(
continueAsNewEvent,
continueAsNewEvent.GetEventTime(),
false,
); err != nil {
return nil, nil, err
Expand Down
12 changes: 6 additions & 6 deletions service/history/workflow/mutable_state_rebuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -560,7 +560,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
}

if err := taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand All @@ -575,7 +575,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
}

if err := taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand All @@ -590,7 +590,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
}

if err := taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand All @@ -605,7 +605,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
}

if err := taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand All @@ -620,7 +620,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
}

if err := taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand Down Expand Up @@ -667,7 +667,7 @@ func (b *MutableStateRebuilderImpl) applyEvents(
}

if err := taskGenerator.GenerateWorkflowCloseTasks(
event,
event.GetEventTime(),
false,
); err != nil {
return nil, err
Expand Down
14 changes: 7 additions & 7 deletions service/history/workflow/mutable_state_rebuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTimedOut()
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionTimedoutEvent(event.GetEventId(), event).Return(nil)
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
event,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down Expand Up @@ -310,7 +310,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionTerminated
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionTerminatedEvent(event.GetEventId(), event).Return(nil)
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
event,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down Expand Up @@ -341,7 +341,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionFailed() {
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionFailedEvent(event.GetEventId(), event).Return(nil)
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
event,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down Expand Up @@ -373,7 +373,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCompleted(
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCompletedEvent(event.GetEventId(), event).Return(nil)
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
event,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionCanceled()
s.mockMutableState.EXPECT().ReplicateWorkflowExecutionCanceledEvent(event.GetEventId(), event).Return(nil)
s.mockUpdateVersion(event)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
event,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down Expand Up @@ -503,7 +503,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
s.mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes()
s.mockUpdateVersion(continueAsNewEvent)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
continueAsNewEvent,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down Expand Up @@ -561,7 +561,7 @@ func (s *stateBuilderSuite) TestApplyEvents_EventTypeWorkflowExecutionContinuedA
s.mockMutableState.EXPECT().GetNamespaceEntry().Return(tests.GlobalNamespaceEntry).AnyTimes()
s.mockUpdateVersion(continueAsNewEvent)
s.mockTaskGenerator.EXPECT().GenerateWorkflowCloseTasks(
continueAsNewEvent,
&now,
false,
).Return(nil)
s.mockMutableState.EXPECT().ClearStickyTaskQueue()
Expand Down
11 changes: 4 additions & 7 deletions service/history/workflow/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@ type (
startEvent *historypb.HistoryEvent,
) error
GenerateWorkflowCloseTasks(
// TODO: remove closeEvent parameter
// when deprecating the backward compatible logic
// for getting close time from close event.
closeEvent *historypb.HistoryEvent,
closedTime *time.Time,
deleteAfterClose bool,
) error
// GenerateDeleteHistoryEventTask adds a tasks.DeleteHistoryEventTask to the mutable state.
Expand Down Expand Up @@ -154,7 +151,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowStartTasks(
}

func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
closeEvent *historypb.HistoryEvent,
closedTime *time.Time,
deleteAfterClose bool,
) error {

Expand Down Expand Up @@ -206,7 +203,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
delay = retention
}
// archiveTime is the time when the archival queue recognizes the ArchiveExecutionTask as ready-to-process
archiveTime := closeEvent.GetEventTime().Add(delay)
archiveTime := timestamp.TimeValue(closedTime).Add(delay)

// This flag is only untrue for old server versions which were using the archival workflow instead of the
// archival queue.
Expand All @@ -219,7 +216,7 @@ func (r *TaskGeneratorImpl) GenerateWorkflowCloseTasks(
}
closeTasks = append(closeTasks, task)
} else {
closeTime := timestamp.TimeValue(closeEvent.GetEventTime())
closeTime := timestamp.TimeValue(closedTime)
if err := r.GenerateDeleteHistoryEventTask(closeTime, false); err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions service/history/workflow/task_generator_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 1 addition & 7 deletions service/history/workflow/task_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"

"go.temporal.io/server/api/persistence/v1"
"go.temporal.io/server/common/archiver"
Expand Down Expand Up @@ -277,12 +276,7 @@ func TestTaskGeneratorImpl_GenerateWorkflowCloseTasks(t *testing.T) {
}).AnyTimes()

taskGenerator := NewTaskGenerator(namespaceRegistry, mutableState, cfg, archivalMetadata)
err := taskGenerator.GenerateWorkflowCloseTasks(&historypb.HistoryEvent{
Attributes: &historypb.HistoryEvent_WorkflowExecutionCompletedEventAttributes{
WorkflowExecutionCompletedEventAttributes: &historypb.WorkflowExecutionCompletedEventAttributes{},
},
EventTime: timestamp.TimePtr(p.CloseEventTime),
}, p.DeleteAfterClose)
err := taskGenerator.GenerateWorkflowCloseTasks(timestamp.TimePtr(p.CloseEventTime), p.DeleteAfterClose)
require.NoError(t, err)

var (
Expand Down
4 changes: 2 additions & 2 deletions service/history/workflow/task_refresher.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,13 +188,13 @@ func (r *TaskRefresherImpl) refreshTasksForWorkflowClose(

executionState := mutableState.GetExecutionState()
if executionState.Status != enumspb.WORKFLOW_EXECUTION_STATUS_RUNNING {
closeEvent, err := mutableState.GetCompletionEvent(ctx)
closeEventTime, err := mutableState.GetWorkflowCloseTime(ctx)
if err != nil {
return err
}

return taskGenerator.GenerateWorkflowCloseTasks(
closeEvent,
closeEventTime,
false,
)
}
Expand Down

0 comments on commit 5795ad2

Please sign in to comment.