Skip to content

Commit

Permalink
Update with latest API changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Sushisource committed Mar 13, 2023
1 parent 106ce38 commit e655cd3
Show file tree
Hide file tree
Showing 11 changed files with 517 additions and 104 deletions.
18 changes: 9 additions & 9 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,13 @@ type (
// NOTE: Experimental
WorkflowUpdateHandle = internal.WorkflowUpdateHandle

// UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering.
UpdateWorkerBuildIDOrderingOptions = internal.UpdateWorkerBuildIDOrderingOptions
// UpdateWorkerBuildIDCompatabilityOptions is the input to Client.UpdateWorkerBuildIdCompatability.
UpdateWorkerBuildIDCompatabilityOptions = internal.UpdateWorkerBuildIDCompatabilityOptions

// GetWorkerBuildIDOrderingOptions is the input to Client.GetWorkerBuildIDOrdering.
GetWorkerBuildIDOrderingOptions = internal.GetWorkerBuildIDOrderingOptions
// GetWorkerBuildIDCompatabilityOptions is the input to Client.GetWorkerBuildIdCompatability.
GetWorkerBuildIDCompatabilityOptions = internal.GetWorkerBuildIDCompatabilityOptions

// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIDOrdering.
// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIdCompatability.
WorkerBuildIDVersionSets = internal.WorkerBuildIDVersionSets

// Client is the client for starting and getting information about a workflow executions as well as
Expand Down Expand Up @@ -492,14 +492,14 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDOrdering **IN DEVELOPMENT**
// UpdateWorkerBuildIDCompatability **IN DEVELOPMENT**
// Allows you to update the worker-build-id based version sets for a particular task queue. This is used in
// conjunction with workers who specify their build id and thus opt into the feature. For more, see: <doc link>
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error
UpdateWorkerBuildIDCompatability(ctx context.Context, options *UpdateWorkerBuildIDCompatabilityOptions) error

// GetWorkerBuildIDOrdering **IN DEVELOPMENT**
// GetWorkerBuildIDCompatability **IN DEVELOPMENT**
// Returns the worker-build-id based version sets for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error)
GetWorkerBuildIDCompatability(ctx context.Context, options *GetWorkerBuildIDCompatabilityOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
Expand Down
484 changes: 464 additions & 20 deletions go.sum

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,13 +336,13 @@ type (
// RequestId is used to deduplicate requests. It will be autogenerated if not set.
ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error)

// UpdateWorkerBuildIDOrdering allows you to update the worker-build-id based version graph for a particular
// UpdateWorkerBuildIDCompatability allows you to update the worker-build-id based version sets for a particular
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the
// feature. For more, see: <doc link>
UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error
UpdateWorkerBuildIDCompatability(ctx context.Context, options *UpdateWorkerBuildIDCompatabilityOptions) error

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error)
// GetWorkerBuildIDCompatability returns the worker-build-id based version sets for a particular task queue.
GetWorkerBuildIDCompatability(ctx context.Context, options *GetWorkerBuildIDCompatabilityOptions) (*WorkerBuildIDVersionSets, error)

// CheckHealth performs a server health check using the gRPC health check
// API. If the check fails, an error is returned.
Expand Down
30 changes: 0 additions & 30 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1624,36 +1624,6 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
return builtRequest
}

func errorToFailWorkflowTask(taskToken []byte, err error, identity string, dataConverter converter.DataConverter,
failureConverter converter.FailureConverter, namespace string,
) *workflowservice.RespondWorkflowTaskFailedRequest {
cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE
// If it was a panic due to a bad state machine or if it was a history
// mismatch error, mark as non-deterministic
if panicErr, _ := err.(*workflowPanicError); panicErr != nil {
if _, badStateMachine := panicErr.value.(stateMachineIllegalStatePanic); badStateMachine {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}
} else if _, mismatch := err.(historyMismatchError); mismatch {
cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR
}

return &workflowservice.RespondWorkflowTaskFailedRequest{
TaskToken: taskToken,
Cause: cause,
Failure: failureConverter.ErrorToFailure(err),
Identity: identity,
BinaryChecksum: getBinaryChecksum(),
Namespace: namespace,
}
if wth.workerBuildID != "" {
builtRequest.WorkerVersionStamp = &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
}
}
return builtRequest
}

