Skip to content

Commit

Permalink
Message inference for more WF ctx reset paths (#1073)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt McShane authored Mar 29, 2023
1 parent 88a40de commit 5c0e091
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
3 changes: 2 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ func (wth *workflowTaskHandlerImpl) createWorkflowContext(task *workflowservice.
if taskQueue == nil || taskQueue.Name == "" {
return nil, errors.New("nil or empty TaskQueue in WorkflowExecutionStarted event")
}
task.Messages = append(inferMessages(task.GetHistory().GetEvents()), task.Messages...)

runID := task.WorkflowExecution.GetRunId()
workflowID := task.WorkflowExecution.GetWorkflowId()
Expand Down Expand Up @@ -702,7 +703,7 @@ func (w *workflowExecutionContextImpl) resetStateIfDestroyed(task *workflowservi
return err
}
}
task.Messages = inferMessages(task.GetHistory().GetEvents())
task.Messages = append(inferMessages(task.GetHistory().GetEvents()), task.Messages...)
if w.workflowInfo != nil {
// Reset the search attributes and memos from the WorkflowExecutionStartedEvent.
// The search attributes and memo may have been modified by calls like UpsertMemo
Expand Down
74 changes: 62 additions & 12 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
commonpb "go.temporal.io/api/common/v1"
enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
querypb "go.temporal.io/api/query/v1"
"go.temporal.io/api/serviceerror"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
Expand All @@ -51,6 +52,7 @@ import (

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common"
"go.temporal.io/sdk/internal/common/cache"
"go.temporal.io/sdk/internal/common/metrics"
ilog "go.temporal.io/sdk/internal/log"
"go.temporal.io/sdk/log"
Expand Down Expand Up @@ -1939,6 +1941,7 @@ func TestHeartbeatThrottleInterval(t *testing.T) {
}

type MockHistoryIterator struct {
HistoryIterator
GetNextPageImpl func() (*historypb.History, error)
ResetImpl func()
HasNextPageImpl func() bool
Expand All @@ -1957,11 +1960,15 @@ func (mhi MockHistoryIterator) HasNextPage() bool {
}

func TestResetIfDestroyedTaskPrep(t *testing.T) {
historyAcceptedMsgID := t.Name() + "-historyAcceptedMsgID"
// a plausible full history that includes an update accepted event to also
// test for lookahead event inference
fullHist := &historypb.History{
Events: []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{}),
createTestEventWorkflowExecutionStarted(1,
&historypb.WorkflowExecutionStartedEventAttributes{
TaskQueue: &taskqueuepb.TaskQueue{Name: t.Name() + "-queue"},
}),
createTestEventWorkflowTaskScheduled(2, nil),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, nil),
Expand All @@ -1971,7 +1978,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) {
Attributes: &historypb.HistoryEvent_WorkflowExecutionUpdateAcceptedEventAttributes{
WorkflowExecutionUpdateAcceptedEventAttributes: &historypb.WorkflowExecutionUpdateAcceptedEventAttributes{
ProtocolInstanceId: "123",
AcceptedRequestMessageId: "MSG.001",
AcceptedRequestMessageId: historyAcceptedMsgID,
AcceptedRequest: &updatepb.Request{},
},
},
Expand Down Expand Up @@ -2002,8 +2009,7 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) {
return iterHist, nil
},
}
task := &workflowservice.PollWorkflowTaskQueueResponse{History: taskHist}

cache := cache.NewLRU(1)
// values of these fields are not important to the test but some of these
// pointers are dereferenced as part of constructing a new event handler so
// they need to be non-nil
Expand All @@ -2012,15 +2018,59 @@ func TestResetIfDestroyedTaskPrep(t *testing.T) {
WorkflowExecution: WorkflowExecution{},
WorkflowType: WorkflowType{Name: t.Name()},
},
wth: &workflowTaskHandlerImpl{logger: ilog.NewNopLogger()},
wth: &workflowTaskHandlerImpl{
metricsHandler: metrics.NopHandler,
logger: ilog.NewNopLogger(),
cache: &WorkerCache{
sharedCache: &sharedWorkerCache{workflowCache: &cache},
},
},
}

err := weci.resetStateIfDestroyed(task, histIter)
// assertion helper for use below
requireContainsMsgWithID := func(t *testing.T, msgs []*protocolpb.Message, id string) {
t.Helper()
for _, msg := range msgs {
if msg.GetId() == id {
return
}
}
require.FailNow(t, "expected message not found",
"message with id %q not found in %v", id, msgs)
}

require.NoError(t, err)
require.Len(t, task.History.Events, len(fullHist.Events),
"expected task to be mutated to carry full WF history (all events)")
require.Len(t, task.Messages, 1,
"expected task to be mutated to carry the update request implied by the "+
"WorkflowExecutionUpdateAccepted event")
wftNewMsgID := t.Name() + "-wftNewMsgID"
t.Run("cache miss", func(t *testing.T) {
task := &workflowservice.PollWorkflowTaskQueueResponse{
History: taskHist,
Messages: []*protocolpb.Message{&protocolpb.Message{Id: wftNewMsgID}},
}

require.EqualValues(t, 0, cache.Size())
// cache is empty so this should miss and build a new context with a
// full history
_, err := weci.wth.getOrCreateWorkflowContext(task, histIter)

require.NoError(t, err)
require.Len(t, task.History.Events, len(fullHist.Events),
"expected task to be mutated to carry full WF history (all events)")
requireContainsMsgWithID(t, task.Messages, wftNewMsgID)
requireContainsMsgWithID(t, task.Messages, historyAcceptedMsgID)
})
t.Run("cache hit but destroyed", func(t *testing.T) {
task := &workflowservice.PollWorkflowTaskQueueResponse{
History: taskHist,
Messages: []*protocolpb.Message{&protocolpb.Message{Id: wftNewMsgID}},
}

// trick the execution context into thinking it has been destroyed
weci.eventHandler = nil
err := weci.resetStateIfDestroyed(task, histIter)

require.NoError(t, err)
require.Len(t, task.History.Events, len(fullHist.Events),
"expected task to be mutated to carry full WF history (all events)")
requireContainsMsgWithID(t, task.Messages, wftNewMsgID)
requireContainsMsgWithID(t, task.Messages, historyAcceptedMsgID)
})
}

0 comments on commit 5c0e091

Please sign in to comment.