Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add last BuildID to workflow info #1335

Merged
merged 6 commits into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ type (
binaryChecksum string
sdkVersion string
sdkName string
buildID string
}

finishedTask struct {
Expand Down Expand Up @@ -269,8 +270,9 @@ func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
var binaryChecksum string
var flags []sdkFlag
if nextEventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
binaryChecksum = nextEvent.GetWorkflowTaskCompletedEventAttributes().BinaryChecksum
for _, flag := range nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetLangUsedFlags() {
completedAttrs := nextEvent.GetWorkflowTaskCompletedEventAttributes()
binaryChecksum = completedAttrs.BinaryChecksum
for _, flag := range completedAttrs.GetSdkMetadata().GetLangUsedFlags() {
f := sdkFlagFromUint(flag)
if !f.isValid() {
// If a flag is not recognized (value is too high or not defined), it must fail the workflow task
Expand Down Expand Up @@ -348,6 +350,7 @@ func (eh *history) nextTask() (*preparedTask, error) {

var markers []*historypb.HistoryEvent
var msgs []*protocolpb.Message
var buildID string
if len(result) > 0 {
nextTaskEvents, err := eh.prepareTask()
if err != nil {
Expand All @@ -359,6 +362,7 @@ func (eh *history) nextTask() (*preparedTask, error) {
eh.sdkVersion = nextTaskEvents.sdkVersion
markers = nextTaskEvents.markers
msgs = nextTaskEvents.msgs
buildID = nextTaskEvents.buildID
}
return &preparedTask{
events: result,
Expand All @@ -368,6 +372,7 @@ func (eh *history) nextTask() (*preparedTask, error) {
binaryChecksum: checksum,
sdkName: sdkName,
sdkVersion: sdkVersion,
buildID: buildID,
}, nil
}

Expand Down Expand Up @@ -457,7 +462,10 @@ OrderEvents:
enumspb.EVENT_TYPE_WORKFLOW_TASK_FAILED:
// Skip
default:
if isPreloadMarkerEvent(event) {
if event.GetEventType() == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
taskEvents.buildID = event.GetWorkflowTaskCompletedEventAttributes().
GetWorkerVersion().GetBuildId()
} else if isPreloadMarkerEvent(event) {
taskEvents.markers = append(taskEvents.markers, event)
} else if attrs := event.GetWorkflowExecutionUpdateAcceptedEventAttributes(); attrs != nil {
taskEvents.msgs = append(taskEvents.msgs, inferMessage(attrs))
Expand Down Expand Up @@ -975,6 +983,7 @@ func (w *workflowExecutionContextImpl) ProcessWorkflowTask(workflowTask *workflo
eventHandler.ResetLAWFTAttemptCounts()
eventHandler.sdkFlags.markSDKFlagsSent()

isQueryOnlyTask := workflowTask.task.StartedEventId == 0
ProcessEvents:
for {
nextTask, err := reorderedHistory.nextTask()
Expand All @@ -986,6 +995,7 @@ ProcessEvents:
historyMessages := nextTask.msgs
flags := nextTask.flags
binaryChecksum := nextTask.binaryChecksum
currentBuildID := nextTask.buildID
// Check if we are replaying so we know if we should use the messages in the WFT or the history
isReplay := len(reorderedEvents) > 0 && reorderedHistory.IsReplayEvent(reorderedEvents[len(reorderedEvents)-1])
var msgs *eventMsgIndex
Expand Down Expand Up @@ -1016,6 +1026,13 @@ ProcessEvents:
} else {
w.workflowInfo.BinaryChecksum = binaryChecksum
}
if isReplay {
w.workflowInfo.currentTaskBuildID = currentBuildID
} else if !isReplay && !isQueryOnlyTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we add a comment here explaining why a query only task is treated special.

// Query only tasks should use the build ID from the workflow task, not the worker's
// build id, since the user cares about what affected workflow state.
w.workflowInfo.currentTaskBuildID = w.wth.workerBuildID
}
// Reset the mutable side effect markers recorded
eventHandler.mutableSideEffectsRecorded = nil
// Markers are from the events that are produced from the current workflow task.
Expand Down
16 changes: 15 additions & 1 deletion internal/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,10 @@ type WorkflowInfo struct {
// build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the
// workflow, it is this worker's current value.
BinaryChecksum string
// currentTaskBuildID, if nonempty, contains the Build ID of the worker that processed the task
// which is currently or about to be executing. If no longer replaying will be set to the ID of
// this worker
currentTaskBuildID string

continueAsNewSuggested bool
currentHistorySize int
Expand All @@ -1016,14 +1020,24 @@ type UpdateInfo struct {
ID string
}

// GetBinaryChecksum return binary checksum.
// GetBinaryChecksum returns the binary checksum of the last worker to complete a task for this
// workflow, or if this is the first task, this worker's checksum.
func (wInfo *WorkflowInfo) GetBinaryChecksum() string {
if wInfo.BinaryChecksum == "" {
return getBinaryChecksum()
}
return wInfo.BinaryChecksum
}

// GetCurrentBuildID returns the Build ID of the worker that processed this task, which may be
// empty. During replay this id may not equal the id of the replaying worker. If not replaying and
// this worker has a defined Build ID, it will equal that ID. It is safe to use for branching.
// When used inside a query, the ID of the worker that processed the task which last affected
// the workflow will be returned.
func (wInfo *WorkflowInfo) GetCurrentBuildID() string {
Copy link
Contributor

@Quinn-With-Two-Ns Quinn-With-Two-Ns Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May want to add a note for queries. Since they are not replaying, but also not using the current workers build ID.

return wInfo.currentTaskBuildID
}

// GetCurrentHistoryLength returns the current length of history when called.
// This value may change throughout the life of the workflow.
func (wInfo *WorkflowInfo) GetCurrentHistoryLength() int {
Expand Down
86 changes: 83 additions & 3 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"testing"
"time"

"go.temporal.io/api/common/v1"
"go.temporal.io/api/workflowservice/v1"

"github.com/pborman/uuid"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand All @@ -38,7 +41,8 @@ type WorkerVersioningTestSuite struct {
*require.Assertions
suite.Suite
ConfigAndClientSuiteBase
workflows *Workflows
workflows *Workflows
activities *Activities
}

func TestWorkerVersioningTestSuite(t *testing.T) {
Expand All @@ -48,6 +52,7 @@ func TestWorkerVersioningTestSuite(t *testing.T) {
func (ts *WorkerVersioningTestSuite) SetupSuite() {
ts.Assertions = require.New(ts.T())
ts.workflows = &Workflows{}
ts.activities = &Activities{}
ts.NoError(ts.InitConfigAndNamespace())
ts.NoError(ts.InitClient())
}
Expand Down Expand Up @@ -149,8 +154,6 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() {
ts.NoError(handle12.Get(ctx, nil))
ts.NoError(handle21.Get(ctx, nil))
ts.NoError(handle22.Get(ctx, nil))

// TODO: Actually assert they ran on the appropriate workers, once David's changes are ready
}

func (ts *WorkerVersioningTestSuite) TestReachabilityUnreachable() {
Expand Down Expand Up @@ -273,3 +276,80 @@ func (ts *WorkerVersioningTestSuite) TestReachabilityVersions() {
ts.Equal(1, len(taskQueueReachability.TaskQueueReachability))
ts.Equal([]client.TaskReachability{client.TaskReachabilityNewWorkflows}, taskQueueReachability.TaskQueueReachability)
}

func (ts *WorkerVersioningTestSuite) TestBuildIDChangesOverWorkflowLifetime() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

err := ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
TaskQueue: ts.taskQueueName,
Operation: &client.BuildIDOpAddNewIDInNewDefaultSet{
BuildID: "1.0",
},
})
ts.NoError(err)

worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.0", UseBuildIDForVersioning: true})
ts.workflows.register(worker1)
ts.activities.register(worker1)
ts.NoError(worker1.Start())

// Start workflow
wfHandle, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("evolving-wf"), ts.workflows.BuildIDWorkflow)
// Query to see that the build ID is 1.0
var lastBuildID string
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(res.Get(&lastBuildID))
ts.Equal("1.0", lastBuildID)

// Make sure we've got to the activity
ts.Eventually(func() bool {
var didRun bool
res, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "activity-ran", nil)
ts.NoError(err)
ts.NoError(res.Get(&didRun))
return didRun
}, time.Second*10, time.Millisecond*100)
worker1.Stop()

// Add new compat ver
err = ts.client.UpdateWorkerBuildIdCompatibility(ctx, &client.UpdateWorkerBuildIdCompatibilityOptions{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this doesn't require versioning to test, you can use build ID with unversioned workers.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could make sense to test with and without versioning just to be safe.

Copy link
Member Author

@Sushisource Sushisource Jan 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The bit you reviewed is subtly wrong in terms of querying on a cached worker after the worker has completed a task with its' ID. New test also makes sure the first task doesn't have an ID, which covers this path.

(will need another update to fix the problem I mention)

TaskQueue: ts.taskQueueName,
Operation: &client.BuildIDOpAddNewCompatibleVersion{
BuildID: "1.1",
ExistingCompatibleBuildID: "1.0",
},
})
ts.NoError(err)

worker11 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildID: "1.1", UseBuildIDForVersioning: true})
ts.workflows.register(worker11)
ts.activities.register(worker11)
ts.NoError(worker11.Start())
defer worker11.Stop()

_, err = ts.client.WorkflowService().ResetStickyTaskQueue(ctx, &workflowservice.ResetStickyTaskQueueRequest{
Namespace: ts.config.Namespace,
Execution: &common.WorkflowExecution{
WorkflowId: wfHandle.GetID(),
},
})
ts.NoError(err)

// The current task, with the new worker, should still be 1.0 since no new tasks have happened
enval, err := ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
ts.Equal("1.0", lastBuildID)

// finish the workflow under 1.1
ts.NoError(ts.client.SignalWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "finish", ""))
ts.NoError(wfHandle.Get(ctx, nil))

// Post completion it should have the value of the last task
enval, err = ts.client.QueryWorkflow(ctx, wfHandle.GetID(), wfHandle.GetRunID(), "get-last-build-id", nil)
ts.NoError(err)
ts.NoError(enval.Get(&lastBuildID))
ts.Equal("1.1", lastBuildID)
}
27 changes: 27 additions & 0 deletions test/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1882,6 +1882,32 @@ func (w *Workflows) WaitSignalToStart(ctx workflow.Context) (string, error) {
return value, nil
}

func (w *Workflows) BuildIDWorkflow(ctx workflow.Context) error {
activityRan := false
_ = workflow.SetQueryHandler(ctx, "get-last-build-id", func() (string, error) {
return workflow.GetInfo(ctx).GetCurrentBuildID(), nil
})
_ = workflow.SetQueryHandler(ctx, "activity-ran", func() (bool, error) {
return activityRan, nil
})

if err := workflow.Sleep(ctx, 1*time.Millisecond); err != nil {
return err
}
// Ensure that we are still deterministic when a test using a worker with a different build id
// re-runs this workflow
if workflow.GetInfo(ctx).GetCurrentBuildID() == "1.0" {
ctx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{StartToCloseTimeout: 1 * time.Minute})
if err := workflow.ExecuteActivity(ctx, new(Activities).Echo, 0, 1).Get(ctx, nil); err != nil {
return err
}
activityRan = true
}

workflow.GetSignalChannel(ctx, "finish").Receive(ctx, nil)
return nil
}

func (w *Workflows) SignalsAndQueries(ctx workflow.Context, execChild, execActivity bool) error {
// Add query handler
err := workflow.SetQueryHandler(ctx, "workflow-query", func() (string, error) { return "query-response", nil })
Expand Down Expand Up @@ -2511,6 +2537,7 @@ func (w *Workflows) register(worker worker.Worker) {
worker.RegisterWorkflow(w.SleepForDuration)
worker.RegisterWorkflow(w.InterceptorCalls)
worker.RegisterWorkflow(w.WaitSignalToStart)
worker.RegisterWorkflow(w.BuildIDWorkflow)
worker.RegisterWorkflow(w.SignalsAndQueries)
worker.RegisterWorkflow(w.CheckOpenTelemetryBaggage)
worker.RegisterWorkflow(w.AdvancedPostCancellation)
Expand Down
Loading