From 2885d7ab94cbf4a404662015e45e1167d0265b28 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Jan 2024 17:08:30 -0800 Subject: [PATCH] Add last BuildID to workflow info --- internal/internal_event_handlers.go | 3 +- internal/internal_task_handlers.go | 11 ++++- internal/workflow.go | 16 +++++- test/worker_versioning_test.go | 75 ++++++++++++++++++++++++++++- test/workflow_test.go | 14 ++++++ 5 files changed, 113 insertions(+), 6 deletions(-) diff --git a/internal/internal_event_handlers.go b/internal/internal_event_handlers.go index d201601f3..7acdaf4a0 100644 --- a/internal/internal_event_handlers.go +++ b/internal/internal_event_handlers.go @@ -1130,7 +1130,8 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent( case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED: // No Operation case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED: - // No Operation + weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId() + println("Set to ", weh.workflowInfo.lastCompletedBuildID) case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED: weh.commandsHelper.handleActivityTaskScheduled( event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId()) diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index f146e5296..723d6be65 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -179,6 +179,7 @@ type ( next []*historypb.HistoryEvent nextFlags []sdkFlag binaryChecksum string + lastBuildID string sdkVersion string sdkName string } @@ -269,8 +270,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) { var binaryChecksum string var flags []sdkFlag if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED { - binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum - for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() 
{ + completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes() + binaryChecksum = completedAttrs.BinaryChecksum + for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() { f := sdkFlagFromUint(flag) if !f.isValid() { // If a flag is not recognized (value is too high or not defined), it must fail the workflow task @@ -1299,6 +1301,11 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities) w.clearCurrentTask() + if w.wth.workerBuildID != "" { + println("EXISTING", w.workflowInfo.lastCompletedBuildID) + println("SETTING ON COMPLETE TO ", w.wth.workerBuildID) + w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID + } return completeRequest } diff --git a/internal/workflow.go b/internal/workflow.go index 6f86bd711..e9aaa9d98 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -1005,6 +1005,11 @@ type WorkflowInfo struct { // build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the // workflow, it is this worker's current value. BinaryChecksum string + // lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a + // workflow task for this workflow. This value may change over the lifetime of the workflow run, + // but is deterministic and safe to use for branching. The value is empty if the workflow has + // never seen a task completed by a worker with a set BuildId. + lastCompletedBuildID string continueAsNewSuggested bool currentHistorySize int @@ -1016,7 +1021,8 @@ type UpdateInfo struct { ID string } -// GetBinaryChecksum return binary checksum. +// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this +// workflow, or if this is the first task, this worker's checksum. 
func (wInfo *WorkflowInfo) GetBinaryChecksum() string { if wInfo.BinaryChecksum == "" { return getBinaryChecksum() @@ -1024,6 +1030,14 @@ func (wInfo *WorkflowInfo) GetBinaryChecksum() string { return wInfo.BinaryChecksum } +// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task +// for this workflow. This value may change over the lifetime of the workflow run, but is +// deterministic and safe to use for branching. The returned value is empty if the workflow has +// never seen a task completed by a worker with a set BuildId. +func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string { + return wInfo.lastCompletedBuildID +} + // GetCurrentHistoryLength returns the current length of history when called. // This value may change throughout the life of the workflow. func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int { diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go index d376d34ad..3083585b8 100644 --- a/test/worker_versioning_test.go +++ b/test/worker_versioning_test.go @@ -27,6 +27,9 @@ import ( "testing" "time" + "go.temporal.io/api/common/v1" + "go.temporal.io/api/workflowservice/v1" + "github.com/pborman/uuid" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -149,8 +152,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { ts.NoError(handle12.Get(ctx, nil)) ts.NoError(handle21.Get(ctx, nil)) ts.NoError(handle22.Get(ctx, nil)) - - // TODO: Actually assert they ran on the appropriate workers, once David's changes are ready } func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() { @@ -273,3 +274,73 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() { ts.Equal(1, len(taskQueueReachability.TaskQueueReachability)) ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability) } + +func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() 
{ + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{ + BuildID: "1.0", + }, + }) + ts.NoError(err) + + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true}) + ts.workflows.register(worker1) + ts.NoError(worker1.Start()) + + // Start workflow + wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow) + // Poll the query until the last build ID becomes 1.0 (we must poll, because the query may be + // answered during the very first task, in which case it'll be empty and that's OK -- see the + // workflow, which verifies that the value is always empty in the first task) + ts.Eventually(func() bool { + var lastBuildID string + res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(res.Get(&lastBuildID)) + return lastBuildID == "1.0" + }, 5*time.Second, 100*time.Millisecond) + + // Add new compat ver + err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{ + TaskQueue: ts.taskQueueName, + Operation: &client.BuildIDOpAddNewCompatibleVersion{ + BuildID: "1.1", + ExistingCompatibleBuildID: "1.0", + }, + }) + ts.NoError(err) + + worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true}) + ts.workflows.register(worker11) + ts.NoError(worker11.Start()) + defer worker11.Stop() + + _, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{ + Namespace: ts.config.Namespace, + Execution: &common.WorkflowExecution{ + WorkflowId: wfHandle.GetID(), + }, + }) + ts.NoError(err) + + // The last task, with the new worker, should 
definitely, immediately, be 1.0 + var lastBuildID string + enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.0", lastBuildID) + + // finish the workflow under 1.1 + ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "finish", "")) + ts.NoError(wfHandle.Get(ctx, nil)) + + // Post completion it should have the value of the last task + enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil) + ts.NoError(err) + ts.NoError(enval.Get(&lastBuildID)) + ts.Equal("1.1", lastBuildID) +} diff --git a/test/workflow_test.go b/test/workflow_test.go index db88e3302..bb5aa0c58 100644 --- a/test/workflow_test.go +++ b/test/workflow_test.go @@ -1882,6 +1882,19 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) { return value, nil } +func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error { + firstBuildID := workflow.GetInfo(ctx).GetLastCompletedBuildID() + + _ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) { + // Since the first build ID should always be empty, prepend it here to mess up any + // assertions if it wasn't empty + return firstBuildID + workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil + }) + + workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil) + return nil +} + func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActivity bool) error { // Add query handler err := workflow.SetQueryHandler(ctx, "workflow-query", func() (string, error) { return "query-response", nil }) @@ -2511,6 +2524,7 @@ func (w *Workflows) register(worker worker.Worker) { worker.RegisterWorkflow(w.SleepForDuration) worker.RegisterWorkflow(w.InterceptorCalls) worker.RegisterWorkflow(w.WaitSignalToStart) + worker.RegisterWorkflow(w.BuildIDWorkflow) worker.RegisterWorkflow(w.SignalsAndQueries) 
worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage) worker.RegisterWorkflow(w.AdvancedPostCancellation)