Skip to content

Commit

Permalink
Incorporate & update changes from old PR
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Sep 27, 2022
1 parent bbbbfd4 commit eb9aeb6
Show file tree
Hide file tree
Showing 12 changed files with 569 additions and 120 deletions.
9 changes: 3 additions & 6 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ type (
// GetWorkerBuildIDOrderingOptions is the input to Client.GetWorkerBuildIDOrdering.
GetWorkerBuildIDOrderingOptions = internal.GetWorkerBuildIDOrderingOptions

// WorkerBuildIDVersionGraph is the response for Client.GetWorkerBuildIdOrdering
WorkerBuildIDVersionGraph = internal.WorkerBuildIDVersionGraph

// WorkerVersionIDNode represents a node in the WorkerBuildIDVersionGraph
WorkerVersionIDNode = internal.WorkerVersionIDNode
// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIDOrdering.
WorkerBuildIDVersionSets = internal.WorkerBuildIDVersionSets

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Expand Down Expand Up @@ -396,7 +393,7 @@ type (

// GetWorkerBuildIDOrdering **IN DEVELOPMENT**
// Returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error)
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
Expand Down
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ type (
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error)
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
Expand Down
4 changes: 3 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,9 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
Namespace: wth.namespace,
}
if wth.workerBuildID != "" {
builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wth.workerBuildID}
builtRequest.WorkerVersionsStamp = &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
}
}
return builtRequest
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() {
response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
t.NoError(err)
t.NotNil(response)
t.Equal(workerBuildID, response.WorkerVersioningId.GetWorkerBuildId())
t.Equal(workerBuildID, response.GetWorkerVersionsStamp().GetBuildId())
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
Expand Down
8 changes: 6 additions & 2 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,9 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
BinaryChecksum: wtp.getBuildID(),
}
if wtp.workerBuildID != "" {
builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wtp.workerBuildID}
builtRequest.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: wtp.workerBuildID,
}
}
return builtRequest
}
Expand Down Expand Up @@ -836,7 +838,9 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}},
}
if atp.workerBuildID != "" {
request.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: atp.workerBuildID}
request.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: atp.workerBuildID,
}
}

response, err := atp.service.PollActivityTaskQueue(ctx, request)
Expand Down
22 changes: 9 additions & 13 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,16 +883,12 @@ func (wc *WorkflowClient) UpdateWorkerBuildIDOrdering(ctx context.Context, optio
return err
}

