Skip to content

Commit

Permalink
Fix panic when handling version & side effect markers together (#380)
Browse files Browse the repository at this point in the history
When executing a side effect after calling GetVersion, command ids could end up ordered incorrectly. This change fixes that issue by ensuring that every time a command is added, some special-case code regarding version markers is run.
  • Loading branch information
Sushisource authored Mar 11, 2021
1 parent 2c4a82e commit c5bb86d
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 13 deletions.
13 changes: 9 additions & 4 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,14 @@ func (h *commandsHelper) setCurrentWorkflowTaskStartedEventID(workflowTaskStarte

func (h *commandsHelper) getNextID() int64 {
// First check if we have a GetVersion marker in the lookup map
h.incrementNextCommandEventIDIfVersionMarker()
if h.nextCommandEventID == 0 {
panic("Attempt to generate a command before processing WorkflowTaskStarted event")
}
return h.nextCommandEventID
}

func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
if _, ok := h.versionMarkerLookup[h.nextCommandEventID]; ok {
// Remove the marker from the lookup map and increment nextCommandEventID by 2 because call to GetVersion
// results in 2 events in the history. One is GetVersion marker event for changeID and change version, other
Expand All @@ -819,10 +827,6 @@ func (h *commandsHelper) getNextID() int64 {
h.incrementNextCommandEventID()
h.incrementNextCommandEventID()
}
if h.nextCommandEventID == 0 {
panic("Attempt to generate a command before processing WorkflowTaskStarted event")
}
return h.nextCommandEventID
}

func (h *commandsHelper) getCommand(id commandID) commandStateMachine {
Expand All @@ -844,6 +848,7 @@ func (h *commandsHelper) addCommand(command commandStateMachine) {
h.commands[command.getID()] = element

// Every time new command is added increment the counter used for generating ID
h.incrementNextCommandEventIDIfVersionMarker()
h.incrementNextCommandEventID()
}

Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func createTestEventWorkflowExecutionStarted(eventID int64, attr *historypb.Work
Attributes: &historypb.HistoryEvent_WorkflowExecutionStartedEventAttributes{WorkflowExecutionStartedEventAttributes: attr}}
}

func createTestEventLocalActivity(eventID int64, attr *historypb.MarkerRecordedEventAttributes) *historypb.HistoryEvent {
func createTestEventMarkerRecorded(eventID int64, attr *historypb.MarkerRecordedEventAttributes) *historypb.HistoryEvent {
return &historypb.HistoryEvent{
EventId: eventID,
EventType: enumspb.EVENT_TYPE_MARKER_RECORDED,
Expand Down
99 changes: 91 additions & 8 deletions internal/internal_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ func (s *internalWorkerTestSuite) createLocalActivityMarkerDataForTest(activityI
}
}

func (s *internalWorkerTestSuite) createSideEffectMarkerDataForTest(payloads *commonpb.Payloads,
sideEffectID int64) map[string]*commonpb.Payloads {
idPayload, err := s.dataConverter.ToPayloads(sideEffectID)
s.NoError(err)
return map[string]*commonpb.Payloads{
sideEffectMarkerDataName: payloads,
sideEffectMarkerIDName: idPayload,
}
}

func getLogger() log.Logger {
return ilog.NewDefaultLogger()
}
Expand Down Expand Up @@ -306,12 +316,12 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity() {
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),

createTestEventLocalActivity(5, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(5, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("1"),
WorkflowTaskCompletedEventId: 4,
}),
createTestEventLocalActivity(6, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(6, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("2"),
WorkflowTaskCompletedEventId: 4,
Expand Down Expand Up @@ -428,12 +438,12 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalAndRemoteActivi
createTestEventVersionMarker(5, 4, "change_id_A", Version(3)),
createTestUpsertWorkflowSearchAttributesForChangeVersion(6, 4, "change_id_A", Version(3)),

createTestEventLocalActivity(7, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(7, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("1"),
WorkflowTaskCompletedEventId: 4,
}),
createTestEventLocalActivity(8, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(8, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("2"),
WorkflowTaskCompletedEventId: 4,
Expand All @@ -443,7 +453,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalAndRemoteActivi
ActivityType: &commonpb.ActivityType{Name: "testActivity"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
}),
createTestEventLocalActivity(10, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(10, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("3"),
WorkflowTaskCompletedEventId: 4,
Expand Down Expand Up @@ -666,6 +676,79 @@ func createHistoryForGetVersionTests(workflowType string) []*historypb.HistoryEv
}
}

func testReplayWorkflowGetVersionWithSideEffect(ctx Context) error {
var uniqueID *string

v := GetVersion(ctx, "UniqueID", DefaultVersion, 1)
if v == 1 {
encodedUID := SideEffect(ctx, func(ctx Context) interface{} {
return "TEST-UNIQUE-ID"
})
err := encodedUID.Get(&uniqueID)
if err != nil {
return err
}
}

var result string
err := ExecuteActivity(ctx, "testActivityReturnString").Get(ctx, &result)
if err != nil {
return err
}

return nil
}

func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_GetVersionWithSideEffect() {
taskQueue := "taskQueue1"
sideEffectPayloads, seErr := s.dataConverter.ToPayloads("TEST-UNIQUE-ID")
s.NoError(seErr)
testEvents := []*historypb.HistoryEvent{
createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{
WorkflowType: &commonpb.WorkflowType{Name: "testReplayWorkflowGetVersionWithSideEffect"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
Input: testEncodeFunctionArgs(converter.GetDefaultDataConverter()),
}),
createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),
createTestEventVersionMarker(5, 4, "UniqueID", Version(1)),
createTestUpsertWorkflowSearchAttributesForChangeVersion(6, 4, "UniqueID", Version(1)),
createTestEventMarkerRecorded(7, &historypb.MarkerRecordedEventAttributes{
MarkerName: sideEffectMarkerName,
Details: s.createSideEffectMarkerDataForTest(sideEffectPayloads, 1),
WorkflowTaskCompletedEventId: 4,
}),
createTestEventActivityTaskScheduled(8, &historypb.ActivityTaskScheduledEventAttributes{
ActivityId: "8",
ActivityType: &commonpb.ActivityType{Name: "testActivityReturnString"},
TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue},
}),
createTestEventActivityTaskStarted(9, &historypb.ActivityTaskStartedEventAttributes{
ScheduledEventId: 8,
}),
createTestEventActivityTaskCompleted(10, &historypb.ActivityTaskCompletedEventAttributes{
ScheduledEventId: 8,
StartedEventId: 9,
}),
createTestEventWorkflowTaskScheduled(11, &historypb.WorkflowTaskScheduledEventAttributes{}),
createTestEventWorkflowTaskStarted(12),
createTestEventWorkflowTaskCompleted(13, &historypb.WorkflowTaskCompletedEventAttributes{
ScheduledEventId: 11,
StartedEventId: 12,
}),
createTestEventWorkflowExecutionCompleted(14, &historypb.WorkflowExecutionCompletedEventAttributes{
WorkflowTaskCompletedEventId: 13,
}),
}
history := &historypb.History{Events: testEvents}
logger := getLogger()
replayer := NewWorkflowReplayer()
replayer.RegisterWorkflow(testReplayWorkflowGetVersionWithSideEffect)
err := replayer.ReplayWorkflowHistory(logger, history)
require.NoError(s.T(), err)
}

func testReplayWorkflowCancelActivity(ctx Context) error {
ctx1, cancelFunc1 := WithCancel(ctx)

Expand Down Expand Up @@ -1090,12 +1173,12 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Result
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),

createTestEventLocalActivity(5, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(5, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("1"),
WorkflowTaskCompletedEventId: 4,
}),
createTestEventLocalActivity(6, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(6, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("2"),
WorkflowTaskCompletedEventId: 4,
Expand Down Expand Up @@ -1132,7 +1215,7 @@ func (s *internalWorkerTestSuite) TestReplayWorkflowHistory_LocalActivity_Activi
createTestEventWorkflowTaskStarted(3),
createTestEventWorkflowTaskCompleted(4, &historypb.WorkflowTaskCompletedEventAttributes{}),

createTestEventLocalActivity(5, &historypb.MarkerRecordedEventAttributes{
createTestEventMarkerRecorded(5, &historypb.MarkerRecordedEventAttributes{
MarkerName: localActivityMarkerName,
Details: s.createLocalActivityMarkerDataForTest("0"),
WorkflowTaskCompletedEventId: 4,
Expand Down
41 changes: 41 additions & 0 deletions internal/internal_workflow_testsuite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,6 +610,47 @@ func (s *WorkflowTestSuiteUnitTest) Test_SideEffect() {
s.Nil(env.GetWorkflowError())
}

func (s *WorkflowTestSuiteUnitTest) Test_SideEffect_WithVersion() {
workflowFn := func(ctx Context) error {
ctx = WithActivityOptions(ctx, s.activityOptions)

qerr := SetQueryHandler(ctx, "test-query", func() (string, error) {
return "queryresult", nil
})

if qerr != nil {
return qerr
}
var uniqueID *string

v := GetVersion(ctx, "UniqueID", DefaultVersion, 1)
if v == 1 {
encodedUID := SideEffect(ctx, func(ctx Context) interface{} {
return "TEST-UNIQUE-ID"
})
err := encodedUID.Get(&uniqueID)
if err != nil {
return err
}
}

f := ExecuteActivity(ctx, testActivityHello, "msg1")
err := f.Get(ctx, nil) // wait for result
return err
}

env := s.NewTestWorkflowEnvironment()
env.RegisterWorkflow(workflowFn)
env.RegisterActivity(testActivityHello)

env.ExecuteWorkflow(workflowFn)
_, err := env.QueryWorkflow("test-query")
s.NoError(err)

s.True(env.IsWorkflowCompleted())
s.Nil(env.GetWorkflowError())
}

func (s *WorkflowTestSuiteUnitTest) Test_ChildWorkflow_Basic() {
workflowFn := func(ctx Context) (string, error) {
ctx = WithActivityOptions(ctx, s.activityOptions)
Expand Down

0 comments on commit c5bb86d

Please sign in to comment.