func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error {
if wth.ppMgr != nil && !reflect.ValueOf(wth.ppMgr).IsNil() && !isInReplay {
switch event.GetEventType() {
Expand Down
4 changes: 2 additions & 2 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,11 +572,11 @@ func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() {
params := t.getTestWorkerExecutionParams()
params.WorkerBuildID = workerBuildID
taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
request, _, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest)
t.NoError(err)
t.NotNil(response)
t.Equal(workerBuildID, response.GetWorkerVersionsStamp().GetBuildId())
t.Equal(workerBuildID, response.GetWorkerVersionStamp().GetBuildId())
}

func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() {
Expand Down
5 changes: 2 additions & 3 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,8 +383,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics(
tagRunID, task.WorkflowExecution.GetRunId(),
tagAttempt, task.Attempt,
tagError, taskErr)
// convert err to WorkflowTaskFailed
completedRequest = errorToFailWorkflowTask(task.TaskToken, taskErr, wtp.identity, wtp.dataConverter, wtp.failureConverter, wtp.namespace)
completedRequest = wtp.errorToFailWorkflowTask(task.TaskToken, taskErr)
}

metricsHandler.Timer(metrics.WorkflowTaskExecutionLatency).Record(time.Since(startTime))
Expand Down Expand Up @@ -460,7 +459,7 @@ func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err err
return &workflowservice.RespondWorkflowTaskFailedRequest{
TaskToken: taskToken,
Cause: cause,
Failure: ConvertErrorToFailure(err, wtp.dataConverter),
Failure: wtp.failureConverter.ErrorToFailure(err),
Identity: wtp.identity,
BinaryChecksum: wtp.getBuildID(),
Namespace: wtp.namespace,
Expand Down
14 changes: 7 additions & 7 deletions internal/internal_workflow_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,10 +938,10 @@ func (wc *WorkflowClient) ResetWorkflowExecution(ctx context.Context, request *w
return resp, nil
}

// UpdateWorkerBuildIDOrdering allows you to update the worker-build-id based version graph for a particular
// UpdateWorkerBuildIDCompatability allows you to update the worker-build-id based version sets for a particular
// task queue. This is used in conjunction with workers who specify their build id and thus opt into the
// feature. For more, see: <doc link>
func (wc *WorkflowClient) UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error {
func (wc *WorkflowClient) UpdateWorkerBuildIDCompatability(ctx context.Context, options *UpdateWorkerBuildIDCompatabilityOptions) error {
if err := wc.ensureInitialized(); err != nil {
return err
}
Expand All @@ -954,12 +954,12 @@ func (wc *WorkflowClient) UpdateWorkerBuildIDOrdering(ctx context.Context, optio

grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()
_, err = wc.workflowService.UpdateWorkerBuildIdOrdering(grpcCtx, request)
_, err = wc.workflowService.UpdateWorkerBuildIdCompatability(grpcCtx, request)
return err
}

// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue.
func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionSets, error) {
// GetWorkerBuildIDCompatability returns the worker-build-id based version sets for a particular task queue.
func (wc *WorkflowClient) GetWorkerBuildIDCompatability(ctx context.Context, options *GetWorkerBuildIDCompatabilityOptions) (*WorkerBuildIDVersionSets, error) {
if options.MaxSets < 0 {
return nil, errors.New("maxDepth must be >= 0")
}
Expand All @@ -970,12 +970,12 @@ func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options
grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx))
defer cancel()

request := &workflowservice.GetWorkerBuildIdOrderingRequest{
request := &workflowservice.GetWorkerBuildIdCompatabilityRequest{
Namespace: wc.namespace,
TaskQueue: options.TaskQueue,
MaxSets: int32(options.MaxSets),
}
resp, err := wc.workflowService.GetWorkerBuildIdOrdering(grpcCtx, request)
resp, err := wc.workflowService.GetWorkerBuildIdCompatability(grpcCtx, request)
if err != nil {
return nil, err
}
Expand Down
28 changes: 14 additions & 14 deletions internal/worker_version_sets.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"go.temporal.io/api/workflowservice/v1"
)

// UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering.
type UpdateWorkerBuildIDOrderingOptions struct {
// UpdateWorkerBuildIDCompatabilityOptions is the input to Client.UpdateWorkerBuildIDCompatability.
type UpdateWorkerBuildIDCompatabilityOptions struct {
// The task queue to update the version sets of.
TaskQueue string
// Required, indicates the build id being added or targeted.
Expand All @@ -42,43 +42,43 @@ type UpdateWorkerBuildIDOrderingOptions struct {
}

// Validates and converts the user's options into the proto request. Namespace must be attached afterward.
func (uw *UpdateWorkerBuildIDOrderingOptions) validateAndConvertToProto() (*workflowservice.UpdateWorkerBuildIdOrderingRequest, error) {
func (uw *UpdateWorkerBuildIDCompatabilityOptions) validateAndConvertToProto() (*workflowservice.UpdateWorkerBuildIdCompatabilityRequest, error) {
if uw.TaskQueue == "" {
return nil, errors.New("TaskQueue is required")
}
if uw.WorkerBuildID == "" {
return nil, errors.New("WorkerBuildID is required")
}
req := &workflowservice.UpdateWorkerBuildIdOrderingRequest{
req := &workflowservice.UpdateWorkerBuildIdCompatabilityRequest{
TaskQueue: uw.TaskQueue,
}
if uw.CompatibleVersion != "" {
req.Operation = &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion_{
NewCompatibleVersion: &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewCompatibleVersion{
req.Operation = &workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewCompatibleVersion_{
AddNewCompatibleVersion: &workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewCompatibleVersion{
NewVersionId: uw.WorkerBuildID,
ExistingCompatibleVersion: uw.CompatibleVersion,
BecomeDefault: uw.BecomeDefault,
MakeSetDefault: uw.BecomeDefault,
},
}
} else if uw.BecomeDefault {
req.Operation = &workflowservice.UpdateWorkerBuildIdOrderingRequest_ExistingVersionIdInSetToPromote{
ExistingVersionIdInSetToPromote: uw.WorkerBuildID,
req.Operation = &workflowservice.UpdateWorkerBuildIdCompatabilityRequest_PromoteSetByVersionId{
PromoteSetByVersionId: uw.WorkerBuildID,
}
} else {
req.Operation = &workflowservice.UpdateWorkerBuildIdOrderingRequest_NewDefaultVersionId{
NewDefaultVersionId: uw.WorkerBuildID,
req.Operation = &workflowservice.UpdateWorkerBuildIdCompatabilityRequest_AddNewVersionIdInNewDefaultSet{
AddNewVersionIdInNewDefaultSet: uw.WorkerBuildID,
}
}

return req, nil
}

type GetWorkerBuildIDOrderingOptions struct {
type GetWorkerBuildIDCompatabilityOptions struct {
TaskQueue string
MaxSets int
}

// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIdOrdering and represents the sets
// WorkerBuildIDVersionSets is the response for Client.GetWorkerBuildIDCompatability and represents the sets
// of worker build id based versions.
type WorkerBuildIDVersionSets struct {
Sets []*CompatibleVersionSet
Expand All @@ -103,7 +103,7 @@ type CompatibleVersionSet struct {
Versions []string
}

func workerVersionSetsFromProtoResponse(response *workflowservice.GetWorkerBuildIdOrderingResponse) *WorkerBuildIDVersionSets {
func workerVersionSetsFromProtoResponse(response *workflowservice.GetWorkerBuildIdCompatabilityResponse) *WorkerBuildIDVersionSets {
if response == nil {
return nil
}
Expand Down
4 changes: 2 additions & 2 deletions internal/worker_version_sets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
func Test_WorkerVersionSets_fromProtoResponse(t *testing.T) {
tests := []struct {
name string
response *workflowservice.GetWorkerBuildIdOrderingResponse
response *workflowservice.GetWorkerBuildIdCompatabilityResponse
want *WorkerBuildIDVersionSets
}{
{
Expand All @@ -43,7 +43,7 @@ func Test_WorkerVersionSets_fromProtoResponse(t *testing.T) {
},
{
name: "normal sets",
response: &workflowservice.GetWorkerBuildIdOrderingResponse{
response: &workflowservice.GetWorkerBuildIdCompatabilityResponse{
MajorVersionSets: []*taskqueuepb.CompatibleVersionSet{
{Versions: []string{"1.0", "1.1"}, Id: "1"},
{Versions: []string{"2.0"}, Id: "2"},
Expand Down
14 changes: 7 additions & 7 deletions mocks/Client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions test/worker_versioning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,24 +62,24 @@ func (ts *WorkerVersioningTestSuite) SetupTest() {
func (ts *WorkerVersioningTestSuite) TestManipulateVersionGraph() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()
err := ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{
err := ts.client.UpdateWorkerBuildIDCompatability(ctx, &client.UpdateWorkerBuildIDCompatabilityOptions{
TaskQueue: ts.taskQueueName,
WorkerBuildID: "1.0",
})
ts.NoError(err)
err = ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{
err = ts.client.UpdateWorkerBuildIDCompatability(ctx, &client.UpdateWorkerBuildIDCompatabilityOptions{
TaskQueue: ts.taskQueueName,
WorkerBuildID: "2.0",
})
ts.NoError(err)
err = ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{
err = ts.client.UpdateWorkerBuildIDCompatability(ctx, &client.UpdateWorkerBuildIDCompatabilityOptions{
TaskQueue: ts.taskQueueName,
WorkerBuildID: "1.1",
CompatibleVersion: "1.0",
})
ts.NoError(err)

res, err := ts.client.GetWorkerBuildIDOrdering(ctx, &client.GetWorkerBuildIDOrderingOptions{
res, err := ts.client.GetWorkerBuildIDCompatability(ctx, &client.GetWorkerBuildIDCompatabilityOptions{
TaskQueue: ts.taskQueueName,
})
ts.NoError(err)
Expand All @@ -92,7 +92,7 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() {
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

err := ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{
err := ts.client.UpdateWorkerBuildIDCompatability(ctx, &client.UpdateWorkerBuildIDCompatabilityOptions{
TaskQueue: ts.taskQueueName,
WorkerBuildID: "1.0",
})
Expand All @@ -114,7 +114,7 @@ func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() {
ts.NoError(err)

// Now add the 2.0 version
err = ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{
err = ts.client.UpdateWorkerBuildIDCompatability(ctx, &client.UpdateWorkerBuildIDCompatabilityOptions{
TaskQueue: ts.taskQueueName,
WorkerBuildID: "2.0",
})
Expand Down

0 comments on commit e655cd3

Please sign in to comment.