Skip to content

Commit

Permalink
Fix history replayer
Browse files Browse the repository at this point in the history
 * Remove protobuf binary comparisons because they are not deterministic

 * Check histroy during a replay even if the workflow is complete

 * Remove dead code around strictMode
  • Loading branch information
Quinn-With-Two-Ns committed Dec 30, 2022
1 parent 8e0a972 commit 64701b9
Show file tree
Hide file tree
Showing 25 changed files with 3,959 additions and 454 deletions.
16 changes: 10 additions & 6 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ const (
)

// Assert that structs do indeed implement the interfaces
var _ WorkflowEnvironment = (*workflowEnvironmentImpl)(nil)
var _ workflowExecutionEventHandler = (*workflowExecutionEventHandlerImpl)(nil)
var (
_ WorkflowEnvironment = (*workflowEnvironmentImpl)(nil)
_ workflowExecutionEventHandler = (*workflowExecutionEventHandlerImpl)(nil)
)

type (
// completionHandler Handler to indicate completion result
Expand Down Expand Up @@ -331,7 +333,6 @@ func (wc *workflowEnvironmentImpl) SignalExternalWorkflow(
childWorkflowOnly bool,
callback ResultHandler,
) {

signalID := wc.GenerateSequenceID()
command := wc.commandsHelper.signalExternalWorkflowExecution(namespace, workflowID, runID, signalName, input,
header, signalID, childWorkflowOnly)
Expand Down Expand Up @@ -444,7 +445,8 @@ func (wc *workflowEnvironmentImpl) RegisterCancelHandler(handler func()) {
}

func (wc *workflowEnvironmentImpl) ExecuteChildWorkflow(
params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error)) {
params ExecuteWorkflowParams, callback ResultHandler, startedHandler func(r WorkflowExecution, e error),
) {
if params.WorkflowID == "" {
params.WorkflowID = wc.workflowInfo.WorkflowExecution.RunID + "_" + wc.GenerateSequenceID()
}
Expand Down Expand Up @@ -1103,7 +1105,8 @@ func (weh *workflowExecutionEventHandlerImpl) Close() {
}

func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionStarted(
attributes *historypb.WorkflowExecutionStartedEventAttributes) (err error) {
attributes *historypb.WorkflowExecutionStartedEventAttributes,
) (err error) {
weh.workflowDefinition, err = weh.registry.getWorkflowDefinition(
weh.workflowInfo.WorkflowType,
)
Expand Down Expand Up @@ -1381,7 +1384,8 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessLocalActivityResult(lar *lo
}

func (weh *workflowExecutionEventHandlerImpl) handleWorkflowExecutionSignaled(
attributes *historypb.WorkflowExecutionSignaledEventAttributes) error {
attributes *historypb.WorkflowExecutionSignaledEventAttributes,
) error {
return weh.signalHandler(attributes.GetSignalName(), attributes.Input, attributes.Header)
}

Expand Down
85 changes: 26 additions & 59 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/gogo/status"
commandpb "go.temporal.io/api/command/v1"
commonpb "go.temporal.io/api/common/v1"
Expand Down Expand Up @@ -845,6 +844,13 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
invocations := indexInvocations(workflowTask)

skipReplayCheck := w.skipReplayCheck()
shouldForceReplayCheck := func() bool {
isInReplayer := IsReplayNamespace(w.wth.namespace)
// If we are in the replayer we should always check the history replay, even if the workflow is completed
// Skip if the workflow paniced to avoid potentially breaking old histories
_, wfPanicked := w.err.(*workflowPanicError)
return !wfPanicked && isInReplayer
}

metricsHandler := w.wth.metricsHandler.WithTags(metrics.WorkflowTags(task.WorkflowType.GetName()))
start := time.Now()
Expand Down Expand Up @@ -877,7 +883,7 @@ ProcessEvents:
if err != nil {
return nil, err
}
if w.isWorkflowCompleted {
if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
}
}
Expand Down Expand Up @@ -918,7 +924,7 @@ ProcessEvents:
}
}

