Skip to content

Commit

Permalink
Add last BuildID to workflow info
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 4, 2024
1 parent 5a64898 commit 2885d7a
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 6 deletions.
3 changes: 2 additions & 1 deletion internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,8 @@ func (weh *workflowExecutionEventHandlerImpl) ProcessEvent(
case enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
// No Operation
case enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED:
// No Operation
weh.workflowInfo.lastCompletedBuildID = event.GetWorkflowTaskCompletedEventAttributes().GetWorkerVersion().GetBuildId()
println("Set to ", weh.workflowInfo.lastCompletedBuildID)
case enumspb.EVENT_TYPE_ACTIVITY_TASK_SCHEDULED:
weh.commandsHelper.handleActivityTaskScheduled(
event.GetActivityTaskScheduledEventAttributes().GetActivityId(), event.GetEventId())
Expand Down
11 changes: 9 additions & 2 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type (
next []*historypb.HistoryEvent
nextFlags []sdkFlag
binaryChecksum string
lastBuildID string

Check failure on line 182 in internal/internal_task_handlers.go

View workflow job for this annotation

GitHub Actions / build-and-test (ubuntu-latest, 1.20)

field lastBuildID is unused (U1000)

Check failure on line 182 in internal/internal_task_handlers.go

View workflow job for this annotation

GitHub Actions / build-and-test (ubuntu-latest, 1.21)

field lastBuildID is unused (U1000)
sdkVersion string
sdkName string
}
Expand Down Expand Up @@ -269,8 +270,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
var binaryChecksum string
var flags []sdkFlag
if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum
for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() {
completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes()
binaryChecksum = completedAttrs.BinaryChecksum
for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() {
f := sdkFlagFromUint(flag)
if !f.isValid() {
// If a flag is not recognized (value is too high or not defined), it must fail the workflow task
Expand Down Expand Up @@ -1299,6 +1301,11 @@ func (w *workflowExecutionContextImpl) CompleteWorkflowTask(workflowTask *workfl

completeRequest := w.wth.completeWorkflow(eventHandler, w.currentWorkflowTask, w, w.newCommands, w.newMessages, !waitLocalActivities)
w.clearCurrentTask()
if w.wth.workerBuildID != "" {
println("EXISTING", w.workflowInfo.lastCompletedBuildID)
println("SETTING ON COMPLETE TO ", w.wth.workerBuildID)
w.workflowInfo.lastCompletedBuildID = w.wth.workerBuildID
}

return completeRequest
}
Expand Down
16 changes: 15 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,11 @@ type WorkflowInfo struct {
// build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the
// workflow, it is this worker's current value.
BinaryChecksum string
// lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a
// workflow task for this workflow. This value may change over the lifetime of the workflow run,
// but is deterministic and safe to use for branching. The value is empty if the workflow has
// never seen a task completed by a worker with a set BuildId.
lastCompletedBuildID string

continueAsNewSuggested bool
currentHistorySize int
Expand All @@ -1016,14 +1021,23 @@ type UpdateInfo struct {
ID string
}

// GetBinaryChecksum return binary checksum.
// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this
// workflow, or if this is the first task, this worker's checksum.
func (wInfo *WorkflowInfo) GetBinaryChecksum() string {
if wInfo.BinaryChecksum == "" {
return getBinaryChecksum()
}
return wInfo.BinaryChecksum
}

// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task
// for this workflow. This value may change over the lifetime of the workflow run, but is
// deterministic and safe to use for branching. The returned value is empty if the workflow has
// never seen a task completed by a worker with a set BuildId.
func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string {
return wInfo.lastCompletedBuildID
}

// GetCurrentHistoryLength returns the current length of history when called.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int {
Expand Down
75 changes: 73 additions & 2 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"testing"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservice/v1"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -149,8 +152,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() {
ts.NoError(handle12.Get(ctx, nil))
ts.NoError(handle21.Get(ctx, nil))
ts.NoError(handle22.Get(ctx, nil))

// TODO: Actually assert they ran on the appropriate workers, once David's changes are ready
}

func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() {
Expand Down Expand Up @@ -273,3 +274,73 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() {
ts.Equal(1, len(taskQueueReachability.TaskQueueReachability))
ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability)
}

func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: ts.taskQueueName,
Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{
BuildID: "1.0",
},
})
ts.NoError(err)

worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true})
ts.workflows.register(worker1)
ts.NoError(worker1.Start())

// Start workflow
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow)
// Query to see that the last build ID becomes 1.0 (we do eventually, because we might
// get the update in the very first task, in which case it'll be empty and that's OK -- see
// workflow for verifying it is always empty in the first task)
ts.Eventually(func() bool {
var lastBuildID string
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(res.Get(&lastBuildID))
return lastBuildID == "1.0"
}, 5*time.Second, 100*time.Millisecond)

// Add new compat ver
err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: ts.taskQueueName,
Operation: &client.BuildIDOpAddNewCompatibleVersion{
BuildID: "1.1",
ExistingCompatibleBuildID: "1.0",
},
})
ts.NoError(err)

worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true})
ts.workflows.register(worker11)
ts.NoError(worker11.Start())
defer worker11.Stop()

_, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{
Namespace: ts.config.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wfHandle.GetID(),
},
})
ts.NoError(err)

// The last task, with the new worker, should definitely, immediately, be 1.0
var lastBuildID string
enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
ts.Equal("1.0", lastBuildID)

// finish the workflow under 1.1
ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "finish", ""))
ts.NoError(wfHandle.Get(ctx, nil))

// Post completion it should have the value of the last task
enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
ts.Equal("1.1", lastBuildID)
}
14 changes: 14 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,19 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) {
return value, nil
}

func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error {
firstBuildID := workflow.GetInfo(ctx).GetLastCompletedBuildID()

_ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) {
// Since the first build ID should always be empty, prepend it here to mess up any
// assertions if it wasn't empty
return firstBuildID + workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil
})

workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil)
return nil
}

func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActivity bool) error {
// Add query handler
err := workflow.SetQueryHandler(ctx, "workflow-query", func() (string, error) { return "query-response", nil })
Expand Down Expand Up @@ -2511,6 +2524,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.SleepForDuration)
worker.RegisterWorkflow(w.InterceptorCalls)
worker.RegisterWorkflow(w.WaitSignalToStart)
worker.RegisterWorkflow(w.BuildIDWorkflow)
worker.RegisterWorkflow(w.SignalsAndQueries)
worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage)
worker.RegisterWorkflow(w.AdvancedPostCancellation)
Expand Down

0 comments on commit 2885d7a

Please sign in to comment.