Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lookahead to infer messages on all replay paths #1055

Merged
merged 1 commit into from
Mar 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,7 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi
return err
}
}
task.Messages = inferMessages(task.GetHistory().GetEvents())
if w.workflowInfo != nil {
// Reset the search attributes and memos from the WorkflowExecutionStartedEvent.
// The search attributes and memo may have been modified by calls like UpsertMemo
Expand Down
88 changes: 88 additions & 0 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
updatepb "go.temporal.io/api/update/v1"
"go.temporal.io/api/workflowservice/v1"
"go.temporal.io/api/workflowservicemock/v1"

Expand Down Expand Up @@ -1915,3 +1916,90 @@ func TestHeartbeatThrottleInterval(t *testing.T) {
// Default max to 60 if not set
assertInterval(5000, 2, 0, 60)
}

type MockHistoryIterator struct {
GetNextPageImpl func() (*historypb.History, error)
ResetImpl func()
HasNextPageImpl func() bool
}

func (mhi MockHistoryIterator) GetNextPage() (*historypb.History, error) {
return mhi.GetNextPageImpl()
}

func (mhi MockHistoryIterator) Reset() {
mhi.ResetImpl()
}

func (mhi MockHistoryIterator) HasNextPage() bool {
return mhi.HasNextPageImpl()
}

func TestResetIfDestroyedTaskPrep(t *testing.T) {
// a plausible full history that includes an update accepted event to also
// test for lookahead event inference
fullHist := &historypb.History{
Events: []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{}),
createTestEventWorkflowTaskScheduled(2, nil),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, nil),
&historypb.HistoryEvent{
EventId: 5,
EventType: enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED,
Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{
WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
ProtocolInstanceId: "123",
AcceptedRequestMessageId: "MSG.001",
AcceptedRequest: &updatepb.Request{},
},
},
},
},
}

// start the task out with a partial history to verify that we reset the
// history iterator back to the start
taskHist := &historypb.History{
Events: []*historypb.HistoryEvent{
createTestEventWorkflowTaskScheduled(6, nil),
createTestEventWorkflowTaskStarted(7),
},
}

// iterator implementation uses partial history until HistoryIterator.Reset
// is called
iterHist := taskHist

histIter := MockHistoryIterator{
ResetImpl: func() {
// if impl calls reset, switch to full history
iterHist = fullHist
},

GetNextPageImpl: func() (*historypb.History, error) {
return iterHist, nil
},
}
task := &workflowservice.PollWorkflowTaskQueueResponse{History: taskHist}

// values of these fields are not important to the test but some of these
// pointers are dereferenced as part of constructing a new event handler so
// they need to be non-nil
weci := &workflowExecutionContextImpl{
workflowInfo: &WorkflowInfo{
WorkflowExecution: WorkflowExecution{},
WorkflowType: WorkflowType{Name: t.Name()},
},
wth: &workflowTaskHandlerImpl{logger: ilog.NewNopLogger()},
}

err := weci.resetStateIfDestroyed(task, histIter)

require.NoError(t, err)
require.Len(t, task.History.Events, len(fullHist.Events),
"expected task to be mutated to carry full WF history (all events)")
require.Len(t, task.Messages, 1,
"expected task to be mutated to carry the update request implied by the "+
"WorkflowExecutionUpdateAccepted event")
}