Skip to content

Commit

Permalink
Make behavior be what we actually wanted, with current task build id
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 4, 2024
1 parent 4d73ea3 commit 0626a9f
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 43 deletions.
2 changes: 1 addition & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
// No Operation
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId()
// No Operation
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
weh.commandsHelper.handleActivityTaskScheduled(
event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId())
Expand Down
20 changes: 6 additions & 14 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,6 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
var replayOutbox []outboxEntry
var replayCommands []*commandpb.Command
var respondEvents []*historypb.HistoryEvent
finalBuildID := ""

taskMessages := workflowTask.task.GetMessages()
skipReplayCheck := w.skipReplayCheck()
Expand All @@ -1005,6 +1004,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
eventHandler.ResetLAWFTAttemptCounts()
eventHandler.sdkFlags.markSDKFlagsSent()

isQueryOnlyTask := workflowTask.task.StartedEventId == 0
ProcessEvents:
for {
nextTask, err := reorderedHistory.nextTask()
Expand All @@ -1016,6 +1016,7 @@ ProcessEvents:
historyMessages := nextTask.msgs
flags := nextTask.flags
binaryChecksum := nextTask.binaryChecksum
currentBuildID := nextTask.buildID
// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
Expand Down Expand Up @@ -1046,8 +1047,10 @@ ProcessEvents:
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
if nextTask.hasTerminalEvent && nextTask.buildID != "" {
finalBuildID = nextTask.buildID
if isReplay {
w.workflowInfo.currentTaskBuildID = currentBuildID
} else if !isReplay && !isQueryOnlyTask {
w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID
}
// Reset the mutable side effect markers recorded
eventHandler.mutableSideEffectsRecorded = nil
Expand Down Expand Up @@ -1166,11 +1169,6 @@ ProcessEvents:
}
}

// If we saw that this is the last task of the execution, we need to use the build id from that
// before query handling.
if finalBuildID != "" {
w.workflowInfo.lastCompletedBuildID = finalBuildID
}
return w.applyWorkflowPanicPolicy(workflowTask, workflowError)
}

Expand Down Expand Up @@ -1337,12 +1335,6 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl

completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities)
w.clearCurrentTask()
// We need to set the last completed BuildID to our own ID when completing a task, because we
// might service a query before we ever process any events again, and we won't have had a chance
// to update the last completed BuildID - even though it has changed to be our build id.
if w.wth.workerBuildID != "" {
w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID
}

return completeRequest
}
Expand Down
20 changes: 9 additions & 11 deletions internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,11 +1005,10 @@ type WorkflowInfo struct {
// build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the
// workflow, it is this worker's current value.
BinaryChecksum string
// lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a
// workflow task for this workflow. This value may change over the lifetime of the workflow run,
// but is deterministic and safe to use for branching. The value is empty if the workflow has
// never seen a task completed by a worker with a set BuildId.
lastCompletedBuildID string
// currentTaskBuildID, if nonempty, contains the Build ID of the worker that processed the task
// which is currently or about to be executing. If no longer replaying will be set to the ID of
// this worker
currentTaskBuildID string

continueAsNewSuggested bool
currentHistorySize int
Expand All @@ -1030,12 +1029,11 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string {
return wInfo.BinaryChecksum
}

// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task
// for this workflow. This value may change over the lifetime of the workflow run, but is
// deterministic and safe to use for branching. The returned value is empty if the workflow has
// never seen a task completed by a worker with a set BuildId.
func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string {
return wInfo.lastCompletedBuildID
// GetCurrentBuildID returns the Build ID of the worker that processed this task, which may be
// empty. During replay this id may not equal the id of the replaying worker. If not replaying and
// this worker has a defined Build ID, it will equal that ID. It is safe to use for branching.
func (wInfo *WorkflowInfo) GetCurrentBuildID() string {
return wInfo.currentTaskBuildID
}

// GetCurrentHistoryLength returns the current length of history when called.
Expand Down
19 changes: 7 additions & 12 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,16 +293,12 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {

// Start workflow
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow)
// Query to see that the last build ID becomes 1.0 (we do eventually, because we might
// get the update in the very first task, in which case it'll be empty and that's OK -- see
// workflow for verifying it is always empty in the first task)
ts.Eventually(func() bool {
var lastBuildID string
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(res.Get(&lastBuildID))
return lastBuildID == "1.0"
}, 5*time.Second, 100*time.Millisecond)
// Query to see that the build ID is 1.0
var lastBuildID string
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(res.Get(&lastBuildID))
ts.Equal("1.0", lastBuildID)

// Add new compat ver
err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
Expand All @@ -328,8 +324,7 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
})
ts.NoError(err)

// The last task, with the new worker, should definitely, immediately, be 1.0
var lastBuildID string
// The current task, with the new worker, should still be 1.0 since no new tasks have happened
enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
Expand Down
6 changes: 1 addition & 5 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1883,12 +1883,8 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) {
}

func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error {
firstBuildID := workflow.GetInfo(ctx).GetLastCompletedBuildID()

_ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) {
// Since the first build ID should always be empty, prepend it here to mess up any
// assertions if it wasn't empty
return firstBuildID + workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil
return workflow.GetInfo(ctx).GetCurrentBuildID(), nil
})

workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil)
Expand Down

0 comments on commit 0626a9f

Please sign in to comment.