diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index f146e5296..6bd5b1e2d 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -199,6 +199,7 @@ type ( binaryChecksum string sdkVersion string sdkName string + buildID string } finishedTask struct { @@ -269,8 +270,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { var binaryChecksum string var flags []sdkFlag if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum - for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() { + completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes() + binaryChecksum = completedAttrs.BinaryChecksum + for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() { f := sdkFlagFromUint(flag) if !f.isValid() { // If a flag is not recognized (value is too high or not defined), it must fail the workflow task @@ -348,6 +350,7 @@ func (eh *history) nextTask() (*preparedTask, error) { var markers []*historypb.HistoryEvent var msgs []*protocolpb.Message + var buildID string if len(result) > 0 { nextTaskEvents, err := eh.prepareTask() if err != nil { @@ -359,6 +362,7 @@ func (eh *history) nextTask() (*preparedTask, error) { eh.sdkVersion = nextTaskEvents.sdkVersion markers = nextTaskEvents.markers msgs = nextTaskEvents.msgs + buildID = nextTaskEvents.buildID } return &preparedTask{ events: result, @@ -368,6 +372,7 @@ func (eh *history) nextTask() (*preparedTask, error) { binaryChecksum: checksum, sdkName: sdkName, sdkVersion: sdkVersion, + buildID: buildID, }, nil } @@ -457,7 +462,10 @@ OrderEvents: enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED: // Skip default: - if isPreloadMarkerEvent(event) { + if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { + taskEvents.buildID = 
event.GetWorkflowTaskCompletedEventAttributes(). + GetWorkerVersion().GetBuildId() + } else if isPreloadMarkerEvent(event) { taskEvents.markers = append(taskEvents.markers, event) } else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil { taskEvents.msgs = append(taskEvents.msgs, inferMessage(attrs)) @@ -975,6 +983,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo eventHandler.ResetLAWFTAttemptCounts() eventHandler.sdkFlags.markSDKFlagsSent() + isQueryOnlyTask := workflowTask.task.StartedEventId == 0 ProcessEvents: for { nextTask, err := reorderedHistory.nextTask() @@ -986,6 +995,7 @@ ProcessEvents: historyMessages := nextTask.msgs flags := nextTask.flags binaryChecksum := nextTask.binaryChecksum + currentBuildID := nextTask.buildID // Check if we are replaying so we know if we should use the messages in the WFT or the history isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) var msgs *eventMsgIndex @@ -1016,6 +1026,13 @@ ProcessEvents: } else { w.workflowInfo.BinaryChecksum = binaryChecksum } + if isReplay { + w.workflowInfo.currentTaskBuildID = currentBuildID + } else if !isReplay && !isQueryOnlyTask { + // Query only tasks should use the build ID from the workflow task, not the worker's + // build id, since the user cares about what affected workflow state. + w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID + } // Reset the mutable side effect markers recorded eventHandler.mutableSideEffectsRecorded = nil // Markers are from the events that are produced from the current workflow task. diff --git a/internal/workflow.go b/internal/workflow.go index 6f86bd711..0ef1c2635 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1005,6 +1005,10 @@ type WorkflowInfo struct { // build-id based versioning, is the explicitly set worker build id. 
If this is the first worker to operate on the // workflow, it is this worker's current value. BinaryChecksum string + // currentTaskBuildID, if nonempty, contains the Build ID of the worker that processed the task + // which is currently executing or about to execute. Once no longer replaying (and the task is + // not query-only), it is set to the Build ID of this worker. + currentTaskBuildID string continueAsNewSuggested bool currentHistorySize int @@ -1016,7 +1020,8 @@ type UpdateInfo struct { ID string } -// GetBinaryChecksum return binary checksum. +// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this +// workflow, or if this is the first task, this worker's checksum. func (wInfo *WorkflowInfo) GetBinaryChecksum() string { if wInfo.BinaryChecksum == "" { return getBinaryChecksum() @@ -1024,6 +1029,15 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string { return wInfo.BinaryChecksum } +// GetCurrentBuildID returns the Build ID of the worker that processed this task, which may be +// empty. During replay this ID may not equal the ID of the replaying worker. If not replaying and +// this worker has a defined Build ID, it will equal that ID. It is safe to use for branching. +// When used inside a query, the ID of the worker that processed the task which last affected +// the workflow will be returned. +func (wInfo *WorkflowInfo) GetCurrentBuildID() string { + return wInfo.currentTaskBuildID +} + // GetCurrentHistoryLength returns the current length of history when called. // This value may change throughout the life of the workflow. 
func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int { diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index d376d34ad..31ae497d4 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/workflowservice/v1" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -38,7 +41,8 @@ type WorkerVersioningTestSuite struct { *require.Assertions suite.Suite ConfigAndClientSuiteBase - workflows *Workflows + workflows *Workflows + activities *Activities } func TestWorkerVersioningTestSuite(t *testing.T) { @@ -48,6 +52,7 @@ func TestWorkerVersioningTestSuite(t *testing.T) { func (ts *WorkerVersioningTestSuite) SetupSuite() { ts.Assertions = require.New(ts.T()) ts.workflows = &Workflows{} + ts.activities = &Activities{} ts.NoError(ts.InitConfigAndNamespace()) ts.NoError(ts.InitClient()) } @@ -149,8 +154,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { ts.NoError(handle12.Get(ctx, nil)) ts.NoError(handle21.Get(ctx, nil)) ts.NoError(handle22.Get(ctx, nil)) - - // TODO: Actually assert they ran on the appropriate workers, once David's changes are ready } func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() { @@ -273,3 +276,85 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() { ts.Equal(1, len(taskQueueReachability.TaskQueueReachability)) ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability) } + +// TestBuildIDChangesOverWorkflowLifetime checks, through the "get-last-build-id" query, that the +// reported Build ID is "1.0" while only the 1.0 worker has run tasks, is still "1.0" when queried +// on the compatible 1.1 worker before any new task, and is "1.1" after 1.1 finishes the workflow. +func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{ + 
BuildID: "1.0", + }, + }) + ts.NoError(err) + + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + ts.workflows.register(worker1) + ts.activities.register(worker1) + ts.NoError(worker1.Start()) + + // Start workflow + wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow) + // Fail fast on a start error: wfHandle is nil in that case and must not be dereferenced + ts.NoError(err) + // Query to see that the build ID is 1.0 + var lastBuildID string + res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(res.Get(&lastBuildID)) + ts.Equal("1.0", lastBuildID) + + // Make sure we've got to the activity + ts.Eventually(func() bool { + var didRun bool + res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "activity-ran", nil) + ts.NoError(err) + ts.NoError(res.Get(&didRun)) + return didRun + }, time.Second*10, time.Millisecond*100) + worker1.Stop() + + // Add new compat ver + err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewCompatibleVersion{ + BuildID: "1.1", + ExistingCompatibleBuildID: "1.0", + }, + }) + ts.NoError(err) + + worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) + ts.workflows.register(worker11) + ts.activities.register(worker11) + ts.NoError(worker11.Start()) + defer worker11.Stop() + + _, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{ + Namespace: ts.config.Namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: wfHandle.GetID(), + }, + }) + ts.NoError(err) + + // The current task, with the new worker, should still be 1.0 since no new tasks have happened + enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + 
ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.0", lastBuildID) + + // finish the workflow under 1.1 + ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "finish", "")) + ts.NoError(wfHandle.Get(ctx, nil)) + + // Post completion it should have the value of the last task + enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.1", lastBuildID) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index db88e3302..0aa37cd96 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1882,6 +1882,34 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) { return value, nil } +// BuildIDWorkflow exposes GetCurrentBuildID through the "get-last-build-id" query, runs the Echo +// activity only when the current Build ID is "1.0", and then blocks until a "finish" signal arrives. +func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error { + activityRan := false + _ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) { + return workflow.GetInfo(ctx).GetCurrentBuildID(), nil + }) + _ = workflow.SetQueryHandler(ctx, "activity-ran", func() (bool, error) { + return activityRan, nil + }) + + if err := workflow.Sleep(ctx, 1*time.Millisecond); err != nil { + return err + } + // Ensure that we are still deterministic when a test using a worker with a different build id + // re-runs this workflow + if workflow.GetInfo(ctx).GetCurrentBuildID() == "1.0" { + ctx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{StartToCloseTimeout: 1 * time.Minute}) + if err := workflow.ExecuteActivity(ctx, new(Activities).Echo, 0, 1).Get(ctx, nil); err != nil { + return err + } + activityRan = true + } + + workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil) + return nil +} + func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActivity bool) error { // Add query handler err := workflow.SetQueryHandler(ctx, "workflow-query", func() (string, error) { return "query-response", nil }) @@ -2511,6 +2539,7 @@ func (w *Workflows) register(worker 
worker.Worker) { worker.RegisterWorkflow(w.SleepForDuration) worker.RegisterWorkflow(w.InterceptorCalls) worker.RegisterWorkflow(w.WaitSignalToStart) + worker.RegisterWorkflow(w.BuildIDWorkflow) worker.RegisterWorkflow(w.SignalsAndQueries) worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage) worker.RegisterWorkflow(w.AdvancedPostCancellation)