Skip to content

Commit

Permalink
Incorporate & update changes from old PR
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Sep 28, 2022
1 parent bbbbfd4 commit 0d67096
Show file tree
Hide file tree
Showing 13 changed files with 664 additions and 183 deletions.
13 changes: 5 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,8 @@ type (
// GetWorkerBuildIDOrderingOptions is the input to Client.GetWorkerBuildIDOrdering.
GetWorkerBuildIDOrderingOptions = internal.GetWorkerBuildIDOrderingOptions

// WorkerBuildIDVersionGraph is the response for Client.GetWorkerBuildIdOrdering
WorkerBuildIDVersionGraph = internal.WorkerBuildIDVersionGraph

// WorkerVersionIDNode represents a node in the WorkerBuildIDVersionGraph
WorkerVersionIDNode = internal.WorkerVersionIDNode
// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIDOrdering.
WorkerBuildIDVersionSets = internal.WorkerBuildIDVersionSets

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Expand Down Expand Up @@ -390,13 +387,13 @@ type (
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDOrdering **IN DEVELOPMENT**
// Allows you to update the worker-build-id based version graph for a particular task queue. This is used in
// Allows you to update the worker-build-id based version sets for a particular task queue. This is used in
// conjunction with workers who specify their build id and thus opt into the feature. For more, see: <doc link>
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error

// GetWorkerBuildIDOrdering **IN DEVELOPMENT**
// Returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error)
// Returns the worker-build-id based version sets for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
Expand Down
2 changes: 1 addition & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ type (
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error)
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
Expand Down
4 changes: 3 additions & 1 deletion internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1570,7 +1570,9 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
Namespace: wth.namespace,
}
if wth.workerBuildID != "" {
builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wth.workerBuildID}
builtRequest.WorkerVersionStamp = &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
}
}
return builtRequest
}
Expand Down
2 changes: 1 addition & 1 deletion internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() {
response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
t.NoError(err)
t.NotNil(response)
t.Equal(workerBuildID, response.WorkerVersioningId.GetWorkerBuildId())
t.Equal(workerBuildID, response.GetWorkerVersionsStamp().GetBuildId())
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
Expand Down
8 changes: 6 additions & 2 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,9 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po
BinaryChecksum: wtp.getBuildID(),
}
if wtp.workerBuildID != "" {
builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wtp.workerBuildID}
builtRequest.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: wtp.workerBuildID,
}
}
return builtRequest
}
Expand Down Expand Up @@ -836,7 +838,9 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) {
TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}},
}
if atp.workerBuildID != "" {
request.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: atp.workerBuildID}
request.WorkerVersionCapabilities = &commonpb.WorkerVersionCapabilities{
BuildId: atp.workerBuildID,
}
}

response, err := atp.service.PollActivityTaskQueue(ctx, request)
Expand Down
24 changes: 9 additions & 15 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,27 +883,21 @@ func (wc *WorkflowClient) UpdateWorkerBuildIDOrdering(ctx context.Context, optio
return err
}