if w.isWorkflowCompleted {
if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
}
}
Expand All @@ -930,7 +936,7 @@ ProcessEvents:
if err != nil {
return nil, err
}
if w.isWorkflowCompleted {
if w.isWorkflowCompleted && !shouldForceReplayCheck() {
break ProcessEvents
}
}
Expand All @@ -957,7 +963,7 @@ ProcessEvents:
// the replay of that event will panic on the command state machine and the workflow will be marked as completed
// with the panic error.
var workflowError error
if !skipReplayCheck && !w.isWorkflowCompleted {
if !skipReplayCheck && (!w.isWorkflowCompleted || shouldForceReplayCheck()) {
// check if commands from reply matches to the history events
if err := matchReplayWithHistory(replayCommands, respondEvents); err != nil {
workflowError = err
Expand Down Expand Up @@ -1197,8 +1203,16 @@ func skipDeterministicCheckForEvent(e *historypb.HistoryEvent) bool {
if markerName == versionMarkerName || markerName == mutableSideEffectMarkerName {
return true
}
// case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
// return true
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED:
return true
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW:
return true
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED:
return true
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED:
return true
case enumspb.EVENT_TYPE_ACTIVITY_TASK_TIMED_OUT:
return true
}
return false
}
Expand Down Expand Up @@ -1254,7 +1268,7 @@ matchLoop:
return historyMismatchErrorf("nondeterministic workflow: extra replay command for %s", util.CommandToString(d))
}

if !isCommandMatchEvent(d, e, false) {
if !isCommandMatchEvent(d, e) {
return historyMismatchErrorf("nondeterministic workflow: history event is %s, replay command is %s",
util.HistoryEventToString(e), util.CommandToString(d))
}
Expand All @@ -1273,7 +1287,7 @@ func lastPartOfName(name string) string {
return name[lastDotIdx+1:]
}

func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strictMode bool) bool {
func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent) bool {
switch d.GetCommandType() {
case enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK:
if e.GetEventType() != enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED {
Expand All @@ -1283,9 +1297,7 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
commandAttributes := d.GetScheduleActivityTaskCommandAttributes()

if eventAttributes.GetActivityId() != commandAttributes.GetActivityId() ||
lastPartOfName(eventAttributes.ActivityType.GetName()) != lastPartOfName(commandAttributes.ActivityType.GetName()) ||
(strictMode && eventAttributes.TaskQueue.GetName() != commandAttributes.TaskQueue.GetName()) ||
(strictMode && !proto.Equal(eventAttributes.GetInput(), commandAttributes.GetInput())) {
lastPartOfName(eventAttributes.ActivityType.GetName()) != lastPartOfName(commandAttributes.ActivityType.GetName()) {
return false
}

Expand All @@ -1310,8 +1322,7 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
eventAttributes := e.GetTimerStartedEventAttributes()
commandAttributes := d.GetStartTimerCommandAttributes()

if eventAttributes.GetTimerId() != commandAttributes.GetTimerId() ||
(strictMode && common.DurationValue(eventAttributes.GetStartToFireTimeout()) != common.DurationValue(commandAttributes.GetStartToFireTimeout())) {
if eventAttributes.GetTimerId() != commandAttributes.GetTimerId() {
return false
}

Expand All @@ -1335,29 +1346,13 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
if e.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED {
return false
}
if strictMode {
eventAttributes := e.GetWorkflowExecutionCompletedEventAttributes()
commandAttributes := d.GetCompleteWorkflowExecutionCommandAttributes()

if !proto.Equal(eventAttributes.GetResult(), commandAttributes.GetResult()) {
return false
}
}

return true

case enumspb.COMMAND_TYPE_FAIL_WORKFLOW_EXECUTION:
if e.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED {
return false
}
if strictMode {
eventAttributes := e.GetWorkflowExecutionFailedEventAttributes()
commandAttributes := d.GetFailWorkflowExecutionCommandAttributes()

if !proto.Equal(eventAttributes.GetFailure(), commandAttributes.GetFailure()) {
return false
}
}

return true

Expand Down Expand Up @@ -1404,13 +1399,6 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
if e.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED {
return false
}
if strictMode {
eventAttributes := e.GetWorkflowExecutionCanceledEventAttributes()
commandAttributes := d.GetCancelWorkflowExecutionCommandAttributes()
if !proto.Equal(eventAttributes.GetDetails(), commandAttributes.GetDetails()) {
return false
}
}
return true

case enumspb.COMMAND_TYPE_CONTINUE_AS_NEW_WORKFLOW_EXECUTION:
Expand All @@ -1426,9 +1414,7 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
}
eventAttributes := e.GetStartChildWorkflowExecutionInitiatedEventAttributes()
commandAttributes := d.GetStartChildWorkflowExecutionCommandAttributes()
if lastPartOfName(eventAttributes.WorkflowType.GetName()) != lastPartOfName(commandAttributes.WorkflowType.GetName()) ||
(strictMode && checkNamespacesInCommandAndEvent(eventAttributes.GetNamespace(), commandAttributes.GetNamespace())) ||
(strictMode && eventAttributes.TaskQueue.GetName() != commandAttributes.TaskQueue.GetName()) {
if lastPartOfName(eventAttributes.WorkflowType.GetName()) != lastPartOfName(commandAttributes.WorkflowType.GetName()) {
return false
}

Expand All @@ -1438,11 +1424,6 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
if e.GetEventType() != enumspb.EVENT_TYPE_UPSERT_WORKFLOW_SEARCH_ATTRIBUTES {
return false
}
eventAttributes := e.GetUpsertWorkflowSearchAttributesEventAttributes()
commandAttributes := d.GetUpsertWorkflowSearchAttributesCommandAttributes()
if strictMode && !isSearchAttributesMatched(eventAttributes.SearchAttributes, commandAttributes.SearchAttributes) {
return false
}
return true

case enumspb.COMMAND_TYPE_ACCEPT_WORKFLOW_UPDATE:
Expand All @@ -1456,26 +1437,12 @@ func isCommandMatchEvent(d *commandpb.Command, e *historypb.HistoryEvent, strict
return false
}

if strictMode {
eventAttributes := e.GetWorkflowUpdateCompletedEventAttributes()
commandAttributes := d.GetCompleteWorkflowUpdateCommandAttributes()

if !proto.Equal(eventAttributes.GetOutput().GetSuccess(), commandAttributes.GetOutput().GetSuccess()) ||
!proto.Equal(eventAttributes.GetOutput().GetFailure(), commandAttributes.GetOutput().GetFailure()) {
return false
}
}
return true

case enumspb.COMMAND_TYPE_MODIFY_WORKFLOW_PROPERTIES:
if e.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_PROPERTIES_MODIFIED {
return false
}
eventAttributes := e.GetWorkflowPropertiesModifiedEventAttributes()
commandAttributes := d.GetModifyWorkflowPropertiesCommandAttributes()
if strictMode && !isMemoMatched(eventAttributes.UpsertedMemo, commandAttributes.UpsertedMemo) {
return false
}
return true
}

Expand Down
Loading

0 comments on commit 64701b9

Please sign in to comment.