diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index f146e5296..f59c71c93 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -179,6 +179,7 @@ type ( next []*historypb.HistoryEvent nextFlags []sdkFlag binaryChecksum string + lastBuildID string sdkVersion string sdkName string } @@ -197,6 +198,7 @@ type ( flags []sdkFlag msgs []*protocolpb.Message binaryChecksum string + lastBuildID string sdkVersion string sdkName string } @@ -207,6 +209,7 @@ type ( flags []sdkFlag sdkVersion string sdkName string + buildID string } ) @@ -267,10 +270,16 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { nextEventType := nextEvent.GetEventType() isFailed := nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT || nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED var binaryChecksum string + buildID := "" var flags []sdkFlag if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum - for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() { + completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes() + binaryChecksum = completedAttrs.BinaryChecksum + workerVersion := completedAttrs.GetWorkerVersion() + if workerVersion != nil { + buildID = workerVersion.GetBuildId() + } + for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() { f := sdkFlagFromUint(flag) if !f.isValid() { // If a flag is not recognized (value is too high or not defined), it must fail the workflow task @@ -282,6 +291,7 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { return finishedTask{ isFailed: isFailed, binaryChecksum: binaryChecksum, + buildID: buildID, flags: flags, sdkName: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName(), sdkVersion: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkVersion(), @@ -342,6 +352,7 @@ func (eh *history) nextTask() (*preparedTask, error) { result := eh.next checksum := eh.binaryChecksum + lastBuildID := eh.lastBuildID sdkFlags := eh.nextFlags sdkName := eh.sdkName sdkVersion := eh.sdkVersion @@ -366,6 +377,7 @@ func (eh *history) nextTask() (*preparedTask, error) { flags: sdkFlags, msgs: msgs, binaryChecksum: checksum, + lastBuildID: lastBuildID, sdkName: sdkName, sdkVersion: sdkVersion, }, nil @@ -986,6 +998,7 @@ ProcessEvents: historyMessages := nextTask.msgs flags := nextTask.flags binaryChecksum := nextTask.binaryChecksum + lastBuildID := nextTask.lastBuildID // Check if we are replaying so we know if we should use the messages in the WFT or the history isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1]) var msgs *eventMsgIndex @@ -1016,6 +1029,7 @@ ProcessEvents: } else { w.workflowInfo.BinaryChecksum = binaryChecksum } + w.workflowInfo.lastCompletedBuildID = lastBuildID // Reset the mutable side effect markers recorded eventHandler.mutableSideEffectsRecorded = nil // Markers are from the events that are produced from the current workflow task. diff --git a/internal/workflow.go b/internal/workflow.go index 6f86bd711..e9aaa9d98 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1005,6 +1005,11 @@ type WorkflowInfo struct { // build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the // workflow, it is this worker's current value. BinaryChecksum string + // lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a + // workflow task for this workflow. This value may change over the lifetime of the workflow run, + // but is deterministic and safe to use for branching. The value is empty if the workflow has + // never seen a task completed by a worker with a set BuildId. + lastCompletedBuildID string continueAsNewSuggested bool currentHistorySize int @@ -1016,7 +1021,8 @@ type UpdateInfo struct { ID string } -// GetBinaryChecksum return binary checksum. +// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this +// workflow, or if this is the first task, this worker's checksum. func (wInfo *WorkflowInfo) GetBinaryChecksum() string { if wInfo.BinaryChecksum == "" { return getBinaryChecksum() @@ -1024,6 +1030,14 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string { return wInfo.BinaryChecksum } +// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task +// for this workflow. This value may change over the lifetime of the workflow run, but is +// deterministic and safe to use for branching. The returned value is empty if the workflow has +// never seen a task completed by a worker with a set BuildId. +func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string { + return wInfo.lastCompletedBuildID +} + // GetCurrentHistoryLength returns the current length of history when called. // This value may change throughout the life of the workflow. func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int { diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index d376d34ad..f943f977a 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/workflowservice/v1" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -149,8 +152,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { ts.NoError(handle12.Get(ctx, nil)) ts.NoError(handle21.Get(ctx, nil)) ts.NoError(handle22.Get(ctx, nil)) - - // TODO: Actually assert they ran on the appropriate workers, once David's changes are ready } func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() { @@ -273,3 +274,63 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() { ts.Equal(1, len(taskQueueReachability.TaskQueueReachability)) ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability) } + +func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{ + BuildID: "1.0", + }, + }) + ts.NoError(err) + + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + ts.workflows.register(worker1) + ts.NoError(worker1.Start()) + + // Start workflow + wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.WaitSignalToStart) + // Send it a made up signal to generate a task + ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "fakesig", "")) + var lastBuildID string + enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + worker1.Stop() + ts.Equal("1.0", lastBuildID) + + // Add new compat ver + err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewCompatibleVersion{ + BuildID: "1.1", + ExistingCompatibleBuildID: "1.0", + }, + }) + ts.NoError(err) + + worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) + ts.workflows.register(worker11) + ts.NoError(worker11.Start()) + defer worker11.Stop() + + _, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{ + Namespace: ts.config.Namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: wfHandle.GetID(), + }, + }) + ts.NoError(err) + + // finish the workflow + ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "start-signal", "")) + ts.NoError(wfHandle.Get(ctx, nil)) + + enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.1", lastBuildID) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index db88e3302..81b5dd3e9 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1877,6 +1877,10 @@ func (w *Workflows) InterceptorCalls(ctx workflow.Context, someVal string) (stri } func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) { + _ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) { + return workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil + }) + var value string workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value) return value, nil