var previousCompatibleID *taskqueuepb.VersionId
if options.PreviousCompatible != "" {
previousCompatibleID = &taskqueuepb.VersionId{WorkerBuildId: options.PreviousCompatible}
}
request := &workflowservice.UpdateWorkerBuildIdOrderingRequest{
Namespace: wc.namespace,
TaskQueue: options.TaskQueue,
VersionId: &taskqueuepb.VersionId{WorkerBuildId: options.WorkerBuildID},
PreviousCompatible: previousCompatibleID,
BecomeDefault: options.BecomeDefault,
request, err := options.validateAndConvertToProto()
if err != nil {
return err
}
request.Namespace = wc.namespace

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
_, err := wc.workflowService.UpdateWorkerBuildIdOrdering(grpcCtx, request)
_, err = wc.workflowService.UpdateWorkerBuildIdOrdering(grpcCtx, request)
return err
}

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error) {
if options.MaxDepth < 0 {
func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error) {
if options.MaxSets < 0 {
return nil, errors.New("maxDepth must be >= 0")
}
if err := wc.ensureInitialized(); err != nil {
Expand All @@ -916,13 +910,13 @@ func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options
request := &workflowservice.GetWorkerBuildIdOrderingRequest{
Namespace: wc.namespace,
TaskQueue: options.TaskQueue,
MaxDepth: int32(options.MaxDepth),
MaxSets: int32(options.MaxSets),
}
resp, err := wc.workflowService.GetWorkerBuildIdOrdering(grpcCtx, request)
if err != nil {
return nil, err
}
converted := workerVersionGraphFromProtoResponse(resp)
converted := workerVersionSetsFromProtoResponse(resp)
return converted, nil
}

Expand Down
95 changes: 0 additions & 95 deletions internal/worker_version_graph.go

This file was deleted.

127 changes: 127 additions & 0 deletions internal/worker_version_sets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
// The MIT License
//
// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"errors"

taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"
)

// UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering.
type UpdateWorkerBuildIDOrderingOptions struct {
// The task queue to update the version sets of.
TaskQueue string
// Required, indicates the build id being added or targeted.
WorkerBuildID string
// May be empty, and if set, indicates an existing version the new id should be considered compatible with.
CompatibleVersion string
// If true, this new id will become the default version for new workflow executions.
BecomeDefault bool
}

// Validates and converts the user's options into the proto request. Namespace must be attached afterward.
func (uw *UpdateWorkerBuildIDOrderingOptions) validateAndConvertToProto() (*workflowservice.UpdateWorkerBuildIdOrderingRequest, error) {
if uw.TaskQueue == "" {
return nil, errors.New("TaskQueue is required")
}
if uw.WorkerBuildID == "" {
return nil, errors.New("WorkerBuildID is required")
}
req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{
TaskQueue: uw.TaskQueue,
}
if uw.CompatibleVersion != "" {
req.Operation = &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion_{
NewCompatibleVersion: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion{
NewVersionId: uw.WorkerBuildID,
ExistingCompatibleVersion: uw.CompatibleVersion,
BecomeDefault: uw.BecomeDefault,
},
}
} else if uw.BecomeDefault {
req.Operation = &workflowservice.UpdateWorkerBuildIdOrderingRequest_ExistingVersionIdInSetToPromote{
ExistingVersionIdInSetToPromote: uw.WorkerBuildID,
}
} else {
req.Operation = &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewDefaultVersionId{
NewDefaultVersionId: uw.WorkerBuildID,
}
}

return req, nil
}

type GetWorkerBuildIDOrderingOptions struct {
TaskQueue string
MaxSets int
}

// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIdOrdering and represents the sets
// of worker build id based versions.
type WorkerBuildIDVersionSets struct {
Sets []*CompatibleVersionSet
}

// Default returns the current overall default version. IE: The one that will be used to start new workflows.
// Returns the empty string if there are no versions present.
func (s *WorkerBuildIDVersionSets) Default() string {
if len(s.Sets) == 0 {
return ""
}
lastSet := s.Sets[len(s.Sets)-1]
if len(lastSet.Versions) == 0 {
return ""
}
return lastSet.Versions[len(lastSet.Versions)-1]
}

// CompatibleVersionSet represents a set of worker build ids which are compatible with each other.
type CompatibleVersionSet struct {
id string
Versions []string
}

func workerVersionSetsFromProtoResponse(response *workflowservice.GetWorkerBuildIdOrderingResponse) *WorkerBuildIDVersionSets {
if response == nil {
return nil
}
return &WorkerBuildIDVersionSets{
Sets: workerVersionSetsFromProto(response.GetMajorVersionSets()),
}
}

func workerVersionSetsFromProto(sets []*taskqueuepb.CompatibleVersionSet) []*CompatibleVersionSet {
if sets == nil {
return nil
}
result := make([]*CompatibleVersionSet, len(sets))
for i, s := range sets {
result[i] = &CompatibleVersionSet{
id: s.GetId(),
Versions: s.GetVersions(),
}
}
return result
}
Loading

0 comments on commit 0d67096

Please sign in to comment.