var previousCompatibleID *taskqueuepb.VersionId
if options.PreviousCompatible != "" {
previousCompatibleID = &taskqueuepb.VersionId{WorkerBuildId: options.PreviousCompatible}
}
request := &workflowservice.UpdateWorkerBuildIdOrderingRequest{
Namespace: wc.namespace,
TaskQueue: options.TaskQueue,
VersionId: &taskqueuepb.VersionId{WorkerBuildId: options.WorkerBuildID},
PreviousCompatible: previousCompatibleID,
BecomeDefault: options.BecomeDefault,
Namespace: wc.namespace,
TaskQueue: options.TaskQueue,
VersionId: options.WorkerBuildID,
CompatibleVersion: options.CompatibleVersion,
BecomeDefault: options.BecomeDefault,
}

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
Expand All @@ -902,8 +898,8 @@ func (wc *WorkflowClient) UpdateWorkerBuildIDOrdering(ctx context.Context, optio
}

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error) {
if options.MaxDepth < 0 {
func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error) {
if options.MaxSets < 0 {
return nil, errors.New("maxDepth must be >= 0")
}
if err := wc.ensureInitialized(); err != nil {
Expand All @@ -916,13 +912,13 @@ func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options
request := &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: wc.namespace,
TaskQueue: options.TaskQueue,
MaxDepth: int32(options.MaxDepth),
MaxSets: int32(options.MaxSets),
}
resp, err := wc.workflowService.GetWorkerBuildIdOrdering(grpcCtx, request)
if err != nil {
return nil, err
}
converted := workerVersionGraphFromProtoResponse(resp)
converted := workerVersionSetsFromProtoResponse(resp)
return converted, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,67 +29,65 @@ import (

// UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering.
type UpdateWorkerBuildIDOrderingOptions struct {
// The task queue to update the version graph of.
// The task queue to update the version sets of.
TaskQueue string
// Required, indicates the build id being added (or changed) to/in the graph.
// Required, indicates the build id being added or targeted.
WorkerBuildID string
// May be empty, and if set, indicates an existing version the new id should be considered compatible with.
PreviousCompatible string
CompatibleVersion string
// If true, this new id will become the default version for new workflow executions.
BecomeDefault bool
}

type GetWorkerBuildIDOrderingOptions struct {
TaskQueue string
MaxDepth int
MaxSets int
}

// WorkerBuildIDVersionGraph is the response for Client.GetWorkerBuildIdOrdering and represents the graph
// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIdOrdering and represents the sets
// of worker build id based versions.
type WorkerBuildIDVersionGraph struct {
// The currently established default version
Default *WorkerVersionIDNode
// Other current latest-compatible versions which are not the overall default
CompatibleLeaves []*WorkerVersionIDNode
type WorkerBuildIDVersionSets struct {
Sets []*CompatibleVersionSet
}

// WorkerVersionIDNode is a single node in a worker version graph.
type WorkerVersionIDNode struct {
WorkerBuildID string
// A pointer to the previous version this version is considered to be compatible with
PreviousCompatible *WorkerVersionIDNode
// A pointer to the previous incompatible version (previous major version)
PreviousIncompatible *WorkerVersionIDNode
}

func workerVersionGraphFromProtoResponse(response *workflowservice.GetWorkerBuildIdOrderingResponse) *WorkerBuildIDVersionGraph {
if response == nil {
return nil
// Default returns the current overall default version. IE: The one that will be used to start new workflows.
// Returns the empty string if there are no versions present.
func (s *WorkerBuildIDVersionSets) Default() string {
if len(s.Sets) == 0 {
return ""
}
return &WorkerBuildIDVersionGraph{
Default: workerVersionNodeFromProto(response.CurrentDefault),
CompatibleLeaves: workerVersionNodesFromProto(response.CompatibleLeaves),
lastSet := s.Sets[len(s.Sets)-1]
if len(lastSet.Versions) == 0 {
return ""
}
return lastSet.Versions[len(lastSet.Versions)-1]
}

// CompatibleVersionSet represents a set of worker build ids which are compatible with each other.
type CompatibleVersionSet struct {
id string
Versions []string
}

func workerVersionNodeFromProto(node *taskqueuepb.VersionIdNode) *WorkerVersionIDNode {
if node == nil {
func workerVersionSetsFromProtoResponse(response *workflowservice.GetWorkerBuildIdOrderingResponse) *WorkerBuildIDVersionSets {
if response == nil {
return nil
}
return &WorkerVersionIDNode{
WorkerBuildID: node.GetVersion().GetWorkerBuildId(),
PreviousCompatible: workerVersionNodeFromProto(node.PreviousCompatible),
PreviousIncompatible: workerVersionNodeFromProto(node.PreviousIncompatible),
return &WorkerBuildIDVersionSets{
Sets: workerVersionSetsFromProto(response.GetMajorVersionSets()),
}
}

func workerVersionNodesFromProto(nodes []*taskqueuepb.VersionIdNode) []*WorkerVersionIDNode {
if len(nodes) == 0 {
func workerVersionSetsFromProto(sets []*taskqueuepb.CompatibleVersionSet) []*CompatibleVersionSet {
if sets == nil {
return nil
}
result := make([]*WorkerVersionIDNode, len(nodes))
for i, node := range nodes {
result[i] = workerVersionNodeFromProto(node)
result := make([]*CompatibleVersionSet, len(sets))
for i, s := range sets {
result[i] = &CompatibleVersionSet{
id: s.GetId(),
Versions: s.GetVersions(),
}
}
return result
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,64 +30,36 @@ import (
"go.temporal.io/api/workflowservice/v1"
)

func Test_WorkerVersionGraph_fromProtoResponse(t *testing.T) {
func Test_WorkerVersionSets_fromProtoResponse(t *testing.T) {
tests := []struct {
name string
response *workflowservice.GetWorkerBuildIdOrderingResponse
want *WorkerBuildIDVersionGraph
want *WorkerBuildIDVersionSets
}{
{
name: "nil response",
response: nil,
want: nil,
},
{
name: "normal graph",
name: "normal sets",
response: &workflowservice.GetWorkerBuildIdOrderingResponse{
CurrentDefault: &taskqueuepb.VersionIdNode{
Version: &taskqueuepb.VersionId{
WorkerBuildId: "2.0",
},
PreviousIncompatible: &taskqueuepb.VersionIdNode{
Version: &taskqueuepb.VersionId{
WorkerBuildId: "1.0",
},
},
},
CompatibleLeaves: []*taskqueuepb.VersionIdNode{
{
Version: &taskqueuepb.VersionId{
WorkerBuildId: "1.1",
},
PreviousCompatible: &taskqueuepb.VersionIdNode{
Version: &taskqueuepb.VersionId{
WorkerBuildId: "1.0",
},
},
},
MajorVersionSets: []*taskqueuepb.CompatibleVersionSet{
{Versions: []string{"1.0", "1.1"}, Id: "1"},
{Versions: []string{"2.0"}, Id: "2"},
},
},
want: &WorkerBuildIDVersionGraph{
Default: &WorkerVersionIDNode{
WorkerBuildID: "2.0",
PreviousIncompatible: &WorkerVersionIDNode{
WorkerBuildID: "1.0",
},
},
CompatibleLeaves: []*WorkerVersionIDNode{
{
WorkerBuildID: "1.1",
PreviousCompatible: &WorkerVersionIDNode{
WorkerBuildID: "1.0",
},
},
want: &WorkerBuildIDVersionSets{
Sets: []*CompatibleVersionSet{
{Versions: []string{"1.0", "1.1"}, id: "1"},
{Versions: []string{"2.0"}, id: "2"},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, workerVersionGraphFromProtoResponse(tt.response), "workerVersionGraphFromProtoResponse(%v)", tt.response)
assert.Equalf(t, tt.want, workerVersionSetsFromProtoResponse(tt.response), "workerVersionSetsFromProtoResponse(%v)", tt.response)
})
}
}
8 changes: 4 additions & 4 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions test/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
)

replace (
go.temporal.io/api => /mnt/chonky/dev/temporal/api-go
go.temporal.io/sdk => ../
go.temporal.io/sdk/contrib/opentelemetry => ../contrib/opentelemetry
go.temporal.io/sdk/contrib/opentracing => ../contrib/opentracing
Expand Down
Loading

0 comments on commit eb9aeb6

Please sign in to comment.