Skip to content

Commit

Permalink
Fix cancellation of a command before it is sent to server causing pan…
Browse files Browse the repository at this point in the history
…ic (#302)

Fixes the bug by introducing a couple new state machines which handle cancelling timers and activities. This eliminates the need to re-order commands in an ad-hoc way.
  • Loading branch information
Sushisource authored Dec 11, 2020
1 parent 574fd84 commit 4e4813b
Show file tree
Hide file tree
Showing 9 changed files with 847 additions and 71 deletions.
136 changes: 103 additions & 33 deletions internal/internal_decision_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package internal
import (
"container/list"
"fmt"
"strconv"

commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -82,10 +83,19 @@ type (
attributes *commandpb.ScheduleActivityTaskCommandAttributes
}

cancelActivityStateMachine struct {
*commandStateMachineBase
attributes *commandpb.RequestCancelActivityTaskCommandAttributes
}

timerCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.StartTimerCommandAttributes
canceled bool
}

cancelTimerCommandStateMachine struct {
*commandStateMachineBase
attributes *commandpb.CancelTimerCommandAttributes
}

childWorkflowCommandStateMachine struct {
Expand Down Expand Up @@ -144,16 +154,19 @@ const (
commandStateCancellationCommandSent commandState = 7
commandStateCompletedAfterCancellationCommandSent commandState = 8
commandStateCompleted commandState = 9
commandStateCanceledBeforeSent commandState = 10
)

const (
commandTypeActivity commandType = 0
commandTypeChildWorkflow commandType = 1
commandTypeCancellation commandType = 2
commandTypeMarker commandType = 3
commandTypeTimer commandType = 4
commandTypeSignal commandType = 5
commandTypeUpsertSearchAttributes commandType = 6
commandTypeActivity commandType = 0
commandTypeChildWorkflow commandType = 1
commandTypeCancellation commandType = 2
commandTypeMarker commandType = 3
commandTypeTimer commandType = 4
commandTypeSignal commandType = 5
commandTypeUpsertSearchAttributes commandType = 6
commandTypeCancelTimer commandType = 7
commandTypeRequestCancelActivityTask commandType = 8
)

const (
Expand Down Expand Up @@ -204,6 +217,8 @@ func (d commandState) String() string {
return "CompletedAfterCancellationCommandSent"
case commandStateCompleted:
return "Completed"
case commandStateCanceledBeforeSent:
return "CanceledBeforeSent"
default:
return "Unknown"
}
Expand All @@ -223,6 +238,10 @@ func (d commandType) String() string {
return "Timer"
case commandTypeSignal:
return "Signal"
case commandTypeCancelTimer:
return "CancelTimer"
case commandTypeRequestCancelActivityTask:
return "RequestCancelActivityTask"
default:
return "Unknown"
}
Expand Down Expand Up @@ -257,6 +276,14 @@ func (h *commandsHelper) newActivityCommandStateMachine(
}
}

func (h *commandsHelper) newCancelActivityStateMachine(attributes *commandpb.RequestCancelActivityTaskCommandAttributes) *cancelActivityStateMachine {
base := h.newCommandStateMachineBase(commandTypeRequestCancelActivityTask, strconv.FormatInt(attributes.GetScheduledEventId(), 10))
return &cancelActivityStateMachine{
commandStateMachineBase: base,
attributes: attributes,
}
}

func (h *commandsHelper) newTimerCommandStateMachine(attributes *commandpb.StartTimerCommandAttributes) *timerCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeTimer, attributes.GetTimerId())
return &timerCommandStateMachine{
Expand All @@ -265,6 +292,14 @@ func (h *commandsHelper) newTimerCommandStateMachine(attributes *commandpb.Start
}
}

func (h *commandsHelper) newCancelTimerCommandStateMachine(attributes *commandpb.CancelTimerCommandAttributes) *cancelTimerCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeCancelTimer, attributes.GetTimerId())
return &cancelTimerCommandStateMachine{
commandStateMachineBase: base,
attributes: attributes,
}
}

