diff --git a/client/client.go b/client/client.go index e65f8aec5..23f7c17ac 100644 --- a/client/client.go +++ b/client/client.go @@ -91,6 +91,18 @@ type ( // CheckHealthResponse is a response for Client.CheckHealth. CheckHealthResponse = internal.CheckHealthResponse + // UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering. + UpdateWorkerBuildIDOrderingOptions = internal.UpdateWorkerBuildIDOrderingOptions + + // GetWorkerBuildIDOrderingOptions is the input to Client.GetWorkerBuildIDOrdering. + GetWorkerBuildIDOrderingOptions = internal.GetWorkerBuildIDOrderingOptions + + // WorkerBuildIDVersionGraph is the response for Client.GetWorkerBuildIdOrdering + WorkerBuildIDVersionGraph = internal.WorkerBuildIDVersionGraph + + // WorkerVersionIDNode represents a node in the WorkerBuildIDVersionGraph + WorkerVersionIDNode = internal.WorkerVersionIDNode + // Client is the client for starting and getting information about a workflow executions as well as // completing activities asynchronously. Client interface { @@ -372,11 +384,20 @@ type ( // - serviceerror.NotFound DescribeTaskQueue(ctx context.Context, taskqueue string, taskqueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) - // ResetWorkflowExecution reset an existing workflow execution to WorkflowTaskFinishEventId(exclusive). + // ResetWorkflowExecution resets an existing workflow execution to WorkflowTaskFinishEventId(exclusive). // And it will immediately terminating the current execution instance. // RequestId is used to deduplicate requests. It will be autogenerated if not set. ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) + // UpdateWorkerBuildIDOrdering **IN DEVELOPMENT** + // Allows you to update the worker-build-id based version graph for a particular task queue. This is used in + // conjunction with workers who specify their build id and thus opt into the feature. For more, see: + UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error + + // GetWorkerBuildIDOrdering **IN DEVELOPMENT** + // Returns the worker-build-id based version graph for a particular task queue. + GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error) + // CheckHealth performs a server health check using the gRPC health check // API. If the check fails, an error is returned. CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error) @@ -511,8 +532,9 @@ var _ internal.NamespaceClient = NamespaceClient(nil) // User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution. // The response contains binary field PendingActivityInfo.HeartbeatDetails, // which can be decoded by using: -// var result string // This need to be same type as the one passed to RecordHeartbeat -// NewValue(data).Get(&result) +// +// var result string // This need to be same type as the one passed to RecordHeartbeat +// NewValue(data).Get(&result) func NewValue(data *commonpb.Payloads) converter.EncodedValue { return internal.NewValue(data) } @@ -521,9 +543,10 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue { // User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution. // The response contains binary field PendingActivityInfo.HeartbeatDetails, // which can be decoded by using: -// var result1 string -// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat -// NewValues(data).Get(&result1, &result2) +// +// var result1 string +// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat +// NewValues(data).Get(&result1, &result2) func NewValues(data *commonpb.Payloads) converter.EncodedValues { return internal.NewValues(data) } diff --git a/internal/client.go b/internal/client.go index 9f4bf887f..6eb959af8 100644 --- a/internal/client.go +++ b/internal/client.go @@ -336,6 +336,14 @@ type ( // RequestId is used to deduplicate requests. It will be autogenerated if not set. ResetWorkflowExecution(ctx context.Context, request *workflowservice.ResetWorkflowExecutionRequest) (*workflowservice.ResetWorkflowExecutionResponse, error) + // UpdateWorkerBuildIDOrdering allows you to update the worker-build-id based version graph for a particular + // task queue. This is used in conjunction with workers who specify their build id and thus opt into the + // feature. For more, see: + UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error + + // GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue. + GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error) + // CheckHealth performs a server health check using the gRPC health check // API. If the check fails, an error is returned. CheckHealth(ctx context.Context, request *CheckHealthRequest) (*CheckHealthResponse, error) @@ -816,8 +824,9 @@ func newNamespaceServiceClient(workflowServiceClient workflowservice.WorkflowSer // User had Activity.RecordHeartbeat(ctx, "my-heartbeat") and then got response from calling Client.DescribeWorkflowExecution. // The response contains binary field PendingActivityInfo.HeartbeatDetails, // which can be decoded by using: -// var result string // This need to be same type as the one passed to RecordHeartbeat -// NewValue(data).Get(&result) +// +// var result string // This need to be same type as the one passed to RecordHeartbeat +// NewValue(data).Get(&result) func NewValue(data *commonpb.Payloads) converter.EncodedValue { return newEncodedValue(data, nil) } @@ -826,9 +835,10 @@ func NewValue(data *commonpb.Payloads) converter.EncodedValue { // User had Activity.RecordHeartbeat(ctx, "my-heartbeat", 123) and then got response from calling Client.DescribeWorkflowExecution. // The response contains binary field PendingActivityInfo.HeartbeatDetails, // which can be decoded by using: -// var result1 string -// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat -// NewValues(data).Get(&result1, &result2) +// +// var result1 string +// var result2 int // These need to be same type as those arguments passed to RecordHeartbeat +// NewValues(data).Get(&result1, &result2) func NewValues(data *commonpb.Payloads) converter.EncodedValues { return newEncodedValues(data, nil) } diff --git a/internal/internal_task_handlers.go b/internal/internal_task_handlers.go index 343b5d3ce..fae85b358 100644 --- a/internal/internal_task_handlers.go +++ b/internal/internal_task_handlers.go @@ -121,6 +121,7 @@ type ( ppMgr pressurePointMgr logger log.Logger identity string + workerBuildID string enableLoggingInReplay bool registry *registry laTunnel *localActivityTunnel @@ -401,6 +402,7 @@ func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePoin ppMgr: ppMgr, metricsHandler: params.MetricsHandler, identity: params.Identity, + workerBuildID: params.WorkerBuildID, enableLoggingInReplay: params.EnableLoggingInReplay, registry: registry, workflowPanicPolicy: params.WorkflowPanicPolicy, @@ -827,7 +829,7 @@ ProcessEvents: break ProcessEvents } if binaryChecksum == "" { - w.workflowInfo.BinaryChecksum = getBinaryChecksum() + w.workflowInfo.BinaryChecksum = w.wth.getBuildID() } else { w.workflowInfo.BinaryChecksum = binaryChecksum } @@ -1557,40 +1559,20 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow( } } - return &workflowservice.RespondWorkflowTaskCompletedRequest{ + builtRequest := &workflowservice.RespondWorkflowTaskCompletedRequest{ TaskToken: task.TaskToken, Commands: commands, Identity: wth.identity, ReturnNewWorkflowTask: true, ForceCreateNewWorkflowTask: forceNewWorkflowTask, - BinaryChecksum: getBinaryChecksum(), + BinaryChecksum: wth.getBuildID(), QueryResults: queryResults, Namespace: wth.namespace, } -} - -func errorToFailWorkflowTask(taskToken []byte, err error, identity string, dataConverter converter.DataConverter, - namespace string) *workflowservice.RespondWorkflowTaskFailedRequest { - - cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE - // If it was a panic due to a bad state machine or if it was a history - // mismatch error, mark as non-deterministic - if panicErr, _ := err.(*workflowPanicError); panicErr != nil { - if _, badStateMachine := panicErr.value.(stateMachineIllegalStatePanic); badStateMachine { - cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR - } - } else if _, mismatch := err.(historyMismatchError); mismatch { - cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR - } - - return &workflowservice.RespondWorkflowTaskFailedRequest{ - TaskToken: taskToken, - Cause: cause, - Failure: ConvertErrorToFailure(err, dataConverter), - Identity: identity, - BinaryChecksum: getBinaryChecksum(), - Namespace: namespace, + if wth.workerBuildID != "" { + builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wth.workerBuildID} } + return builtRequest } func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.HistoryEvent, isInReplay bool) error { @@ -1609,6 +1591,13 @@ func (wth *workflowTaskHandlerImpl) executeAnyPressurePoints(event *historypb.Hi return nil } +func (wth *workflowTaskHandlerImpl) getBuildID() string { + if wth.workerBuildID != "" { + return wth.workerBuildID + } + return getBinaryChecksum() +} + func newActivityTaskHandler( service workflowservice.WorkflowServiceClient, params workerExecutionParameters, diff --git a/internal/internal_task_handlers_test.go b/internal/internal_task_handlers_test.go index a7ed87b66..c87bff9ec 100644 --- a/internal/internal_task_handlers_test.go +++ b/internal/internal_task_handlers_test.go @@ -534,6 +534,25 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_BinaryChecksum() { t.Equal(getBinaryChecksum(), checksums[2]) } +func (t *TaskHandlersTestSuite) TestRespondsToWFTWithWorkerBinaryID() { + taskQueue := "tq1" + workerBuildID := "yaaaay" + testEvents := []*historypb.HistoryEvent{ + createTestEventWorkflowExecutionStarted(1, &historypb.WorkflowExecutionStartedEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskScheduled(2, &historypb.WorkflowTaskScheduledEventAttributes{TaskQueue: &taskqueuepb.TaskQueue{Name: taskQueue}}), + createTestEventWorkflowTaskStarted(3), + } + task := createWorkflowTask(testEvents, 0, "HelloWorld_Workflow") + params := t.getTestWorkerExecutionParams() + params.WorkerBuildID = workerBuildID + taskHandler := newWorkflowTaskHandler(params, nil, t.registry) + request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil) + response := request.(*workflowservice.RespondWorkflowTaskCompletedRequest) + t.NoError(err) + t.NotNil(response) + t.Equal(workerBuildID, response.WorkerVersioningId.GetWorkerBuildId()) +} + func (t *TaskHandlersTestSuite) TestWorkflowTask_ActivityTaskScheduled() { // Schedule an activity and see if we complete workflow. taskQueue := "tq1" diff --git a/internal/internal_task_pollers.go b/internal/internal_task_pollers.go index 116c53a03..5b94f665c 100644 --- a/internal/internal_task_pollers.go +++ b/internal/internal_task_pollers.go @@ -71,6 +71,8 @@ type ( basePoller struct { metricsHandler metrics.Handler // base metric handler used for rpc calls stopC <-chan struct{} + // Is set if this worker has opted in to build-id based versioning + workerBuildID string } // workflowTaskPoller implements polling/processing a workflow task @@ -224,7 +226,11 @@ func newWorkflowTaskPoller( params workerExecutionParameters, ) *workflowTaskPoller { return &workflowTaskPoller{ - basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel}, + basePoller: basePoller{ + metricsHandler: params.MetricsHandler, + stopC: params.WorkerStopChannel, + workerBuildID: params.WorkerBuildID, + }, service: service, namespace: params.Namespace, taskQueueName: params.TaskQueue, @@ -340,7 +346,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompletedWithMetrics( tagAttempt, task.Attempt, tagError, taskErr) // convert err to WorkflowTaskFailed - completedRequest = errorToFailWorkflowTask(task.TaskToken, taskErr, wtp.identity, wtp.dataConverter, wtp.namespace) + completedRequest = wtp.errorToFailWorkflowTask(task.TaskToken, taskErr) } metricsHandler.Timer(metrics.WorkflowTaskExecutionLatency).Record(time.Since(startTime)) @@ -401,6 +407,35 @@ func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{} return } +func (wtp *workflowTaskPoller) errorToFailWorkflowTask(taskToken []byte, err error) *workflowservice.RespondWorkflowTaskFailedRequest { + cause := enumspb.WORKFLOW_TASK_FAILED_CAUSE_WORKFLOW_WORKER_UNHANDLED_FAILURE + // If it was a panic due to a bad state machine or if it was a history + // mismatch error, mark as non-deterministic + if panicErr, _ := err.(*workflowPanicError); panicErr != nil { + if _, badStateMachine := panicErr.value.(stateMachineIllegalStatePanic); badStateMachine { + cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR + } + } else if _, mismatch := err.(historyMismatchError); mismatch { + cause = enumspb.WORKFLOW_TASK_FAILED_CAUSE_NON_DETERMINISTIC_ERROR + } + + return &workflowservice.RespondWorkflowTaskFailedRequest{ + TaskToken: taskToken, + Cause: cause, + Failure: ConvertErrorToFailure(err, wtp.dataConverter), + Identity: wtp.identity, + BinaryChecksum: wtp.getBuildID(), + Namespace: wtp.namespace, + } +} + +func (bp *basePoller) getBuildID() string { + if bp.workerBuildID == "" { + return getBinaryChecksum() + } + return bp.workerBuildID +} + func newLocalActivityPoller( params workerExecutionParameters, laTunnel *localActivityTunnel, @@ -593,10 +628,11 @@ func (wtp *workflowTaskPoller) updateBacklog(taskQueueKind enumspb.TaskQueueKind // getNextPollRequest returns appropriate next poll request based on poller configuration. // Simple rules: -// 1) if sticky execution is disabled, always poll for regular task queue -// 2) otherwise: -// 2.1) if sticky task queue has backlog, always prefer to process sticky task first -// 2.2) poll from the task queue that has less pending requests (prefer sticky when they are the same). +// 1. if sticky execution is disabled, always poll for regular task queue +// 2. otherwise: +// 2.1) if sticky task queue has backlog, always prefer to process sticky task first +// 2.2) poll from the task queue that has less pending requests (prefer sticky when they are the same). +// // TODO: make this more smart to auto adjust based on poll latency func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.PollWorkflowTaskQueueRequest) { taskQueueName := wtp.taskQueueName @@ -617,12 +653,16 @@ func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.Po Name: taskQueueName, Kind: taskQueueKind, } - return &workflowservice.PollWorkflowTaskQueueRequest{ + builtRequest := &workflowservice.PollWorkflowTaskQueueRequest{ Namespace: wtp.namespace, TaskQueue: taskQueue, Identity: wtp.identity, - BinaryChecksum: getBinaryChecksum(), + BinaryChecksum: wtp.getBuildID(), + } + if wtp.workerBuildID != "" { + builtRequest.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: wtp.workerBuildID} } + return builtRequest } // Poll for a single workflow task from the service @@ -769,7 +809,11 @@ func newGetHistoryPageFunc( func newActivityTaskPoller(taskHandler ActivityTaskHandler, service workflowservice.WorkflowServiceClient, params workerExecutionParameters) *activityTaskPoller { return &activityTaskPoller{ - basePoller: basePoller{metricsHandler: params.MetricsHandler, stopC: params.WorkerStopChannel}, + basePoller: basePoller{ + metricsHandler: params.MetricsHandler, + stopC: params.WorkerStopChannel, + workerBuildID: params.WorkerBuildID, + }, taskHandler: taskHandler, service: service, namespace: params.Namespace, @@ -791,6 +835,9 @@ func (atp *activityTaskPoller) poll(ctx context.Context) (interface{}, error) { Identity: atp.identity, TaskQueueMetadata: &taskqueuepb.TaskQueueMetadata{MaxTasksPerSecond: &types.DoubleValue{Value: atp.activitiesPerSecond}}, } + if atp.workerBuildID != "" { + request.WorkerVersioningId = &taskqueuepb.VersionId{WorkerBuildId: atp.workerBuildID} + } response, err := atp.service.PollActivityTaskQueue(ctx, request) if err != nil { diff --git a/internal/internal_worker.go b/internal/internal_worker.go index 06a3cb77e..c70214ced 100644 --- a/internal/internal_worker.go +++ b/internal/internal_worker.go @@ -163,6 +163,9 @@ type ( // a default option. Identity string + // The worker's build ID used for versioning, if one was set. + WorkerBuildID string + MetricsHandler metrics.Handler Logger log.Logger @@ -1374,6 +1377,7 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke ConcurrentWorkflowTaskExecutionSize: options.MaxConcurrentWorkflowTaskExecutionSize, MaxConcurrentWorkflowTaskQueuePollers: options.MaxConcurrentWorkflowTaskPollers, Identity: client.identity, + WorkerBuildID: options.BuildIDForVersioning, MetricsHandler: client.metricsHandler.WithTags(metrics.TaskQueueTags(taskQueue)), Logger: client.logger, EnableLoggingInReplay: options.EnableLoggingInReplay, diff --git a/internal/internal_workflow_client.go b/internal/internal_workflow_client.go index a62763436..f302782e1 100644 --- a/internal/internal_workflow_client.go +++ b/internal/internal_workflow_client.go @@ -193,9 +193,11 @@ type ( // reaches the end state, such as workflow finished successfully or timeout. // The user can use this to start using a functor like below and get the workflow execution result, as EncodedValue // Either by -// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3) -// or -// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3) +// +// ExecuteWorkflow(options, "workflowTypeName", arg1, arg2, arg3) +// or +// ExecuteWorkflow(options, workflowExecuteFn, arg1, arg2, arg3) +// // The current timeout resolution implementation is in seconds and uses math.Ceil(d.Seconds()) as the duration. But is // subjected to change in the future. // NOTE: the context.Context should have a fairly large timeout, since workflow execution may take a while to be finished @@ -531,10 +533,10 @@ func (wc *WorkflowClient) RecordActivityHeartbeatByID(ctx context.Context, // ListClosedWorkflow gets closed workflow executions based on request filters // The errors it can throw: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workflowservice.ListClosedWorkflowExecutionsRequest) (*workflowservice.ListClosedWorkflowExecutionsResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -554,10 +556,10 @@ func (wc *WorkflowClient) ListClosedWorkflow(ctx context.Context, request *workf // ListOpenWorkflow gets open workflow executions based on request filters // The errors it can throw: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound func (wc *WorkflowClient) ListOpenWorkflow(ctx context.Context, request *workflowservice.ListOpenWorkflowExecutionsRequest) (*workflowservice.ListOpenWorkflowExecutionsResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -676,10 +678,10 @@ func (wc *WorkflowClient) GetSearchAttributes(ctx context.Context) (*workflowser // DescribeWorkflowExecution returns information about the specified workflow execution. // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflowID, runID string) (*workflowservice.DescribeWorkflowExecutionResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -709,11 +711,11 @@ func (wc *WorkflowClient) DescribeWorkflowExecution(ctx context.Context, workflo // - queryType is the type of the query. // - args... are the optional query parameters. // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound -// - serviceerror.QueryFailed +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound +// - serviceerror.QueryFailed func (wc *WorkflowClient) QueryWorkflow(ctx context.Context, workflowID string, runID string, queryType string, args ...interface{}) (converter.EncodedValue, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -771,11 +773,11 @@ type QueryWorkflowWithOptionsResponse struct { // QueryWorkflowWithOptions queries a given workflow execution and returns the query result synchronously. // See QueryWorkflowWithOptionsRequest and QueryWorkflowWithOptionsResult for more information. // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound -// - serviceerror.QueryFailed +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound +// - serviceerror.QueryFailed func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request *QueryWorkflowWithOptionsRequest) (*QueryWorkflowWithOptionsResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -826,10 +828,10 @@ func (wc *WorkflowClient) QueryWorkflowWithOptions(ctx context.Context, request // - taskqueue name of taskqueue // - taskqueueType type of taskqueue, can be workflow or activity // The errors it can return: -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable -// - serviceerror.NotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable +// - serviceerror.NotFound func (wc *WorkflowClient) DescribeTaskQueue(ctx context.Context, taskQueue string, taskQueueType enumspb.TaskQueueType) (*workflowservice.DescribeTaskQueueResponse, error) { if err := wc.ensureInitialized(); err != nil { return nil, err @@ -873,6 +875,57 @@ func (wc *WorkflowClient) ResetWorkflowExecution(ctx context.Context, request *w return resp, nil } +// UpdateWorkerBuildIDOrdering allows you to update the worker-build-id based version graph for a particular +// task queue. This is used in conjunction with workers who specify their build id and thus opt into the +// feature. For more, see: +func (wc *WorkflowClient) UpdateWorkerBuildIDOrdering(ctx context.Context, options *UpdateWorkerBuildIDOrderingOptions) error { + if err := wc.ensureInitialized(); err != nil { + return err + } + + var previousCompatibleID *taskqueuepb.VersionId + if options.PreviousCompatible != "" { + previousCompatibleID = &taskqueuepb.VersionId{WorkerBuildId: options.PreviousCompatible} + } + request := &workflowservice.UpdateWorkerBuildIdOrderingRequest{ + Namespace: wc.namespace, + TaskQueue: options.TaskQueue, + VersionId: &taskqueuepb.VersionId{WorkerBuildId: options.WorkerBuildID}, + PreviousCompatible: previousCompatibleID, + BecomeDefault: options.BecomeDefault, + } + + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + _, err := wc.workflowService.UpdateWorkerBuildIdOrdering(grpcCtx, request) + return err +} + +// GetWorkerBuildIDOrdering returns the worker-build-id based version graph for a particular task queue. +func (wc *WorkflowClient) GetWorkerBuildIDOrdering(ctx context.Context, options *GetWorkerBuildIDOrderingOptions) (*WorkerBuildIDVersionGraph, error) { + if options.MaxDepth < 0 { + return nil, errors.New("maxDepth must be >= 0") + } + if err := wc.ensureInitialized(); err != nil { + return nil, err + } + + grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) + defer cancel() + + request := &workflowservice.GetWorkerBuildIdOrderingRequest{ + Namespace: wc.namespace, + TaskQueue: options.TaskQueue, + MaxDepth: int32(options.MaxDepth), + } + resp, err := wc.workflowService.GetWorkerBuildIdOrdering(grpcCtx, request) + if err != nil { + return nil, err + } + converted := workerVersionGraphFromProtoResponse(resp) + return converted, nil +} + // CheckHealthRequest is a request for Client.CheckHealth. type CheckHealthRequest struct{} @@ -978,10 +1031,10 @@ func (wc *WorkflowClient) Close() { // Register a namespace with temporal server // The errors it can throw: -// - NamespaceAlreadyExistsError -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable +// - NamespaceAlreadyExistsError +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable func (nc *namespaceClient) Register(ctx context.Context, request *workflowservice.RegisterNamespaceRequest) error { grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() @@ -995,10 +1048,10 @@ func (nc *namespaceClient) Register(ctx context.Context, request *workflowservic // NamespaceConfiguration - Configuration like Workflow Execution Retention Period In Days, Whether to emit metrics. // ReplicationConfiguration - replication config like clusters and active cluster name // The errors it can throw: -// - serviceerror.NamespaceNotFound -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable func (nc *namespaceClient) Describe(ctx context.Context, namespace string) (*workflowservice.DescribeNamespaceResponse, error) { request := &workflowservice.DescribeNamespaceRequest{ Namespace: namespace, @@ -1015,10 +1068,10 @@ func (nc *namespaceClient) Describe(ctx context.Context, namespace string) (*wor // Update a namespace. // The errors it can throw: -// - serviceerror.NamespaceNotFound -// - serviceerror.InvalidArgument -// - serviceerror.Internal -// - serviceerror.Unavailable +// - serviceerror.NamespaceNotFound +// - serviceerror.InvalidArgument +// - serviceerror.Internal +// - serviceerror.Unavailable func (nc *namespaceClient) Update(ctx context.Context, request *workflowservice.UpdateNamespaceRequest) error { grpcCtx, cancel := newGRPCContext(ctx, defaultGrpcRetryParameters(ctx)) defer cancel() diff --git a/internal/worker.go b/internal/worker.go index e0233e1a8..6c8240f35 100644 --- a/internal/worker.go +++ b/internal/worker.go @@ -225,6 +225,13 @@ type ( // and aliased names when not using string names when executing child // workflow or activities. DisableRegistrationAliasing bool + + // **IN DEVELOPMENT** + // Optional: If set, opts this worker into the worker build id based versioning + // feature. It will only operate on workflows it claims to be compatible with. + // Additionally, if this is set it will serve as the binary checksum for the worker. + // For more, see: + BuildIDForVersioning string } ) @@ -257,7 +264,9 @@ func IsReplayNamespace(dn string) bool { // NewWorker creates an instance of worker for managing workflow and activity executions. // client - client created with client.Dial() or client.NewLazyClient(). // taskQueue - is the task queue name you use to identify your client worker, also -// identifies group of workflow and activity implementations that are hosted by a single worker process. +// +// identifies group of workflow and activity implementations that are hosted by a single worker process. +// // options - configure any worker specific options. func NewWorker( client Client, diff --git a/internal/worker_version_graph.go b/internal/worker_version_graph.go new file mode 100644 index 000000000..48e70dfa7 --- /dev/null +++ b/internal/worker_version_graph.go @@ -0,0 +1,95 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" +) + +// UpdateWorkerBuildIDOrderingOptions is the input to Client.UpdateWorkerBuildIDOrdering. +type UpdateWorkerBuildIDOrderingOptions struct { + // The task queue to update the version graph of. + TaskQueue string + // Required, indicates the build id being added (or changed) to/in the graph. + WorkerBuildID string + // May be empty, and if set, indicates an existing version the new id should be considered compatible with. + PreviousCompatible string + // If true, this new id will become the default version for new workflow executions. + BecomeDefault bool +} + +type GetWorkerBuildIDOrderingOptions struct { + TaskQueue string + MaxDepth int +} + +// WorkerBuildIDVersionGraph is the response for Client.GetWorkerBuildIdOrdering and represents the graph +// of worker build id based versions. +type WorkerBuildIDVersionGraph struct { + // The currently established default version + Default *WorkerVersionIDNode + // Other current latest-compatible versions which are not the overall default + CompatibleLeaves []*WorkerVersionIDNode +} + +// WorkerVersionIDNode is a single node in a worker version graph. +type WorkerVersionIDNode struct { + WorkerBuildID string + // A pointer to the previous version this version is considered to be compatible with + PreviousCompatible *WorkerVersionIDNode + // A pointer to the previous incompatible version (previous major version) + PreviousIncompatible *WorkerVersionIDNode +} + +func workerVersionGraphFromProtoResponse(response *workflowservice.GetWorkerBuildIdOrderingResponse) *WorkerBuildIDVersionGraph { + if response == nil { + return nil + } + return &WorkerBuildIDVersionGraph{ + Default: workerVersionNodeFromProto(response.CurrentDefault), + CompatibleLeaves: workerVersionNodesFromProto(response.CompatibleLeaves), + } +} + +func workerVersionNodeFromProto(node *taskqueuepb.VersionIdNode) *WorkerVersionIDNode { + if node == nil { + return nil + } + return &WorkerVersionIDNode{ + WorkerBuildID: node.GetVersion().GetWorkerBuildId(), + PreviousCompatible: workerVersionNodeFromProto(node.PreviousCompatible), + PreviousIncompatible: workerVersionNodeFromProto(node.PreviousIncompatible), + } +} + +func workerVersionNodesFromProto(nodes []*taskqueuepb.VersionIdNode) []*WorkerVersionIDNode { + if len(nodes) == 0 { + return nil + } + result := make([]*WorkerVersionIDNode, len(nodes)) + for i, node := range nodes { + result[i] = workerVersionNodeFromProto(node) + } + return result +} diff --git a/internal/worker_version_graph_test.go b/internal/worker_version_graph_test.go new file mode 100644 index 000000000..68aad5ed6 --- /dev/null +++ b/internal/worker_version_graph_test.go @@ -0,0 +1,93 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package internal + +import ( + "testing" + + "github.com/stretchr/testify/assert" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" +) + +func Test_WorkerVersionGraph_fromProtoResponse(t *testing.T) { + tests := []struct { + name string + response *workflowservice.GetWorkerBuildIdOrderingResponse + want *WorkerBuildIDVersionGraph + }{ + { + name: "nil response", + response: nil, + want: nil, + }, + { + name: "normal graph", + response: &workflowservice.GetWorkerBuildIdOrderingResponse{ + CurrentDefault: &taskqueuepb.VersionIdNode{ + Version: &taskqueuepb.VersionId{ + WorkerBuildId: "2.0", + }, + PreviousIncompatible: &taskqueuepb.VersionIdNode{ + Version: &taskqueuepb.VersionId{ + WorkerBuildId: "1.0", + }, + }, + }, + CompatibleLeaves: []*taskqueuepb.VersionIdNode{ + { + Version: &taskqueuepb.VersionId{ + WorkerBuildId: "1.1", + }, + PreviousCompatible: &taskqueuepb.VersionIdNode{ + Version: &taskqueuepb.VersionId{ + WorkerBuildId: "1.0", + }, + }, + }, + }, + }, + want: &WorkerBuildIDVersionGraph{ + Default: &WorkerVersionIDNode{ + WorkerBuildID: "2.0", + PreviousIncompatible: &WorkerVersionIDNode{ + WorkerBuildID: "1.0", + }, + }, + CompatibleLeaves: []*WorkerVersionIDNode{ + { + WorkerBuildID: "1.1", + PreviousCompatible: &WorkerVersionIDNode{ + WorkerBuildID: "1.0", + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + assert.Equalf(t, tt.want, workerVersionGraphFromProtoResponse(tt.response), "workerVersionGraphFromProtoResponse(%v)", tt.response) + }) + } +} diff --git a/internal/workflow.go b/internal/workflow.go index b0ed0b722..907dc82f6 100644 --- a/internal/workflow.go +++ b/internal/workflow.go @@ -513,16 +513,20 @@ func (wc *workflowEnvironmentInterceptor) Init(outbound WorkflowOutboundIntercep // Context can be used to pass the settings for this activity. // For example: task queue that this need to be routed, timeouts that need to be configured. // Use ActivityOptions to pass down the options. -// ao := ActivityOptions{ -// TaskQueue: "exampleTaskQueue", -// ScheduleToStartTimeout: 10 * time.Second, -// StartToCloseTimeout: 5 * time.Second, -// ScheduleToCloseTimeout: 10 * time.Second, -// HeartbeatTimeout: 0, -// } -// ctx := WithActivityOptions(ctx, ao) +// +// ao := ActivityOptions{ +// TaskQueue: "exampleTaskQueue", +// ScheduleToStartTimeout: 10 * time.Second, +// StartToCloseTimeout: 5 * time.Second, +// ScheduleToCloseTimeout: 10 * time.Second, +// HeartbeatTimeout: 0, +// } +// ctx := WithActivityOptions(ctx, ao) +// // Or to override a single option -// ctx := WithTaskQueue(ctx, "exampleTaskQueue") +// +// ctx := WithTaskQueue(ctx, "exampleTaskQueue") +// // Input activity is either an activity name (string) or a function representing an activity that is getting scheduled. // Input args are the arguments that need to be passed to the scheduled activity. // @@ -630,10 +634,12 @@ func (wc *workflowEnvironmentInterceptor) ExecuteActivity(ctx Context, typeName // // Context can be used to pass the settings for this local activity. // For now there is only one setting for timeout to be set: -// lao := LocalActivityOptions{ -// ScheduleToCloseTimeout: 5 * time.Second, -// } -// ctx := WithLocalActivityOptions(ctx, lao) +// +// lao := LocalActivityOptions{ +// ScheduleToCloseTimeout: 5 * time.Second, +// } +// ctx := WithLocalActivityOptions(ctx, lao) +// // The timeout here should be relative shorter than the WorkflowTaskTimeout of the workflow. If you need a // longer timeout, you probably should not use local activity and instead should use regular activity. Local activity is // designed to be used for short living activities (usually finishes within seconds). @@ -822,11 +828,13 @@ func (wc *workflowEnvironmentInterceptor) scheduleLocalActivity(ctx Context, par // Context can be used to pass the settings for the child workflow. // For example: task queue that this child workflow should be routed, timeouts that need to be configured. // Use ChildWorkflowOptions to pass down the options. -// cwo := ChildWorkflowOptions{ -// WorkflowExecutionTimeout: 10 * time.Minute, -// WorkflowTaskTimeout: time.Minute, -// } -// ctx := WithChildWorkflowOptions(ctx, cwo) +// +// cwo := ChildWorkflowOptions{ +// WorkflowExecutionTimeout: 10 * time.Minute, +// WorkflowTaskTimeout: time.Minute, +// } +// ctx := WithChildWorkflowOptions(ctx, cwo) +// // Input childWorkflow is either a workflow name or a workflow function that is getting scheduled. // Input args are the arguments that need to be passed to the child workflow function represented by childWorkflow. // @@ -954,7 +962,11 @@ type WorkflowInfo struct { Memo *commonpb.Memo // Value can be decoded using data converter (defaultDataConverter, or custom one if set). SearchAttributes *commonpb.SearchAttributes // Value can be decoded using defaultDataConverter. RetryPolicy *RetryPolicy - BinaryChecksum string + // BinaryChecksum represents the value persisted by the last worker to complete a task in this workflow. It may be + // an explicitly set or implicitly derived binary checksum of the worker binary, or, if this worker has opted into + // build-id based versioning, is the explicitly set worker build id. If this is the first worker to operate on the + // workflow, it is this worker's current value. + BinaryChecksum string currentHistoryLength int } @@ -1082,7 +1094,9 @@ func (wc *workflowEnvironmentInterceptor) Sleep(ctx Context, d time.Duration) (e // then the currently running instance of that workflowID will be used. // By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace // of the target workflow using the context like: +// // ctx := WithWorkflowNamespace(ctx, "namespace") +// // RequestCancelExternalWorkflow return Future with failure or empty success result. func RequestCancelExternalWorkflow(ctx Context, workflowID, runID string) Future { i := getWorkflowOutboundInterceptor(ctx) @@ -1119,7 +1133,9 @@ func (wc *workflowEnvironmentInterceptor) RequestCancelExternalWorkflow(ctx Cont // then the currently running instance of that workflowID will be used. // By default, the current workflow's namespace will be used as target namespace. However, you can specify a different namespace // of the target workflow using the context like: +// // ctx := WithWorkflowNamespace(ctx, "namespace") +// // SignalExternalWorkflow return Future with failure or empty success result. func SignalExternalWorkflow(ctx Context, workflowID, runID, signalName string, arg interface{}) Future { i := getWorkflowOutboundInterceptor(ctx) @@ -1188,25 +1204,29 @@ func signalExternalWorkflow(ctx Context, workflowID, runID, signalName string, a // The value has to deterministic when replay; // The value has to be Json serializable. // UpsertSearchAttributes will merge attributes to existing map in workflow, for example workflow code: -// func MyWorkflow(ctx workflow.Context, input string) error { -// attr1 := map[string]interface{}{ -// "CustomIntField": 1, -// "CustomBoolField": true, -// } -// workflow.UpsertSearchAttributes(ctx, attr1) -// -// attr2 := map[string]interface{}{ -// "CustomIntField": 2, -// "CustomKeywordField": "seattle", -// } -// workflow.UpsertSearchAttributes(ctx, attr2) -// } +// +// func MyWorkflow(ctx workflow.Context, input string) error { +// attr1 := map[string]interface{}{ +// "CustomIntField": 1, +// "CustomBoolField": true, +// } +// workflow.UpsertSearchAttributes(ctx, attr1) +// +// attr2 := map[string]interface{}{ +// "CustomIntField": 2, +// "CustomKeywordField": "seattle", +// } +// workflow.UpsertSearchAttributes(ctx, attr2) +// } +// // will eventually have search attributes: -// map[string]interface{}{ -// "CustomIntField": 2, -// "CustomBoolField": true, -// "CustomKeywordField": "seattle", -// } +// +// map[string]interface{}{ +// "CustomIntField": 2, +// "CustomBoolField": true, +// "CustomKeywordField": "seattle", +// } +// // This is only supported when using ElasticSearch. func UpsertSearchAttributes(ctx Context, attributes map[string]interface{}) error { i := getWorkflowOutboundInterceptor(ctx) @@ -1403,33 +1423,36 @@ func (b EncodedValue) HasValue() bool { // // Caution: do not use SideEffect to modify closures. Always retrieve result from SideEffect's encoded return value. // For example this code is BROKEN: -// // Bad example: -// var random int -// workflow.SideEffect(func(ctx workflow.Context) interface{} { -// random = rand.Intn(100) -// return nil -// }) -// // random will always be 0 in replay, thus this code is non-deterministic -// if random < 50 { -// .... -// } else { -// .... -// } +// +// // Bad example: +// var random int +// workflow.SideEffect(func(ctx workflow.Context) interface{} { +// random = rand.Intn(100) +// return nil +// }) +// // random will always be 0 in replay, thus this code is non-deterministic +// if random < 50 { +// .... +// } else { +// .... +// } +// // On replay the provided function is not executed, the random will always be 0, and the workflow could takes a // different path breaking the determinism. // // Here is the correct way to use SideEffect: -// // Good example: -// encodedRandom := SideEffect(func(ctx workflow.Context) interface{} { -// return rand.Intn(100) -// }) -// var random int -// encodedRandom.Get(&random) -// if random < 50 { -// .... -// } else { -// .... -// } +// +// // Good example: +// encodedRandom := SideEffect(func(ctx workflow.Context) interface{} { +// return rand.Intn(100) +// }) +// var random int +// encodedRandom.Get(&random) +// if random < 50 { +// .... +// } else { +// .... +// } func SideEffect(ctx Context, f func(ctx Context) interface{}) converter.EncodedValue { i := getWorkflowOutboundInterceptor(ctx) return i.SideEffect(ctx, f) @@ -1456,8 +1479,9 @@ func (wc *workflowEnvironmentInterceptor) SideEffect(ctx Context, f func(ctx Con // MutableSideEffect executes the provided function once, then it looks up the history for the value with the given id. // If there is no existing value, then it records the function result as a value with the given id on history; // otherwise, it compares whether the existing value from history has changed from the new function result by calling -// the provided equals function. If they are equal, it returns the value without recording a new one in history; -// otherwise, it records the new value with the same id on history. +// theprovided equals function. If they are equal, it returns the value without recording a new one in history; +// +// otherwise, it records the new value with the same id on history. // // Caution: do not use MutableSideEffect to modify closures. Always retrieve result from MutableSideEffect's encoded // return value. @@ -1494,38 +1518,46 @@ const TemporalChangeVersion = "TemporalChangeVersion" // workflow history as a marker event. Even if maxSupported version is changed the version that was recorded is // returned on replay. DefaultVersion constant contains version of code that wasn't versioned before. // For example initially workflow has the following code: -// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// // it should be updated to -// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// // The backwards compatible way to execute the update is -// v := GetVersion(ctx, "fooChange", DefaultVersion, 1) -// if v == DefaultVersion { -// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) -// } else { -// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) -// } +// +// v := GetVersion(ctx, "fooChange", DefaultVersion, 1) +// if v == DefaultVersion { +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } // // Then bar has to be changed to baz: -// v := GetVersion(ctx, "fooChange", DefaultVersion, 2) -// if v == DefaultVersion { -// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) -// } else if v == 1 { -// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) -// } else { -// err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) -// } +// +// v := GetVersion(ctx, "fooChange", DefaultVersion, 2) +// if v == DefaultVersion { +// err = workflow.ExecuteActivity(ctx, foo).Get(ctx, nil) +// } else if v == 1 { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) +// } // // Later when there are no workflow executions running DefaultVersion the correspondent branch can be removed: -// v := GetVersion(ctx, "fooChange", 1, 2) -// if v == 1 { -// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) -// } else { -// err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) -// } +// +// v := GetVersion(ctx, "fooChange", 1, 2) +// if v == 1 { +// err = workflow.ExecuteActivity(ctx, bar).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) +// } // // It is recommended to keep the GetVersion() call even if single branch is left: -// GetVersion(ctx, "fooChange", 2, 2) -// err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) +// +// GetVersion(ctx, "fooChange", 2, 2) +// err = workflow.ExecuteActivity(ctx, baz).Get(ctx, nil) // // The reason to keep it is: 1) it ensures that if there is older version execution still running, it will fail here // and not proceed; 2) if you ever need to make more changes for “fooChange”, for example change activity from baz to qux, @@ -1537,12 +1569,12 @@ const TemporalChangeVersion = "TemporalChangeVersion" // as changeID. If you ever need to make changes to that same part like change from baz to qux, you would need to use a // different changeID like “fooChange-fix2”, and start minVersion from DefaultVersion again. The code would looks like: // -// v := workflow.GetVersion(ctx, "fooChange-fix2", workflow.DefaultVersion, 1) -// if v == workflow.DefaultVersion { -// err = workflow.ExecuteActivity(ctx, baz, data).Get(ctx, nil) -// } else { -// err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil) -// } +// v := workflow.GetVersion(ctx, "fooChange-fix2", workflow.DefaultVersion, 1) +// if v == workflow.DefaultVersion { +// err = workflow.ExecuteActivity(ctx, baz, data).Get(ctx, nil) +// } else { +// err = workflow.ExecuteActivity(ctx, qux, data).Get(ctx, nil) +// } func GetVersion(ctx Context, changeID string, minSupported, maxSupported Version) Version { i := getWorkflowOutboundInterceptor(ctx) return i.GetVersion(ctx, changeID, minSupported, maxSupported) @@ -1563,33 +1595,34 @@ func (wc *workflowEnvironmentInterceptor) GetVersion(ctx Context, changeID strin // Channel.Get() or Future.Get(). Trying to do so in query handler code will fail the query and client will receive // QueryFailedError. // Example of workflow code that support query type "current_state": -// func MyWorkflow(ctx workflow.Context, input string) error { -// currentState := "started" // this could be any serializable struct -// err := workflow.SetQueryHandler(ctx, "current_state", func() (string, error) { -// return currentState, nil -// }) -// if err != nil { -// currentState = "failed to register query handler" -// return err -// } -// // your normal workflow code begins here, and you update the currentState as the code makes progress. -// currentState = "waiting timer" -// err = NewTimer(ctx, time.Hour).Get(ctx, nil) -// if err != nil { -// currentState = "timer failed" -// return err -// } -// -// currentState = "waiting activity" -// ctx = WithActivityOptions(ctx, myActivityOptions) -// err = ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil) -// if err != nil { -// currentState = "activity failed" -// return err -// } -// currentState = "done" -// return nil -// } +// +// func MyWorkflow(ctx workflow.Context, input string) error { +// currentState := "started" // this could be any serializable struct +// err := workflow.SetQueryHandler(ctx, "current_state", func() (string, error) { +// return currentState, nil +// }) +// if err != nil { +// currentState = "failed to register query handler" +// return err +// } +// // your normal workflow code begins here, and you update the currentState as the code makes progress. +// currentState = "waiting timer" +// err = NewTimer(ctx, time.Hour).Get(ctx, nil) +// if err != nil { +// currentState = "timer failed" +// return err +// } +// +// currentState = "waiting activity" +// ctx = WithActivityOptions(ctx, myActivityOptions) +// err = ExecuteActivity(ctx, MyActivity, "my_input").Get(ctx, nil) +// if err != nil { +// currentState = "activity failed" +// return err +// } +// currentState = "done" +// return nil +// } func SetQueryHandler(ctx Context, queryType string, handler interface{}) error { i := getWorkflowOutboundInterceptor(ctx) return i.SetQueryHandler(ctx, queryType, handler) diff --git a/mocks/Client.go b/mocks/Client.go index ba22fe970..5db1dc7c4 100644 --- a/mocks/Client.go +++ b/mocks/Client.go @@ -37,6 +37,7 @@ import ( "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/internal" ) // Client is an autogenerated mock type for the Client type @@ -542,6 +543,43 @@ func (_m *Client) CheckHealth(ctx context.Context, request *client.CheckHealthRe return r0, r1 } +// UpdateWorkerBuildIDOrdering provides a mock function with given fields: ctx, taskQueue, workerBuildId, previousCompatible, becomeDefault +func (_m *Client) UpdateWorkerBuildIDOrdering(ctx context.Context, options *client.UpdateWorkerBuildIDOrderingOptions) error { + ret := _m.Called(ctx, options) + + var r0 error + if rf, ok := ret.Get(1).(func(context.Context, *client.UpdateWorkerBuildIDOrderingOptions) error); ok { + r0 = rf(ctx, options) + } else { + r0 = ret.Error(1) + } + + return r0 +} + +// GetWorkerBuildIDOrdering provides a mock function with given fields: ctx, request +func (_m *Client) GetWorkerBuildIDOrdering(ctx context.Context, options *client.GetWorkerBuildIDOrderingOptions) (*client.WorkerBuildIDVersionGraph, error) { + ret := _m.Called(ctx, options) + + var r0 *internal.WorkerBuildIDVersionGraph + if rf, ok := ret.Get(0).(func(context.Context, *client.GetWorkerBuildIDOrderingOptions) *client.WorkerBuildIDVersionGraph); ok { + r0 = rf(ctx, options) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*internal.WorkerBuildIDVersionGraph) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *client.GetWorkerBuildIDOrderingOptions) error); ok { + r1 = rf(ctx, options) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + // WorkflowService provides a mock function with given fields: func (_m *Client) WorkflowService() workflowservice.WorkflowServiceClient { ret := _m.Called() diff --git a/test/bindings_test.go b/test/bindings_test.go index d31afc468..0f31d31b5 100644 --- a/test/bindings_test.go +++ b/test/bindings_test.go @@ -26,19 +26,11 @@ package test_test import ( "context" - "fmt" "testing" - "time" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" - enumspb "go.temporal.io/api/enums/v1" - "go.temporal.io/api/serviceerror" - "go.temporal.io/api/workflowservice/v1" - "go.temporal.io/sdk/client" - "go.temporal.io/sdk/internal/common" - ilog "go.temporal.io/sdk/internal/log" "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" ) @@ -46,106 +38,19 @@ import ( type AsyncBindingsTestSuite struct { *require.Assertions suite.Suite - config Config - client client.Client + ConfigAndClientSuiteBase worker worker.Worker taskQueueName string } -func SimplestWorkflow(ctx workflow.Context) error { - return nil -} - func TestAsyncBindingsTestSuite(t *testing.T) { suite.Run(t, new(AsyncBindingsTestSuite)) } func (ts *AsyncBindingsTestSuite) SetupSuite() { ts.Assertions = require.New(ts.T()) - ts.config = NewConfig() - var err error - ts.client, err = client.Dial(client.Options{ - HostPort: ts.config.ServiceAddr, - Namespace: ts.config.Namespace, - Logger: ilog.NewDefaultLogger(), - ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS}, - }) - ts.NoError(err) - if ts.config.ShouldRegisterNamespace { - ts.registerNamespace() - } -} - -func (ts *AsyncBindingsTestSuite) registerNamespace() { - client, err := client.NewNamespaceClient(client.Options{ - HostPort: ts.config.ServiceAddr, - ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS}, - }) - ts.NoError(err) - ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) - defer cancel() - err = client.Register(ctx, &workflowservice.RegisterNamespaceRequest{ - Namespace: ts.config.Namespace, - WorkflowExecutionRetentionPeriod: common.DurationPtr(1 * 24 * time.Hour), - }) - defer client.Close() - if _, ok := err.(*serviceerror.NamespaceAlreadyExists); ok { - return - } - ts.NoError(err) - time.Sleep(namespaceCacheRefreshInterval) // wait for namespace cache refresh on temporal-server - // bellow is used to guarantee namespace is ready - var dummyReturn string - err = ts.executeWorkflow("test-namespace-exist", SimplestWorkflow, &dummyReturn) - numOfRetry := 20 - for err != nil && numOfRetry >= 0 { - if _, ok := err.(*serviceerror.NamespaceNotFound); ok { - time.Sleep(namespaceCacheRefreshInterval) - err = ts.executeWorkflow("test-namespace-exist", SimplestWorkflow, &dummyReturn) - } else { - break - } - numOfRetry-- - } -} - -// executeWorkflow executes a given workflow and waits for the result -func (ts *AsyncBindingsTestSuite) executeWorkflow( - wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { - options := ts.startWorkflowOptions(wfID) - return ts.executeWorkflowWithOption(options, wfFunc, retValPtr, args...) -} - -func (ts *AsyncBindingsTestSuite) executeWorkflowWithOption( - options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { - ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) - defer cancel() - run, err := ts.client.ExecuteWorkflow(ctx, options, wfFunc, args...) - if err != nil { - return err - } - err = run.Get(ctx, retValPtr) - if ts.config.Debug { - iter := ts.client.GetWorkflowHistory(ctx, options.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - for iter.HasNext() { - event, err1 := iter.Next() - if err1 != nil { - break - } - fmt.Println(event.String()) - } - } - return err -} - -func (ts *AsyncBindingsTestSuite) startWorkflowOptions(wfID string) client.StartWorkflowOptions { - return client.StartWorkflowOptions{ - ID: wfID, - TaskQueue: ts.taskQueueName, - WorkflowExecutionTimeout: 15 * time.Second, - WorkflowTaskTimeout: time.Second, - WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, - } + ts.NoError(ts.InitConfigAndNamespace()) + ts.NoError(ts.InitClient()) } func (ts *AsyncBindingsTestSuite) TearDownSuite() { diff --git a/test/integration_test.go b/test/integration_test.go index e0f1cfa89..32b40240d 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -82,13 +82,11 @@ const ( type IntegrationTestSuite struct { *require.Assertions suite.Suite - config Config - client client.Client + ConfigAndClientSuiteBase activities *Activities workflows *Workflows worker worker.Worker workerStopped bool - taskQueueName string tracer *tracingInterceptor inboundSignalInterceptor *signalInterceptor trafficController *test.SimpleTrafficController @@ -106,13 +104,9 @@ func TestIntegrationSuite(t *testing.T) { func (ts *IntegrationTestSuite) SetupSuite() { ts.Assertions = require.New(ts.T()) - ts.config = NewConfig() ts.activities = newActivities() ts.workflows = &Workflows{} - ts.NoError(WaitForTCP(time.Minute, ts.config.ServiceAddr)) - if ts.config.ShouldRegisterNamespace { - ts.registerNamespace() - } + ts.NoError(ts.InitConfigAndNamespace()) } func (ts *IntegrationTestSuite) TearDownSuite() { @@ -1312,7 +1306,6 @@ func (ts *IntegrationTestSuite) TestEndToEndLatencyMetrics() { ts.NotNil(nonLocal) ts.Equal(prevNonLocalValue, nonLocal.Value()) } - func (ts *IntegrationTestSuite) TestGracefulActivityCompletion() { // FYI, setup of this test allows the worker to wait to stop for 10 seconds ctx, cancel := context.WithCancel(context.Background()) @@ -1454,7 +1447,6 @@ func (ts *IntegrationTestSuite) TestInterceptorCalls() { "WorkflowOutboundInterceptor.RequestCancelExternalWorkflow": {}, "WorkflowOutboundInterceptor.SignalExternalWorkflow": {}, "WorkflowOutboundInterceptor.UpsertSearchAttributes": {}, - "WorkflowOutboundInterceptor.UpsertMemo": {}, "WorkflowOutboundInterceptor.GetSignalChannel": {}, "WorkflowOutboundInterceptor.SideEffect": {}, "WorkflowOutboundInterceptor.MutableSideEffect": {}, @@ -2393,40 +2385,6 @@ func (ts *IntegrationTestSuite) TestReplayerWithInterceptor() { workflow.Execution{ID: run.GetID(), RunID: run.GetRunID()})) } -func (ts *IntegrationTestSuite) registerNamespace() { - client, err := client.NewNamespaceClient(client.Options{ - HostPort: ts.config.ServiceAddr, - ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS}, - }) - ts.NoError(err) - ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) - defer cancel() - retention := 1 * time.Hour * 24 - err = client.Register(ctx, &workflowservice.RegisterNamespaceRequest{ - Namespace: ts.config.Namespace, - WorkflowExecutionRetentionPeriod: &retention, - }) - client.Close() - if _, ok := err.(*serviceerror.NamespaceAlreadyExists); ok { - return - } - ts.NoError(err) - time.Sleep(namespaceCacheRefreshInterval) // wait for namespace cache refresh on temporal-server - // bellow is used to guarantee namespace is ready - var dummyReturn string - err = ts.executeWorkflow("test-namespace-exist", ts.workflows.SimplestWorkflow, &dummyReturn) - numOfRetry := 20 - for err != nil && numOfRetry >= 0 { - if _, ok := err.(*serviceerror.NamespaceNotFound); ok { - time.Sleep(namespaceCacheRefreshInterval) - err = ts.executeWorkflow("test-namespace-exist", ts.workflows.SimplestWorkflow, &dummyReturn) - } else { - break - } - numOfRetry-- - } -} - // We count on the no-cache test to test replay conditions here func (ts *IntegrationTestSuite) TestHistoryLength() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -2518,216 +2476,6 @@ func (ts *IntegrationTestSuite) TestHeartbeatThrottleDisabled() { ts.assertReportedOperationCount("temporal_request_failure", "RecordActivityTaskHeartbeat", 0) } -func (ts *IntegrationTestSuite) TestUpsertMemoFromNil() { - ts.T().Skip("temporal server 1.18.0 has a bug") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - systemInfo, err := ts.client.WorkflowService().GetSystemInfo( - ctx, - &workflowservice.GetSystemInfoRequest{}, - ) - ts.NoError(err) - if !systemInfo.GetCapabilities().GetUpsertMemo() { - ts.T().Skip("UpsertMemo not implemented in server yet") - } - - upsertMemo := map[string]interface{}{ - "key_1": "new_value_1", - "key_2": nil, - "key_3": 123, - } - - expectedKey1Value, _ := converter.GetDefaultDataConverter().ToPayload("new_value_1") - expectedKey3Value, _ := converter.GetDefaultDataConverter().ToPayload(123) - expectedMemo := &commonpb.Memo{ - Fields: map[string]*commonpb.Payload{ - "key_1": expectedKey1Value, - "key_3": expectedKey3Value, - }, - } - - // Start workflow - wfid := "test-upsert-memo-from-nil" - wfOptions := ts.startWorkflowOptions(wfid) - run, err := ts.client.ExecuteWorkflow(ctx, wfOptions, ts.workflows.UpsertMemo, upsertMemo) - ts.NoError(err) - ts.NotNil(run) - - var memo *commonpb.Memo - err = run.Get(ctx, &memo) - ts.NoError(err) - - // Wait a little bit for ES to update - time.Sleep(2 * time.Second) - - // Query ES for memo - resp, err := ts.client.DescribeWorkflowExecution(ctx, wfid, "") - ts.NoError(err) - ts.NotNil(resp) - - // workflow execution info matches memo in ES and correct - ts.Equal(resp.WorkflowExecutionInfo.Memo, memo) - ts.Equal(expectedMemo, memo) -} - -func (ts *IntegrationTestSuite) TestUpsertMemoFromEmptyMap() { - ts.T().Skip("temporal server 1.18.0 has a bug") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - systemInfo, err := ts.client.WorkflowService().GetSystemInfo( - ctx, - &workflowservice.GetSystemInfoRequest{}, - ) - ts.NoError(err) - if !systemInfo.GetCapabilities().GetUpsertMemo() { - ts.T().Skip("UpsertMemo not implemented in server yet") - } - - upsertMemo := map[string]interface{}{ - "key_1": "new_value_1", - "key_2": nil, - "key_3": 123, - } - - expectedKey1Value, _ := converter.GetDefaultDataConverter().ToPayload("new_value_1") - expectedKey3Value, _ := converter.GetDefaultDataConverter().ToPayload(123) - expectedMemo := &commonpb.Memo{ - Fields: map[string]*commonpb.Payload{ - "key_1": expectedKey1Value, - "key_3": expectedKey3Value, - }, - } - - // Start workflow - wfid := "test-upsert-memo-from-empty-map" - wfOptions := ts.startWorkflowOptions(wfid) - wfOptions.Memo = map[string]interface{}{} - run, err := ts.client.ExecuteWorkflow(ctx, wfOptions, ts.workflows.UpsertMemo, upsertMemo) - ts.NoError(err) - ts.NotNil(run) - - var memo *commonpb.Memo - err = run.Get(ctx, &memo) - ts.NoError(err) - - // Wait a little bit for ES to update - time.Sleep(2 * time.Second) - - // Query ES for memo - resp, err := ts.client.DescribeWorkflowExecution(ctx, wfid, "") - ts.NoError(err) - ts.NotNil(resp) - - // workflow execution info matches memo in ES and correct - ts.Equal(resp.WorkflowExecutionInfo.Memo, memo) - ts.Equal(expectedMemo, memo) -} - -func (ts *IntegrationTestSuite) TestUpsertMemoWithExistingMemo() { - ts.T().Skip("temporal server 1.18.0 has a bug") - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - systemInfo, err := ts.client.WorkflowService().GetSystemInfo( - ctx, - &workflowservice.GetSystemInfoRequest{}, - ) - ts.NoError(err) - if !systemInfo.GetCapabilities().GetUpsertMemo() { - ts.T().Skip("UpsertMemo not implemented in server yet") - } - - upsertMemo := map[string]interface{}{ - "key_1": "new_value_1", - "key_2": nil, - "key_3": 123, - } - - expectedKey1Value, _ := converter.GetDefaultDataConverter().ToPayload("new_value_1") - expectedKey3Value, _ := converter.GetDefaultDataConverter().ToPayload(123) - expectedMemo := &commonpb.Memo{ - Fields: map[string]*commonpb.Payload{ - "key_1": expectedKey1Value, - "key_3": expectedKey3Value, - }, - } - - // Start workflow - wfid := "test-upsert-memo-with-existing-memo" - wfOptions := ts.startWorkflowOptions(wfid) - wfOptions.Memo = map[string]interface{}{ - "key_1": "value_1", - "key_2": "value_2", - } - run, err := ts.client.ExecuteWorkflow(ctx, wfOptions, ts.workflows.UpsertMemo, upsertMemo) - ts.NoError(err) - ts.NotNil(run) - - var memo *commonpb.Memo - err = run.Get(ctx, &memo) - ts.NoError(err) - - // Wait a little bit for ES to update - time.Sleep(2 * time.Second) - - // Query ES for memo - resp, err := ts.client.DescribeWorkflowExecution(ctx, wfid, "") - ts.NoError(err) - ts.NotNil(resp) - - // workflow execution info matches memo in ES and correct - ts.Equal(resp.WorkflowExecutionInfo.Memo, memo) - ts.Equal(expectedMemo, memo) -} - -// executeWorkflow executes a given workflow and waits for the result -func (ts *IntegrationTestSuite) executeWorkflow( - wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { - return ts.executeWorkflowWithOption(ts.startWorkflowOptions(wfID), wfFunc, retValPtr, args...) -} -func (ts *IntegrationTestSuite) executeWorkflowWithOption( - options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { - return ts.executeWorkflowWithContextAndOption(context.Background(), options, wfFunc, retValPtr, args...) -} - -func (ts *IntegrationTestSuite) executeWorkflowWithContextAndOption( - ctx context.Context, options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { - ctx, cancel := context.WithTimeout(ctx, ctxTimeout) - defer cancel() - run, err := ts.client.ExecuteWorkflow(ctx, options, wfFunc, args...) - if err != nil { - return err - } - err = run.Get(ctx, retValPtr) - if ts.config.Debug { - iter := ts.client.GetWorkflowHistory(ctx, options.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) - for iter.HasNext() { - event, err1 := iter.Next() - if err1 != nil { - break - } - fmt.Println(event.String()) - } - } - return err -} - -func (ts *IntegrationTestSuite) startWorkflowOptions(wfID string) client.StartWorkflowOptions { - var wfOptions = client.StartWorkflowOptions{ - ID: wfID, - TaskQueue: ts.taskQueueName, - WorkflowExecutionTimeout: 15 * time.Second, - WorkflowTaskTimeout: time.Second, - WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, - } - if wfID == CronWorkflowID { - wfOptions.CronSchedule = "@every 1s" - } - return wfOptions -} - func (ts *IntegrationTestSuite) registerWorkflowsAndActivities(w worker.Worker) { ts.workflows.register(w) ts.activities.register(w) diff --git a/test/test_utils_test.go b/test/test_utils_test.go index 5d614766f..cede8635d 100644 --- a/test/test_utils_test.go +++ b/test/test_utils_test.go @@ -36,8 +36,13 @@ import ( "time" "github.com/pborman/uuid" + enumspb "go.temporal.io/api/enums/v1" + "go.temporal.io/api/serviceerror" + "go.temporal.io/api/workflowservice/v1" "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" + "go.temporal.io/sdk/internal/common" + ilog "go.temporal.io/sdk/internal/log" "go.temporal.io/sdk/workflow" ) @@ -52,7 +57,12 @@ type ( TLS *tls.Config } // context.WithValue need this type instead of basic type string to avoid lint error - contextKey string + contextKey string + ConfigAndClientSuiteBase struct { + config Config + client client.Client + taskQueueName string + } ) var taskQueuePrefix = "tq-" + uuid.New() @@ -207,3 +217,119 @@ func (s *keysPropagator) ExtractToWorkflow(ctx workflow.Context, reader workflow } return ctx, nil } + +func (ts *ConfigAndClientSuiteBase) InitConfigAndNamespace() error { + ts.config = NewConfig() + var err error + err = WaitForTCP(time.Minute, ts.config.ServiceAddr) + if err != nil { + return err + } + if ts.config.ShouldRegisterNamespace { + err = ts.registerNamespace() + if err != nil { + return err + } + } + return nil +} + +func (ts *ConfigAndClientSuiteBase) InitClient() error { + var err error + ts.client, err = client.Dial(client.Options{ + HostPort: ts.config.ServiceAddr, + Namespace: ts.config.Namespace, + Logger: ilog.NewDefaultLogger(), + ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS}, + }) + return err +} + +func SimplestWorkflow(_ workflow.Context) error { + return nil +} + +func (ts *ConfigAndClientSuiteBase) registerNamespace() error { + client, err := client.NewNamespaceClient(client.Options{ + HostPort: ts.config.ServiceAddr, + ConnectionOptions: client.ConnectionOptions{TLS: ts.config.TLS}, + }) + if err != nil { + return err + } + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + err = client.Register(ctx, &workflowservice.RegisterNamespaceRequest{ + Namespace: ts.config.Namespace, + WorkflowExecutionRetentionPeriod: common.DurationPtr(1 * 24 * time.Hour), + }) + defer client.Close() + if _, ok := err.(*serviceerror.NamespaceAlreadyExists); ok { + return nil + } + if err != nil { + return err + } + time.Sleep(namespaceCacheRefreshInterval) // wait for namespace cache refresh on temporal-server + // below is used to guarantee namespace is ready + var dummyReturn string + err = ts.executeWorkflow("test-namespace-exist", SimplestWorkflow, &dummyReturn) + numOfRetry := 20 + for err != nil && numOfRetry >= 0 { + if _, ok := err.(*serviceerror.NamespaceNotFound); ok { + time.Sleep(namespaceCacheRefreshInterval) + err = ts.executeWorkflow("test-namespace-exist", SimplestWorkflow, &dummyReturn) + } else { + break + } + numOfRetry-- + } + return nil +} + +// executeWorkflow executes a given workflow and waits for the result +func (ts *ConfigAndClientSuiteBase) executeWorkflow( + wfID string, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { + return ts.executeWorkflowWithOption(ts.startWorkflowOptions(wfID), wfFunc, retValPtr, args...) +} + +func (ts *ConfigAndClientSuiteBase) executeWorkflowWithOption( + options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { + return ts.executeWorkflowWithContextAndOption(context.Background(), options, wfFunc, retValPtr, args...) +} + +func (ts *ConfigAndClientSuiteBase) executeWorkflowWithContextAndOption( + ctx context.Context, options client.StartWorkflowOptions, wfFunc interface{}, retValPtr interface{}, args ...interface{}) error { + ctx, cancel := context.WithTimeout(ctx, ctxTimeout) + defer cancel() + run, err := ts.client.ExecuteWorkflow(ctx, options, wfFunc, args...) + if err != nil { + return err + } + err = run.Get(ctx, retValPtr) + if ts.config.Debug { + iter := ts.client.GetWorkflowHistory(ctx, options.ID, run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT) + for iter.HasNext() { + event, err1 := iter.Next() + if err1 != nil { + break + } + fmt.Println(event.String()) + } + } + return err +} + +func (ts *ConfigAndClientSuiteBase) startWorkflowOptions(wfID string) client.StartWorkflowOptions { + var wfOptions = client.StartWorkflowOptions{ + ID: wfID, + TaskQueue: ts.taskQueueName, + WorkflowExecutionTimeout: 15 * time.Second, + WorkflowTaskTimeout: time.Second, + WorkflowIDReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, + } + if wfID == CronWorkflowID { + wfOptions.CronSchedule = "@every 1s" + } + return wfOptions +} diff --git a/test/worker_versioning_test.go b/test/worker_versioning_test.go new file mode 100644 index 000000000..49c4019f3 --- /dev/null +++ b/test/worker_versioning_test.go @@ -0,0 +1,144 @@ +// The MIT License +// +// Copyright (c) 2022 Temporal Technologies Inc. All rights reserved. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package test_test + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +type WorkerVersioningTestSuite struct { + *require.Assertions + suite.Suite + ConfigAndClientSuiteBase + workflows *Workflows +} + +func TestWorkerVersioningTestSuite(t *testing.T) { + suite.Run(t, new(WorkerVersioningTestSuite)) +} + +func (ts *WorkerVersioningTestSuite) SetupSuite() { + ts.Assertions = require.New(ts.T()) + ts.workflows = &Workflows{} + ts.NoError(ts.InitConfigAndNamespace()) + ts.NoError(ts.InitClient()) +} + +func (ts *WorkerVersioningTestSuite) TearDownSuite() { + ts.Assertions = require.New(ts.T()) + ts.client.Close() +} + +func (ts *WorkerVersioningTestSuite) SetupTest() { + ts.taskQueueName = taskQueuePrefix + "-" + ts.T().Name() +} + +func (ts *WorkerVersioningTestSuite) TestManipulateVersionGraph() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + err := ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{ + TaskQueue: ts.taskQueueName, + WorkerBuildID: "1.0", + BecomeDefault: true, + }) + ts.NoError(err) + err = ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{ + TaskQueue: ts.taskQueueName, + WorkerBuildID: "2.0", + BecomeDefault: true, + }) + ts.NoError(err) + err = ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{ + TaskQueue: ts.taskQueueName, + WorkerBuildID: "1.1", + PreviousCompatible: "1.0", + }) + ts.NoError(err) + + res, err := ts.client.GetWorkerBuildIDOrdering(ctx, &client.GetWorkerBuildIDOrderingOptions{ + TaskQueue: ts.taskQueueName, + }) + ts.NoError(err) + ts.Equal("2.0", res.Default.WorkerBuildID) + ts.Equal("1.1", res.CompatibleLeaves[0].WorkerBuildID) + ts.Equal("1.0", res.CompatibleLeaves[0].PreviousCompatible.WorkerBuildID) +} + +func (ts *WorkerVersioningTestSuite) TestTwoWorkersGetDifferentTasks() { + ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout) + defer cancel() + + err := ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{ + TaskQueue: ts.taskQueueName, + WorkerBuildID: "1.0", + BecomeDefault: true, + }) + ts.NoError(err) + + worker1 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildIDForVersioning: "1.0"}) + ts.workflows.register(worker1) + ts.NoError(worker1.Start()) + defer worker1.Stop() + worker2 := worker.New(ts.client, ts.taskQueueName, worker.Options{BuildIDForVersioning: "2.0"}) + ts.workflows.register(worker2) + ts.NoError(worker2.Start()) + defer worker2.Stop() + + // Start some workflows targeting 1.0 + handle11, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("1-1"), ts.workflows.WaitSignalToStart) + ts.NoError(err) + handle12, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("1-2"), ts.workflows.WaitSignalToStart) + ts.NoError(err) + + // Now add the 2.0 version + err = ts.client.UpdateWorkerBuildIDOrdering(ctx, &client.UpdateWorkerBuildIDOrderingOptions{ + TaskQueue: ts.taskQueueName, + WorkerBuildID: "2.0", + BecomeDefault: true, + }) + ts.NoError(err) + + // 2.0 workflows + handle21, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("2-1"), ts.workflows.WaitSignalToStart) + ts.NoError(err) + handle22, err := ts.client.ExecuteWorkflow(ctx, ts.startWorkflowOptions("2-2"), ts.workflows.WaitSignalToStart) + ts.NoError(err) + + // finish them all + ts.NoError(ts.client.SignalWorkflow(ctx, handle11.GetID(), handle11.GetRunID(), "start-signal", "")) + ts.NoError(ts.client.SignalWorkflow(ctx, handle12.GetID(), handle12.GetRunID(), "start-signal", "")) + ts.NoError(ts.client.SignalWorkflow(ctx, handle21.GetID(), handle21.GetRunID(), "start-signal", "")) + ts.NoError(ts.client.SignalWorkflow(ctx, handle22.GetID(), handle22.GetRunID(), "start-signal", "")) + + // Wait for all wfs to finish + ts.NoError(handle11.Get(ctx, nil)) + ts.NoError(handle12.Get(ctx, nil)) + ts.NoError(handle21.Get(ctx, nil)) + ts.NoError(handle22.Get(ctx, nil)) +} diff --git a/worker/worker.go b/worker/worker.go index 73becabf6..e7a2c5863 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -219,11 +219,12 @@ const ( ) // New creates an instance of worker for managing workflow and activity executions. -// client - the client for use by the worker -// taskQueue - is the task queue name you use to identify your client worker, also -// identifies group of workflow and activity implementations that are -// hosted by a single worker process -// options - configure any worker specific options like logger, metrics, identity +// +// client - the client for use by the worker +// taskQueue - is the task queue name you use to identify your client worker, also +// identifies group of workflow and activity implementations that are +// hosted by a single worker process +// options - configure any worker specific options like logger, metrics, identity func New( client client.Client, taskQueue string,