From 2885d7ab94cbf4a404662015e45e1167d0265b28 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Jan 2024 17:08:30 -0800 Subject: [PATCH 1/5] Add last BuildID to workflow info --- internal/internal_event_handlers.go | 3 +- internal/internal_task_handlers.go | 11 ++++- internal/workflow.go | 16 +++++- test/worker_versioning_test.go | 75 ++++++++++++++++++++++++++++- test/workflow_test.go | 14 ++++++ 5 files changed, 113 insertions(+), 6 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index d201601f3..7acdaf4a0 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1130,7 +1130,8 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED: // No Operation case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED: - // No Operation + weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId() + println("Set to ", weh.workflowInfo.lastCompletedBuildID) case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: weh.commandsHelper.handleActivityTaskScheduled( event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId()) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index f146e5296..723d6be65 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -179,6 +179,7 @@ type ( next []*historypb.HistoryEvent nextFlags []sdkFlag binaryChecksum string + lastBuildID string sdkVersion string sdkName string } @@ -269,8 +270,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { var binaryChecksum string var flags []sdkFlag if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum - for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() { + completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes() + binaryChecksum = completedAttrs.BinaryChecksum + for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() { f := sdkFlagFromUint(flag) if !f.isValid() { // If a flag is not recognized (value is too high or not defined), it must fail the workflow task @@ -1299,6 +1301,11 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities) w.clearCurrentTask() + if w.wth.workerBuildID != "" { + println("EXISTING", w.workflowInfo.lastCompletedBuildID) + println("SETTING ON COMPLETE TO ", w.wth.workerBuildID) + w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID + } return completeRequest } diff --git a/internal/workflow.go b/internal/workflow.go index 6f86bd711..e9aaa9d98 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1005,6 +1005,11 @@ type WorkflowInfo struct { // build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the // workflow, it is this worker's current value. BinaryChecksum string + // lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a + // workflow task for this workflow. This value may change over the lifetime of the workflow run, + // but is deterministic and safe to use for branching. The value is empty if the workflow has + // never seen a task completed by a worker with a set BuildId. + lastCompletedBuildID string continueAsNewSuggested bool currentHistorySize int @@ -1016,7 +1021,8 @@ type UpdateInfo struct { ID string } -// GetBinaryChecksum return binary checksum. +// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this +// workflow, or if this is the first task, this worker's checksum. func (wInfo *WorkflowInfo) GetBinaryChecksum() string { if wInfo.BinaryChecksum == "" { return getBinaryChecksum() @@ -1024,6 +1030,14 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string { return wInfo.BinaryChecksum } +// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task +// for this workflow. This value may change over the lifetime of the workflow run, but is +// deterministic and safe to use for branching. The returned value is empty if the workflow has +// never seen a task completed by a worker with a set BuildId. +func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string { + return wInfo.lastCompletedBuildID +} + // GetCurrentHistoryLength returns the current length of history when called. // This value may change throughout the life of the workflow. func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int { diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index d376d34ad..3083585b8 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/workflowservice/v1" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -149,8 +152,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { ts.NoError(handle12.Get(ctx, nil)) ts.NoError(handle21.Get(ctx, nil)) ts.NoError(handle22.Get(ctx, nil)) - - // TODO: Actually assert they ran on the appropriate workers, once David's changes are ready } func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() { @@ -273,3 +274,73 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() { ts.Equal(1, len(taskQueueReachability.TaskQueueReachability)) ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability) } + +func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{ + BuildID: "1.0", + }, + }) + ts.NoError(err) + + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + ts.workflows.register(worker1) + ts.NoError(worker1.Start()) + + // Start workflow + wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow) + // Query to see that the last build ID becomes 1.0 (we do eventually, because we might + // get the update in the very first task, in which case it'll be empty and that's OK -- see + // workflow for verifying it is always empty in the first task) + ts.Eventually(func() bool { + var lastBuildID string + res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(res.Get(&lastBuildID)) + return lastBuildID == "1.0" + }, 5*time.Second, 100*time.Millisecond) + + // Add new compat ver + err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewCompatibleVersion{ + BuildID: "1.1", + ExistingCompatibleBuildID: "1.0", + }, + }) + ts.NoError(err) + + worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) + ts.workflows.register(worker11) + ts.NoError(worker11.Start()) + defer worker11.Stop() + + _, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{ + Namespace: ts.config.Namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: wfHandle.GetID(), + }, + }) + ts.NoError(err) + + // The last task, with the new worker, should definitely, immediately, be 1.0 + var lastBuildID string + enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.0", lastBuildID) + + // finish the workflow under 1.1 + ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "finish", "")) + ts.NoError(wfHandle.Get(ctx, nil)) + + // Post completion it should have the value of the last task + enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.1", lastBuildID) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index db88e3302..bb5aa0c58 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1882,6 +1882,19 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) { return value, nil } +func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error { + firstBuildID := workflow.GetInfo(ctx).GetLastCompletedBuildID() + + _ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) { + // Since the first build ID should always be empty, prepend it here to mess up any + // assertions if it wasn't empty + return firstBuildID + workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil + }) + + workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil) + return nil +} + func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActivity bool) error { // Add query handler err := workflow.SetQueryHandler(ctx, "workflow-query", func() (string, error) { return "query-response", nil }) @@ -2511,6 +2524,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.SleepForDuration) worker.RegisterWorkflow(w.InterceptorCalls) worker.RegisterWorkflow(w.WaitSignalToStart) + worker.RegisterWorkflow(w.BuildIDWorkflow) worker.RegisterWorkflow(w.SignalsAndQueries) worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage) worker.RegisterWorkflow(w.AdvancedPostCancellation) From 4d73ea34b2222aa9d964093f7c1a4ded9eb34ab5 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Jan 2024 10:29:53 -0800 Subject: [PATCH 2/5] Make sure queries see the right id --- internal/internal_event_handlers.go | 1 - internal/internal_task_handlers.go | 59 +++++++++++++++++++++++------ test/worker_versioning_test.go | 1 + 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 7acdaf4a0..02da59184 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1131,7 +1131,6 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( // No Operation case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED: weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId() - println("Set to ", weh.workflowInfo.lastCompletedBuildID) case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: weh.commandsHelper.handleActivityTaskScheduled( event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId()) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 723d6be65..28f14aca9 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -179,7 +179,6 @@ type ( next []*historypb.HistoryEvent nextFlags []sdkFlag binaryChecksum string - lastBuildID string sdkVersion string sdkName string } @@ -200,6 +199,9 @@ type ( binaryChecksum string sdkVersion string sdkName string + buildID string + // true if the workflow execution finishes in this task + hasTerminalEvent bool } finishedTask struct { @@ -329,6 +331,20 @@ func isCommandEvent(eventType enumspb.EventType) bool { } } +func isTerminalWorkflowEvent(eventType enumspb.EventType) bool { + switch eventType { + case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED, + enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT: + return true + default: + return false + } +} + // nextTask returns the next task to be processed. func (eh *history) nextTask() (*preparedTask, error) { if eh.next == nil { @@ -350,6 +366,8 @@ func (eh *history) nextTask() (*preparedTask, error) { var markers []*historypb.HistoryEvent var msgs []*protocolpb.Message + var buildID string + var hasTerminalEvent bool if len(result) > 0 { nextTaskEvents, err := eh.prepareTask() if err != nil { @@ -361,15 +379,19 @@ func (eh *history) nextTask() (*preparedTask, error) { eh.sdkVersion = nextTaskEvents.sdkVersion markers = nextTaskEvents.markers msgs = nextTaskEvents.msgs + buildID = nextTaskEvents.buildID + hasTerminalEvent = nextTaskEvents.hasTerminalEvent } return &preparedTask{ - events: result, - markers: markers, - flags: sdkFlags, - msgs: msgs, - binaryChecksum: checksum, - sdkName: sdkName, - sdkVersion: sdkVersion, + events: result, + markers: markers, + flags: sdkFlags, + msgs: msgs, + binaryChecksum: checksum, + sdkName: sdkName, + sdkVersion: sdkVersion, + buildID: buildID, + hasTerminalEvent: hasTerminalEvent, }, nil } @@ -459,7 +481,12 @@ OrderEvents: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED: // Skip default: - if isPreloadMarkerEvent(event) { + if isTerminalWorkflowEvent(event.GetEventType()) { + taskEvents.hasTerminalEvent = true + } else if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + taskEvents.buildID = event.GetWorkflowTaskCompletedEventAttributes(). + GetWorkerVersion().GetBuildId() + } else if isPreloadMarkerEvent(event) { taskEvents.markers = append(taskEvents.markers, event) } else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil { taskEvents.msgs = append(taskEvents.msgs, inferMessage(attrs)) @@ -958,6 +985,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo var replayOutbox []outboxEntry var replayCommands []*commandpb.Command var respondEvents []*historypb.HistoryEvent + finalBuildID := "" taskMessages := workflowTask.task.GetMessages() skipReplayCheck := w.skipReplayCheck() @@ -1018,6 +1046,9 @@ ProcessEvents: } else { w.workflowInfo.BinaryChecksum = binaryChecksum } + if nextTask.hasTerminalEvent && nextTask.buildID != "" { + finalBuildID = nextTask.buildID + } // Reset the mutable side effect markers recorded eventHandler.mutableSideEffectsRecorded = nil // Markers are from the events that are produced from the current workflow task. @@ -1135,6 +1166,11 @@ ProcessEvents: } } + // If we saw that this is the last task of the execution, we need to use the build id from that + // before query handling. + if finalBuildID != "" { + w.workflowInfo.lastCompletedBuildID = finalBuildID + } return w.applyWorkflowPanicPolicy(workflowTask, workflowError) } @@ -1301,9 +1337,10 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities) w.clearCurrentTask() + // We need to set the last completed BuildID to our own ID when completing a task, because we + // might service a query before we ever process any events again, and we won't have had a chance + // to update the last completed BuildID - even though it has changed to be our build id. if w.wth.workerBuildID != "" { - println("EXISTING", w.workflowInfo.lastCompletedBuildID) - println("SETTING ON COMPLETE TO ", w.wth.workerBuildID) w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID } diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index 3083585b8..87837c5ec 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -314,6 +314,7 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { }) ts.NoError(err) + worker1.Stop() worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) ts.workflows.register(worker11) ts.NoError(worker11.Start()) From fb4f81122d7b55b2d8453798951aeaea0f249c64 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Jan 2024 14:09:31 -0800 Subject: [PATCH 3/5] Make behavior be what we actually wanted, with current task build id --- internal/internal_event_handlers.go | 2 +- internal/internal_task_handlers.go | 59 ++++++++--------------------- internal/workflow.go | 20 +++++----- test/worker_versioning_test.go | 19 ++++------ test/workflow_test.go | 6 +-- 5 files changed, 33 insertions(+), 73 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index 02da59184..d201601f3 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1130,7 +1130,7 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED: // No Operation case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED: - weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId() + // No Operation case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: weh.commandsHelper.handleActivityTaskScheduled( event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId()) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 28f14aca9..d9dcf00c3 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -200,8 +200,6 @@ type ( sdkVersion string sdkName string buildID string - // true if the workflow execution finishes in this task - hasTerminalEvent bool } finishedTask struct { @@ -331,20 +329,6 @@ func isCommandEvent(eventType enumspb.EventType) bool { } } -func isTerminalWorkflowEvent(eventType enumspb.EventType) bool { - switch eventType { - case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_FAILED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CANCELED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_CONTINUED_AS_NEW, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TERMINATED, - enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_TIMED_OUT: - return true - default: - return false - } -} - // nextTask returns the next task to be processed. func (eh *history) nextTask() (*preparedTask, error) { if eh.next == nil { @@ -367,7 +351,6 @@ func (eh *history) nextTask() (*preparedTask, error) { var markers []*historypb.HistoryEvent var msgs []*protocolpb.Message var buildID string - var hasTerminalEvent bool if len(result) > 0 { nextTaskEvents, err := eh.prepareTask() if err != nil { @@ -380,18 +363,16 @@ func (eh *history) nextTask() (*preparedTask, error) { markers = nextTaskEvents.markers msgs = nextTaskEvents.msgs buildID = nextTaskEvents.buildID - hasTerminalEvent = nextTaskEvents.hasTerminalEvent } return &preparedTask{ - events: result, - markers: markers, - flags: sdkFlags, - msgs: msgs, - binaryChecksum: checksum, - sdkName: sdkName, - sdkVersion: sdkVersion, - buildID: buildID, - hasTerminalEvent: hasTerminalEvent, + events: result, + markers: markers, + flags: sdkFlags, + msgs: msgs, + binaryChecksum: checksum, + sdkName: sdkName, + sdkVersion: sdkVersion, + buildID: buildID, }, nil } @@ -481,9 +462,7 @@ OrderEvents: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED: // Skip default: - if isTerminalWorkflowEvent(event.GetEventType()) { - taskEvents.hasTerminalEvent = true - } else if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { taskEvents.buildID = event.GetWorkflowTaskCompletedEventAttributes(). GetWorkerVersion().GetBuildId() } else if isPreloadMarkerEvent(event) { @@ -985,7 +964,6 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo var replayOutbox []outboxEntry var replayCommands []*commandpb.Command var respondEvents []*historypb.HistoryEvent - finalBuildID := "" taskMessages := workflowTask.task.GetMessages() skipReplayCheck := w.skipReplayCheck() @@ -1005,6 +983,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo eventHandler.ResetLAWFTAttemptCounts() eventHandler.sdkFlags.markSDKFlagsSent() + isQueryOnlyTask := workflowTask.task.StartedEventId == 0 ProcessEvents: for { nextTask, err := reorderedHistory.nextTask() @@ -1016,6 +995,7 @@ ProcessEvents: historyMessages := nextTask.msgs flags := nextTask.flags binaryChecksum := nextTask.binaryChecksum + currentBuildID := nextTask.buildID // Check if we are replaying so we know if we should use the messages in the WFT or the history isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) var msgs *eventMsgIndex @@ -1046,8 +1026,10 @@ ProcessEvents: } else { w.workflowInfo.BinaryChecksum = binaryChecksum } - if nextTask.hasTerminalEvent && nextTask.buildID != "" { - finalBuildID = nextTask.buildID + if isReplay { + w.workflowInfo.currentTaskBuildID = currentBuildID + } else if !isReplay && !isQueryOnlyTask { + w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID } // Reset the mutable side effect markers recorded eventHandler.mutableSideEffectsRecorded = nil @@ -1166,11 +1148,6 @@ ProcessEvents: } } - // If we saw that this is the last task of the execution, we need to use the build id from that - // before query handling. - if finalBuildID != "" { - w.workflowInfo.lastCompletedBuildID = finalBuildID - } return w.applyWorkflowPanicPolicy(workflowTask, workflowError) } @@ -1337,12 +1314,6 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities) w.clearCurrentTask() - // We need to set the last completed BuildID to our own ID when completing a task, because we - // might service a query before we ever process any events again, and we won't have had a chance - // to update the last completed BuildID - even though it has changed to be our build id. - if w.wth.workerBuildID != "" { - w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID - } return completeRequest } diff --git a/internal/workflow.go b/internal/workflow.go index e9aaa9d98..03dbdde84 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1005,11 +1005,10 @@ type WorkflowInfo struct { // build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the // workflow, it is this worker's current value. BinaryChecksum string - // lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a - // workflow task for this workflow. This value may change over the lifetime of the workflow run, - // but is deterministic and safe to use for branching. The value is empty if the workflow has - // never seen a task completed by a worker with a set BuildId. - lastCompletedBuildID string + // currentTaskBuildID, if nonempty, contains the Build ID of the worker that processed the task + // which is currently or about to be executing. If no longer replaying will be set to the ID of + // this worker + currentTaskBuildID string continueAsNewSuggested bool currentHistorySize int @@ -1030,12 +1029,11 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string { return wInfo.BinaryChecksum } -// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task -// for this workflow. This value may change over the lifetime of the workflow run, but is -// deterministic and safe to use for branching. The returned value is empty if the workflow has -// never seen a task completed by a worker with a set BuildId. -func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string { - return wInfo.lastCompletedBuildID +// GetCurrentBuildID returns the Build ID of the worker that processed this task, which may be +// empty. During replay this id may not equal the id of the replaying worker. If not replaying and +// this worker has a defined Build ID, it will equal that ID. It is safe to use for branching. +func (wInfo *WorkflowInfo) GetCurrentBuildID() string { + return wInfo.currentTaskBuildID } // GetCurrentHistoryLength returns the current length of history when called. diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index 87837c5ec..e03d2dce5 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -293,16 +293,12 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { // Start workflow wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow) - // Query to see that the last build ID becomes 1.0 (we do eventually, because we might - // get the update in the very first task, in which case it'll be empty and that's OK -- see - // workflow for verifying it is always empty in the first task) - ts.Eventually(func() bool { - var lastBuildID string - res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) - ts.NoError(err) - ts.NoError(res.Get(&lastBuildID)) - return lastBuildID == "1.0" - }, 5*time.Second, 100*time.Millisecond) + // Query to see that the build ID is 1.0 + var lastBuildID string + res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(res.Get(&lastBuildID)) + ts.Equal("1.0", lastBuildID) // Add new compat ver err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ @@ -328,8 +324,7 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { }) ts.NoError(err) - // The last task, with the new worker, should definitely, immediately, be 1.0 - var lastBuildID string + // The current task, with the new worker, should still be 1.0 since no new tasks have happened enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) ts.NoError(err) ts.NoError(enval.Get(&lastBuildID)) diff --git a/test/workflow_test.go b/test/workflow_test.go index bb5aa0c58..6738b0db2 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1883,12 +1883,8 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) { } func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error { - firstBuildID := workflow.GetInfo(ctx).GetLastCompletedBuildID() - _ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) { - // Since the first build ID should always be empty, prepend it here to mess up any - // assertions if it wasn't empty - return firstBuildID + workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil + return workflow.GetInfo(ctx).GetCurrentBuildID(), nil }) workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil) From ea4c0836e7fc128ddeb7cea42c6be6ed44d6976e Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Jan 2024 14:22:31 -0800 Subject: [PATCH 4/5] Include branching in workflow --- test/worker_versioning_test.go | 17 +++++++++++++++-- test/workflow_test.go | 17 +++++++++++++++++ 2 files changed, 32 insertions(+), 2 deletions(-) diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index e03d2dce5..31ae497d4 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -41,7 +41,8 @@ type WorkerVersioningTestSuite struct { *require.Assertions suite.Suite ConfigAndClientSuiteBase - workflows *Workflows + workflows *Workflows + activities *Activities } func TestWorkerVersioningTestSuite(t *testing.T) { @@ -51,6 +52,7 @@ func TestWorkerVersioningTestSuite(t *testing.T) { func (ts *WorkerVersioningTestSuite) SetupSuite() { ts.Assertions = require.New(ts.T()) ts.workflows = &Workflows{} + ts.activities = &Activities{} ts.NoError(ts.InitConfigAndNamespace()) ts.NoError(ts.InitClient()) } @@ -289,6 +291,7 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) ts.workflows.register(worker1) + ts.activities.register(worker1) ts.NoError(worker1.Start()) // Start workflow @@ -300,6 +303,16 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { ts.NoError(res.Get(&lastBuildID)) ts.Equal("1.0", lastBuildID) + // Make sure we've got to the activity + ts.Eventually(func() bool { + var didRun bool + res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "activity-ran", nil) + ts.NoError(err) + ts.NoError(res.Get(&didRun)) + return didRun + }, time.Second*10, time.Millisecond*100) + worker1.Stop() + // Add new compat ver err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ TaskQueue: ts.taskQueueName, @@ -310,9 +323,9 @@ func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { }) ts.NoError(err) - worker1.Stop() worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) ts.workflows.register(worker11) + ts.activities.register(worker11) ts.NoError(worker11.Start()) defer worker11.Stop() diff --git a/test/workflow_test.go b/test/workflow_test.go index 6738b0db2..0aa37cd96 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1883,9 +1883,26 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) { } func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error { + activityRan := false _ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) { return workflow.GetInfo(ctx).GetCurrentBuildID(), nil }) + _ = workflow.SetQueryHandler(ctx, "activity-ran", func() (bool, error) { + return activityRan, nil + }) + + if err := workflow.Sleep(ctx, 1*time.Millisecond); err != nil { + return err + } + // Ensure that we are still deterministic when a test using a worker with a different build id + // re-runs this workflow + if workflow.GetInfo(ctx).GetCurrentBuildID() == "1.0" { + ctx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{StartToCloseTimeout: 1 * time.Minute}) + if err := workflow.ExecuteActivity(ctx, new(Activities).Echo, 0, 1).Get(ctx, nil); err != nil { + return err + } + activityRan = true + } workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil) return nil From cec9ee9fec78d3bdef3517d42509aba87c713865 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Jan 2024 15:36:39 -0800 Subject: [PATCH 5/5] Add clarifying comments --- internal/internal_task_handlers.go | 2 ++ internal/workflow.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index d9dcf00c3..6bd5b1e2d 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -1029,6 +1029,8 @@ ProcessEvents: if isReplay { w.workflowInfo.currentTaskBuildID = currentBuildID } else if !isReplay && !isQueryOnlyTask { + // Query only tasks should use the build ID from the workflow task, not the worker's + // build id, since the user cares about what affected workflow state. w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID } // Reset the mutable side effect markers recorded diff --git a/internal/workflow.go b/internal/workflow.go index 03dbdde84..0ef1c2635 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1032,6 +1032,8 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string { // GetCurrentBuildID returns the Build ID of the worker that processed this task, which may be // empty. During replay this id may not equal the id of the replaying worker. If not replaying and // this worker has a defined Build ID, it will equal that ID. It is safe to use for branching. +// When used inside a query, the ID of the worker that processed the task which last affected +// the workflow will be returned. func (wInfo *WorkflowInfo) GetCurrentBuildID() string { return wInfo.currentTaskBuildID }