func (h *commandsHelper) newChildWorkflowCommandStateMachine(attributes *commandpb.StartChildWorkflowExecutionCommandAttributes) *childWorkflowCommandStateMachine {
base := h.newCommandStateMachineBase(commandTypeChildWorkflow, attributes.GetWorkflowId())
return &childWorkflowCommandStateMachine{
Expand Down Expand Up @@ -371,14 +406,11 @@ func (d *commandStateMachineBase) cancel() {
case commandStateCompleted, commandStateCompletedAfterCancellationCommandSent:
// No op. This is legit. People could cancel context after timer/activity is done.
case commandStateCreated:
d.moveState(commandStateCompleted, eventCancel)
d.moveState(commandStateCanceledBeforeSent, eventCancel)
case commandStateCommandSent:
d.moveState(commandStateCanceledBeforeInitiated, eventCancel)
d.moveState(commandStateCancellationCommandSent, eventCancel)
case commandStateInitiated:
d.moveState(commandStateCanceledAfterInitiated, eventCancel)
// cancel doesn't add new command, therefore addCommand is not called.
// But *CancelRequested event is still being added to the history, therefore counter needs to be incremented.
d.helper.incrementNextCommandEventID()
default:
d.failStateTransition(eventCancel)
}
Expand All @@ -388,7 +420,7 @@ func (d *commandStateMachineBase) handleInitiatedEvent() {
switch d.state {
case commandStateCommandSent:
d.moveState(commandStateInitiated, eventInitiated)
case commandStateCanceledBeforeInitiated:
case commandStateCanceledBeforeInitiated, commandStateCanceledBeforeSent, commandStateCancellationCommandSent:
d.moveState(commandStateCanceledAfterInitiated, eventInitiated)
default:
d.failStateTransition(eventInitiated)
Expand All @@ -397,7 +429,7 @@ func (d *commandStateMachineBase) handleInitiatedEvent() {

func (d *commandStateMachineBase) handleInitiationFailedEvent() {
switch d.state {
case commandStateInitiated, commandStateCommandSent, commandStateCanceledBeforeInitiated:
case commandStateInitiated, commandStateCommandSent, commandStateCanceledBeforeInitiated, commandStateCancellationCommandSent:
d.moveState(commandStateCompleted, eventInitiationFailed)
default:
d.failStateTransition(eventInitiationFailed)
Expand Down Expand Up @@ -440,7 +472,7 @@ func (d *commandStateMachineBase) handleCancelFailedEvent() {

func (d *commandStateMachineBase) handleCanceledEvent() {
switch d.state {
case commandStateCancellationCommandSent:
case commandStateCancellationCommandSent, commandStateCanceledAfterInitiated:
d.moveState(commandStateCompleted, eventCanceled)
default:
d.failStateTransition(eventCanceled)
Expand All @@ -454,16 +486,10 @@ func (d *commandStateMachineBase) String() string {

func (d *activityCommandStateMachine) getCommand() *commandpb.Command {
switch d.state {
case commandStateCreated:
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK)
command.Attributes = &commandpb.Command_ScheduleActivityTaskCommandAttributes{ScheduleActivityTaskCommandAttributes: d.attributes}
return command
case commandStateCanceledAfterInitiated:
command := createNewCommand(enumspb.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK)
command.Attributes = &commandpb.Command_RequestCancelActivityTaskCommandAttributes{RequestCancelActivityTaskCommandAttributes: &commandpb.RequestCancelActivityTaskCommandAttributes{
ScheduledEventId: d.scheduleID,
}}
return command
default:
return nil
}
Expand All @@ -485,13 +511,34 @@ func (d *activityCommandStateMachine) handleCancelFailedEvent() {
d.failStateTransition(eventCancelFailed)
}

func (d *activityCommandStateMachine) cancel() {
switch d.state {
case commandStateCreated, commandStateCommandSent, commandStateInitiated:
attribs := &commandpb.RequestCancelActivityTaskCommandAttributes{
ScheduledEventId: d.scheduleID,
}
cancelCmd := d.helper.newCancelActivityStateMachine(attribs)
d.helper.addCommand(cancelCmd)
}

d.commandStateMachineBase.cancel()
}

func (d *timerCommandStateMachine) cancel() {
d.canceled = true
switch d.state {
case commandStateCreated, commandStateCommandSent, commandStateInitiated:
attribs := &commandpb.CancelTimerCommandAttributes{
TimerId: d.attributes.TimerId,
}
cancelCmd := d.helper.newCancelTimerCommandStateMachine(attribs)
d.helper.addCommand(cancelCmd)
}

d.commandStateMachineBase.cancel()
}

func (d *timerCommandStateMachine) isDone() bool {
return d.state == commandStateCompleted || d.canceled
return d.state == commandStateCompleted
}

func (d *timerCommandStateMachine) handleCommandSent() {
Expand All @@ -503,17 +550,33 @@ func (d *timerCommandStateMachine) handleCommandSent() {
}
}

func (d *timerCommandStateMachine) getCommand() *commandpb.Command {
func (d *cancelActivityStateMachine) getCommand() *commandpb.Command {
switch d.state {
case commandStateCreated:
command := createNewCommand(enumspb.COMMAND_TYPE_REQUEST_CANCEL_ACTIVITY_TASK)
command.Attributes = &commandpb.Command_RequestCancelActivityTaskCommandAttributes{RequestCancelActivityTaskCommandAttributes: d.attributes}
return command
default:
return nil
}
}

func (d *timerCommandStateMachine) getCommand() *commandpb.Command {
switch d.state {
case commandStateCreated, commandStateCanceledBeforeSent:
command := createNewCommand(enumspb.COMMAND_TYPE_START_TIMER)
command.Attributes = &commandpb.Command_StartTimerCommandAttributes{StartTimerCommandAttributes: d.attributes}
return command
case commandStateCanceledAfterInitiated:
default:
return nil
}
}

func (d *cancelTimerCommandStateMachine) getCommand() *commandpb.Command {
switch d.state {
case commandStateCreated:
command := createNewCommand(enumspb.COMMAND_TYPE_CANCEL_TIMER)
command.Attributes = &commandpb.Command_CancelTimerCommandAttributes{CancelTimerCommandAttributes: &commandpb.CancelTimerCommandAttributes{
TimerId: d.attributes.TimerId,
}}
command.Attributes = &commandpb.Command_CancelTimerCommandAttributes{CancelTimerCommandAttributes: d.attributes}
return command
default:
return nil
Expand Down Expand Up @@ -559,6 +622,15 @@ func (d *childWorkflowCommandStateMachine) handleStartedEvent() {
}
}

func (d *childWorkflowCommandStateMachine) handleInitiatedEvent() {
switch d.state {
case commandStateCancellationCommandSent:
d.failStateTransition(eventInitiated)
default:
d.commandStateMachineBase.handleInitiatedEvent()
}
}

func (d *childWorkflowCommandStateMachine) handleCancelFailedEvent() {
switch d.state {
case commandStateCancellationCommandSent:
Expand Down Expand Up @@ -741,9 +813,6 @@ func (h *commandsHelper) getCommand(id commandID) commandStateMachine {
" or incompatible change in the workflow definition", id)
panicIllegalState(panicMsg)
}
// Move the last update command state machine to the back of the list.
// Otherwise commands (like timer cancellations) can end up out of order.
h.orderedCommands.MoveToBack(command)
return command.Value.(commandStateMachine)
}

Expand Down Expand Up @@ -1093,6 +1162,7 @@ func (h *commandsHelper) startTimer(attributes *commandpb.StartTimerCommandAttri
func (h *commandsHelper) cancelTimer(timerID TimerID) commandStateMachine {
command := h.getCommand(makeCommandID(commandTypeTimer, timerID.id))
command.cancel()

return command
}

Expand Down
Loading

0 comments on commit 4e4813b

Please sign in to comment.