Skip to content

Commit

Permalink
Add last BuildID to workflow info
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Jan 4, 2024
1 parent 5a64898 commit 97e6583
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 5 deletions.
18 changes: 16 additions & 2 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,7 @@ type (
next []*historypb.HistoryEvent
nextFlags []sdkFlag
binaryChecksum string
lastBuildID string
sdkVersion string
sdkName string
}
Expand All @@ -197,6 +198,7 @@ type (
flags []sdkFlag
msgs []*protocolpb.Message
binaryChecksum string
lastBuildID string
sdkVersion string
sdkName string
}
Expand All @@ -207,6 +209,7 @@ type (
flags []sdkFlag
sdkVersion string
sdkName string
buildID string
}
)

Expand Down Expand Up @@ -267,10 +270,16 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
nextEventType := nextEvent.GetEventType()
isFailed := nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_TIMED_OUT || nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED
var binaryChecksum string
buildID := ""
var flags []sdkFlag
if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum
for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() {
completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes()
binaryChecksum = completedAttrs.BinaryChecksum
workerVersion := completedAttrs.GetWorkerVersion()
if workerVersion != nil {
buildID = workerVersion.GetBuildId()
}
for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() {
f := sdkFlagFromUint(flag)
if !f.isValid() {
// If a flag is not recognized (value is too high or not defined), it must fail the workflow task
Expand All @@ -282,6 +291,7 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
return finishedTask{
isFailed: isFailed,
binaryChecksum: binaryChecksum,
buildID: buildID,
flags: flags,
sdkName: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName(),
sdkVersion: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkVersion(),
Expand Down Expand Up @@ -342,6 +352,7 @@ func (eh *history) nextTask() (*preparedTask, error) {

result := eh.next
checksum := eh.binaryChecksum
lastBuildID := eh.lastBuildID
sdkFlags := eh.nextFlags
sdkName := eh.sdkName
sdkVersion := eh.sdkVersion
Expand All @@ -366,6 +377,7 @@ func (eh *history) nextTask() (*preparedTask, error) {
flags: sdkFlags,
msgs: msgs,
binaryChecksum: checksum,
lastBuildID: lastBuildID,
sdkName: sdkName,
sdkVersion: sdkVersion,
}, nil
Expand Down Expand Up @@ -986,6 +998,7 @@ ProcessEvents:
historyMessages := nextTask.msgs
flags := nextTask.flags
binaryChecksum := nextTask.binaryChecksum
lastBuildID := nextTask.lastBuildID
// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
Expand Down Expand Up @@ -1016,6 +1029,7 @@ ProcessEvents:
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
w.workflowInfo.lastCompletedBuildID = lastBuildID
// Reset the mutable side effect markers recorded
eventHandler.mutableSideEffectsRecorded = nil
// Markers are from the events that are produced from the current workflow task.
Expand Down
16 changes: 15 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,11 @@ type WorkflowInfo struct {
// build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the
// workflow, it is this worker's current value.
BinaryChecksum string
// lastCompletedBuildID, if nonempty, contains the BuildID of the worker which last completed a
// workflow task for this workflow. This value may change over the lifetime of the workflow run,
// but is deterministic and safe to use for branching. The value is empty if the workflow has
// never seen a task completed by a worker with a set BuildId.
lastCompletedBuildID string

continueAsNewSuggested bool
currentHistorySize int
Expand All @@ -1016,14 +1021,23 @@ type UpdateInfo struct {
ID string
}

// GetBinaryChecksum return binary checksum.
// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this
// workflow, or if this is the first task, this worker's checksum.
func (wInfo *WorkflowInfo) GetBinaryChecksum() string {
if wInfo.BinaryChecksum == "" {
return getBinaryChecksum()
}
return wInfo.BinaryChecksum
}

// GetLastCompletedBuildID returns the BuildID of the worker which last completed a workflow task
// for this workflow. This value may change over the lifetime of the workflow run, but is
// deterministic and safe to use for branching. The returned value is empty if the workflow has
// never seen a task completed by a worker with a set BuildId.
func (wInfo *WorkflowInfo) GetLastCompletedBuildID() string {
return wInfo.lastCompletedBuildID
}

// GetCurrentHistoryLength returns the current length of history when called.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int {
Expand Down
65 changes: 63 additions & 2 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"testing"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservice/v1"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -149,8 +152,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() {
ts.NoError(handle12.Get(ctx, nil))
ts.NoError(handle21.Get(ctx, nil))
ts.NoError(handle22.Get(ctx, nil))

// TODO: Actually assert they ran on the appropriate workers, once David's changes are ready
}

func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() {
Expand Down Expand Up @@ -273,3 +274,63 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() {
ts.Equal(1, len(taskQueueReachability.TaskQueueReachability))
ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability)
}

func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: ts.taskQueueName,
Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{
BuildID: "1.0",
},
})
ts.NoError(err)

worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true})
ts.workflows.register(worker1)
ts.NoError(worker1.Start())

// Start workflow
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.WaitSignalToStart)
// Send it a made up signal to generate a task
ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "fakesig", ""))
var lastBuildID string
enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
worker1.Stop()
ts.Equal("1.0", lastBuildID)

// Add new compat ver
err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: ts.taskQueueName,
Operation: &client.BuildIDOpAddNewCompatibleVersion{
BuildID: "1.1",
ExistingCompatibleBuildID: "1.0",
},
})
ts.NoError(err)

worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true})
ts.workflows.register(worker11)
ts.NoError(worker11.Start())
defer worker11.Stop()

_, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{
Namespace: ts.config.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wfHandle.GetID(),
},
})
ts.NoError(err)

// finish the workflow
ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "start-signal", ""))
ts.NoError(wfHandle.Get(ctx, nil))

enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
ts.Equal("1.1", lastBuildID)
}
4 changes: 4 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1877,6 +1877,10 @@ func (w *Workflows) InterceptorCalls(ctx workflow.Context, someVal string) (stri
}

func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) {
_ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) {
return workflow.GetInfo(ctx).GetLastCompletedBuildID(), nil
})

var value string
workflow.GetSignalChannel(ctx, "start-signal").Receive(ctx, &value)
return value, nil
Expand Down

0 comments on commit 97e6583

Please sign in to comment.