diff --git a/api/historyservice/v1/request_response.pb.go b/api/historyservice/v1/request_response.pb.go index 8396a9e7bab..5d32cf0672f 100644 --- a/api/historyservice/v1/request_response.pb.go +++ b/api/historyservice/v1/request_response.pb.go @@ -1132,8 +1132,7 @@ type RecordWorkflowTaskStartedRequest struct { PollRequest *v1.PollWorkflowTaskQueueRequest `protobuf:"bytes,6,opt,name=poll_request,json=pollRequest,proto3" json:"poll_request,omitempty"` Clock *v16.VectorClock `protobuf:"bytes,7,opt,name=clock,proto3" json:"clock,omitempty"` BuildIdRedirectInfo *v111.BuildIdRedirectInfo `protobuf:"bytes,8,opt,name=build_id_redirect_info,json=buildIdRedirectInfo,proto3" json:"build_id_redirect_info,omitempty"` - // Presence of this value means matching has redirected the task to a deployment other than - // the deployment that History passed when scheduling the task. + // The deployment passed by History when the task was scheduled. ScheduledDeployment *v112.Deployment `protobuf:"bytes,9,opt,name=scheduled_deployment,json=scheduledDeployment,proto3" json:"scheduled_deployment,omitempty"` } @@ -1423,13 +1422,12 @@ type RecordActivityTaskStartedRequest struct { BuildIdRedirectInfo *v111.BuildIdRedirectInfo `protobuf:"bytes,8,opt,name=build_id_redirect_info,json=buildIdRedirectInfo,proto3" json:"build_id_redirect_info,omitempty"` // Stamp represents the internal “version” of the activity options and can/will be changed with Activity API. Stamp int32 `protobuf:"varint,9,opt,name=stamp,proto3" json:"stamp,omitempty"` - // Presence of this value means matching has redirected the task to a deployment other than - // the deployment that History passed when scheduling the task. + // The deployment passed by History when the task was scheduled. ScheduledDeployment *v112.Deployment `protobuf:"bytes,10,opt,name=scheduled_deployment,json=scheduledDeployment,proto3" json:"scheduled_deployment,omitempty"` - // Whether the directive deployment contains the activity's task queue. Used by History to + // Whether the scheduled deployment contains the activity's task queue. Used by History to // determine if the activity redirect should affect the workflow. - // Only set if `directive_deployment` is set (i.e. the task is redirected). - DirectiveDeploymentContainsTaskQueue bool `protobuf:"varint,11,opt,name=directive_deployment_contains_task_queue,json=directiveDeploymentContainsTaskQueue,proto3" json:"directive_deployment_contains_task_queue,omitempty"` + // Only set if `scheduled_deployment` is set (i.e. the task is redirected). + ScheduledDeploymentContainsTaskQueue bool `protobuf:"varint,11,opt,name=scheduled_deployment_contains_task_queue,json=scheduledDeploymentContainsTaskQueue,proto3" json:"scheduled_deployment_contains_task_queue,omitempty"` } func (x *RecordActivityTaskStartedRequest) Reset() { @@ -1527,9 +1525,9 @@ func (x *RecordActivityTaskStartedRequest) GetScheduledDeployment() *v112.Deploy return nil } -func (x *RecordActivityTaskStartedRequest) GetDirectiveDeploymentContainsTaskQueue() bool { +func (x *RecordActivityTaskStartedRequest) GetScheduledDeploymentContainsTaskQueue() bool { if x != nil { - return x.DirectiveDeploymentContainsTaskQueue + return x.ScheduledDeploymentContainsTaskQueue } return false } @@ -10206,10 +10204,10 @@ var file_temporal_server_api_historyservice_v1_request_response_proto_rawDesc = 0x2e, 0x61, 0x70, 0x69, 0x2e, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x76, 0x31, 0x2e, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x13, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, - 0x42, 0x02, 0x68, 0x00, 0x12, 0x5a, 0x0a, 0x28, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, + 0x42, 0x02, 0x68, 0x00, 0x12, 0x5a, 0x0a, 0x28, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x5f, 0x64, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x5f, 0x63, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x73, 0x5f, 0x74, 0x61, 0x73, 0x6b, 0x5f, 0x71, 0x75, 0x65, 0x75, 0x65, 0x18, 0x0b, - 0x20, 0x01, 0x28, 0x08, 0x52, 0x24, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x76, 0x65, 0x44, 0x65, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x24, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x64, 0x44, 0x65, 0x70, 0x6c, 0x6f, 0x79, 0x6d, 0x65, 0x6e, 0x74, 0x43, 0x6f, 0x6e, 0x74, 0x61, 0x69, 0x6e, 0x73, 0x54, 0x61, 0x73, 0x6b, 0x51, 0x75, 0x65, 0x75, 0x65, 0x42, 0x02, 0x68, 0x00, 0x3a, 0x24, 0x92, 0xc4, 0x03, 0x20, 0x2a, 0x1e, 0x77, 0x6f, 0x72, 0x6b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x65, 0x78, 0x65, 0x63, diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 86665ecd438..14f4933fb7c 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -852,10 +852,12 @@ of Timeout and if no activity is seen even after that the connection is closed.` true, `FrontendEnableSchedules enables schedule-related RPCs in the frontend`, ) - FrontendEnableDeployments = NewNamespaceBoolSetting( - "frontend.enableDeployments", + EnableDeployments = NewNamespaceBoolSetting( + "system.enableDeployments", false, - `FrontendEnableDeployments enables deployment-related RPCs in the frontend`, + `EnableDeployments enables deployments (versioning v3) in all services, +including deployment-related RPCs in the frontend, deployment entity workflows in the worker, +and deployment interaction in matching and history.`, ) EnableNexus = NewGlobalBoolSetting( "system.enableNexus", @@ -1220,11 +1222,6 @@ these log lines can be noisy, we want to be able to turn on and sample selective false, `MatchingDropNonRetryableTasks states if we should drop matching tasks with Internal/Dataloss errors`, ) - MatchingEnableDeployments = NewNamespaceBoolSetting( - "matching.enableDeployment", - false, - `MatchingEnableDeployments enables deployment-related RPCs in matching`, - ) MatchingMaxTaskQueuesInDeployment = NewNamespaceIntSetting( "matching.maxTaskQueuesInDeployment", 1000, @@ -2560,11 +2557,6 @@ If the service configures with archival feature enabled, update worker.historySc `How long to sleep within a local activity before pushing to workflow level sleep (don't make this close to or more than the workflow task timeout)`, ) - WorkerEnableDeployment = NewNamespaceBoolSetting( - "worker.enableDeployment", - false, - `WorkerEnableDeploymentGroup controls whether to start the worker for deployment and deployment-name workflows`, - ) WorkerDeleteNamespaceActivityLimits = NewGlobalTypedSetting( "worker.deleteNamespaceActivityLimitsConfig", sdkworker.Options{}, diff --git a/common/testing/taskpoller/taskpoller.go b/common/testing/taskpoller/taskpoller.go index 8b77a9370c8..7a8e579cf41 100644 --- a/common/testing/taskpoller/taskpoller.go +++ b/common/testing/taskpoller/taskpoller.go @@ -228,7 +228,7 @@ func (p *workflowTaskPoller) pollTask( if err != nil { return nil, err } - if resp == nil { + if resp == nil || resp.TaskToken == nil { return nil, NoWorkflowTaskAvailable } diff --git a/common/testing/testvars/test_vars.go b/common/testing/testvars/test_vars.go index 56e0d77c847..1e4d6728a2d 100644 --- a/common/testing/testvars/test_vars.go +++ b/common/testing/testvars/test_vars.go @@ -31,6 +31,7 @@ import ( "github.com/pborman/uuid" commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" namespacepb "go.temporal.io/api/namespace/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" @@ -157,11 +158,23 @@ func (tv *TestVars) BuildId(key ...string) string { return tv.getOrCreate("build_id", key).(string) } +func (tv *TestVars) WithBuildId(buildId string, key ...string) *TestVars { + return tv.cloneSet("build_id", key, buildId) +} + func (tv *TestVars) DeploymentSeries(key ...string) string { //revive:disable-next-line:unchecked-type-assertion return tv.getOrCreate("deployment_series", key).(string) } +func (tv *TestVars) Deployment(key ...string) *deployment.Deployment { + //revive:disable-next-line:unchecked-type-assertion + return &deployment.Deployment{ + SeriesName: tv.DeploymentSeries(key...), + BuildId: tv.BuildId(key...), + } +} + func (tv *TestVars) WithWorkflowID(workflowID string, key ...string) *TestVars { return tv.cloneSet("workflow_id", key, workflowID) } diff --git a/common/worker_versioning/worker_versioning.go b/common/worker_versioning/worker_versioning.go index 55767b23354..e30ed8e773f 100644 --- a/common/worker_versioning/worker_versioning.go +++ b/common/worker_versioning/worker_versioning.go @@ -218,10 +218,14 @@ func MakeBuildIdDirective(buildId string) *taskqueuespb.TaskVersionDirective { } func StampFromCapabilities(cap *commonpb.WorkerVersionCapabilities) *commonpb.WorkerVersionStamp { + if cap.GetUseVersioning() && cap.GetDeploymentSeriesName() != "" { + // Versioning 3, do not return stamp. + return nil + } // TODO: remove `cap.BuildId != ""` condition after old versioning cleanup. this condition is used to differentiate // between old and new versioning in Record*TaskStart calls. [cleanup-old-wv] // we don't want to add stamp for task started events in old versioning - if cap != nil && cap.BuildId != "" { + if cap.GetBuildId() != "" { return &commonpb.WorkerVersionStamp{UseVersioning: cap.UseVersioning, BuildId: cap.BuildId} } return nil diff --git a/proto/internal/temporal/server/api/historyservice/v1/request_response.proto b/proto/internal/temporal/server/api/historyservice/v1/request_response.proto index cdd31b48202..0da3bedbcd5 100644 --- a/proto/internal/temporal/server/api/historyservice/v1/request_response.proto +++ b/proto/internal/temporal/server/api/historyservice/v1/request_response.proto @@ -243,8 +243,7 @@ message RecordWorkflowTaskStartedRequest { temporal.api.workflowservice.v1.PollWorkflowTaskQueueRequest poll_request = 6; temporal.server.api.clock.v1.VectorClock clock = 7; temporal.server.api.taskqueue.v1.BuildIdRedirectInfo build_id_redirect_info = 8; - // Presence of this value means matching has redirected the task to a deployment other than - // the deployment that History passed when scheduling the task. + // The deployment passed by History when the task was scheduled. temporal.api.deployment.v1.Deployment scheduled_deployment = 9; } @@ -286,13 +285,12 @@ message RecordActivityTaskStartedRequest { // Stamp represents the internal “version” of the activity options and can/will be changed with Activity API. int32 stamp = 9; - // Presence of this value means matching has redirected the task to a deployment other than - // the deployment that History passed when scheduling the task. + // The deployment passed by History when the task was scheduled. temporal.api.deployment.v1.Deployment scheduled_deployment = 10; - // Whether the directive deployment contains the activity's task queue. Used by History to + // Whether the scheduled deployment contains the activity's task queue. Used by History to // determine if the activity redirect should affect the workflow. - // Only set if `directive_deployment` is set (i.e. the task is redirected). - bool directive_deployment_contains_task_queue = 11; + // Only set if `scheduled_deployment` is set (i.e. the task is redirected). + bool scheduled_deployment_contains_task_queue = 11; } message RecordActivityTaskStartedResponse { diff --git a/service/frontend/service.go b/service/frontend/service.go index 77d03de1539..1377588fef3 100644 --- a/service/frontend/service.go +++ b/service/frontend/service.go @@ -309,7 +309,7 @@ func NewConfig( EnableSchedules: dynamicconfig.FrontendEnableSchedules.Get(dc), - EnableDeployments: dynamicconfig.FrontendEnableDeployments.Get(dc), + EnableDeployments: dynamicconfig.EnableDeployments.Get(dc), EnableBatcher: dynamicconfig.FrontendEnableBatcher.Get(dc), MaxConcurrentBatchOperation: dynamicconfig.FrontendMaxConcurrentBatchOperationPerNamespace.Get(dc), diff --git a/service/history/api/recordworkflowtaskstarted/api.go b/service/history/api/recordworkflowtaskstarted/api.go index d336cdfae2a..0c73d22cb25 100644 --- a/service/history/api/recordworkflowtaskstarted/api.go +++ b/service/history/api/recordworkflowtaskstarted/api.go @@ -171,10 +171,6 @@ func Invoke( pollerDeployment := worker_versioning.DeploymentFromCapabilities(req.PollRequest.WorkerVersionCapabilities) // Effective deployment of the workflow when History scheduled the WFT. scheduledDeployment := req.GetScheduledDeployment() - if scheduledDeployment == nil { - // Matching does not send the directive deployment when it's the same as poller's. - scheduledDeployment = pollerDeployment - } if !scheduledDeployment.Equal(wfDeployment) { // This must be an AT scheduled before the workflow transitions to the current // deployment. Matching can drop it. diff --git a/service/matching/config.go b/service/matching/config.go index 8ac73fd9146..b155bb03475 100644 --- a/service/matching/config.go +++ b/service/matching/config.go @@ -214,7 +214,7 @@ func NewConfig( TestDisableSyncMatch: dynamicconfig.TestMatchingDisableSyncMatch.Get(dc), LoadUserData: dynamicconfig.MatchingLoadUserData.Get(dc), HistoryMaxPageSize: dynamicconfig.MatchingHistoryMaxPageSize.Get(dc), - EnableDeployments: dynamicconfig.MatchingEnableDeployments.Get(dc), + EnableDeployments: dynamicconfig.EnableDeployments.Get(dc), MaxTaskQueuesInDeployment: dynamicconfig.MatchingMaxTaskQueuesInDeployment.Get(dc), RPS: dynamicconfig.MatchingRPS.Get(dc), OperatorRPSRatio: dynamicconfig.OperatorRPSRatio.Get(dc), diff --git a/service/matching/matching_engine.go b/service/matching/matching_engine.go index de750e21fd2..affa38585e8 100644 --- a/service/matching/matching_engine.go +++ b/service/matching/matching_engine.go @@ -2406,14 +2406,7 @@ func (e *matchingEngineImpl) recordWorkflowTaskStarted( RequestId: uuid.New(), PollRequest: pollReq, BuildIdRedirectInfo: task.redirectInfo, - } - - scheduledDeployment := task.event.Data.VersionDirective.GetDeployment() - dispatchDeployment := worker_versioning.DeploymentFromCapabilities(pollReq.GetWorkerVersionCapabilities()) - if !scheduledDeployment.Equal(dispatchDeployment) { - // Redirect has happened, set the directive deployment in the request so History can - // validate the task is not stale. - recordStartedRequest.ScheduledDeployment = scheduledDeployment + ScheduledDeployment: task.event.Data.VersionDirective.GetDeployment(), } return e.historyClient.RecordWorkflowTaskStarted(ctx, recordStartedRequest) @@ -2436,14 +2429,7 @@ func (e *matchingEngineImpl) recordActivityTaskStarted( PollRequest: pollReq, BuildIdRedirectInfo: task.redirectInfo, Stamp: task.event.Data.GetStamp(), - } - - scheduledDeployment := task.event.Data.VersionDirective.GetDeployment() - dispatchDeployment := worker_versioning.DeploymentFromCapabilities(pollReq.GetWorkerVersionCapabilities()) - if !scheduledDeployment.Equal(dispatchDeployment) { - // Redirect has happened, set the directive deployment in the request so History can - // validate the task is not stale. - recordStartedRequest.ScheduledDeployment = scheduledDeployment + ScheduledDeployment: task.event.Data.VersionDirective.GetDeployment(), } return e.historyClient.RecordActivityTaskStarted(ctx, recordStartedRequest) diff --git a/service/matching/task_queue_partition_manager.go b/service/matching/task_queue_partition_manager.go index 9d678e0e4b2..7c48e64b347 100644 --- a/service/matching/task_queue_partition_manager.go +++ b/service/matching/task_queue_partition_manager.go @@ -781,62 +781,61 @@ func (pm *taskQueuePartitionManagerImpl) getPhysicalQueuesForAdd( forwardInfo *taskqueuespb.TaskForwardInfo, runId string, ) (pinnedQueue physicalTaskQueueManager, syncMatchQueue physicalTaskQueueManager, userDataChanged <-chan struct{}, err error) { - if deployment := directive.GetDeployment(); deployment != nil { - wfBehavior := directive.GetBehavior() - - switch wfBehavior { - case enumspb.VERSIONING_BEHAVIOR_PINNED: - if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY { - // TODO (shahab): we can verify the passed deployment matches the last poller's deployment - return pm.defaultQueue, pm.defaultQueue, userDataChanged, nil - } + wfBehavior := directive.GetBehavior() + deployment := directive.GetDeployment() - err = worker_versioning.ValidateDeployment(deployment) - if err != nil { - return nil, nil, nil, err - } - pinnedQueue, err = pm.getVersionedQueue(ctx, "", "", deployment, true) - if err != nil { - return nil, nil, nil, err - } - if forwardInfo == nil { - // Task is not forwarded, so it can be spooled if sync match fails. - // Spool queue and sync match queue is the same for pinned workflows. - return pinnedQueue, pinnedQueue, nil, nil - } else { - // Forwarded from child partition - only do sync match. - return nil, pinnedQueue, nil, nil - } - case enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE: - perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData() - if err != nil { - return nil, nil, nil, err - } + if wfBehavior == enumspb.VERSIONING_BEHAVIOR_PINNED { + if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY { + // TODO (shahab): we can verify the passed deployment matches the last poller's deployment + return pm.defaultQueue, pm.defaultQueue, userDataChanged, nil + } - currentDeployment := findCurrentDeployment(perTypeUserData.GetDeploymentData()) + err = worker_versioning.ValidateDeployment(deployment) + if err != nil { + return nil, nil, nil, err + } + pinnedQueue, err = pm.getVersionedQueue(ctx, "", "", deployment, true) + if err != nil { + return nil, nil, nil, err + } + if forwardInfo == nil { + // Task is not forwarded, so it can be spooled if sync match fails. + // Spool queue and sync match queue is the same for pinned workflows. + return pinnedQueue, pinnedQueue, nil, nil + } else { + // Forwarded from child partition - only do sync match. + return nil, pinnedQueue, nil, nil + } + } - if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY { - if !deployment.Equal(currentDeployment) { - // Current deployment has changed, so the workflow should move to a normal queue to - // get redirected to the new deployment. - return nil, nil, nil, serviceerrors.NewStickyWorkerUnavailable() - } + perTypeUserData, perTypeUserDataChanged, err := pm.getPerTypeUserData() + if err != nil { + return nil, nil, nil, err + } - // TODO (shahab): we can verify the passed deployment matches the last poller's deployment - return pm.defaultQueue, pm.defaultQueue, perTypeUserDataChanged, nil + currentDeployment := findCurrentDeployment(perTypeUserData.GetDeploymentData()) + if currentDeployment != nil && + // Make sure the wf is not v1-2 versioned + directive.GetAssignedBuildId() == "" { + if pm.partition.Kind() == enumspb.TASK_QUEUE_KIND_STICKY { + if !deployment.Equal(currentDeployment) { + // Current deployment has changed, so the workflow should move to a normal queue to + // get redirected to the new deployment. + return nil, nil, nil, serviceerrors.NewStickyWorkerUnavailable() } - currentDeploymentQueue, err := pm.getVersionedQueue(ctx, "", "", currentDeployment, true) - if forwardInfo == nil { - // Task is not forwarded, so it can be spooled if sync match fails. - // Unpinned tasks are spooled in default queue - return pm.defaultQueue, currentDeploymentQueue, perTypeUserDataChanged, err - } else { - // Forwarded from child partition - only do sync match. - return nil, currentDeploymentQueue, perTypeUserDataChanged, err - } - case enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED: - return nil, nil, nil, serviceerror.NewInvalidArgument("versioning behavior must be set") + // TODO (shahab): we can verify the passed deployment matches the last poller's deployment + return pm.defaultQueue, pm.defaultQueue, perTypeUserDataChanged, nil + } + + currentDeploymentQueue, err := pm.getVersionedQueue(ctx, "", "", currentDeployment, true) + if forwardInfo == nil { + // Task is not forwarded, so it can be spooled if sync match fails. + // Unpinned tasks are spooled in default queue + return pm.defaultQueue, currentDeploymentQueue, perTypeUserDataChanged, err + } else { + // Forwarded from child partition - only do sync match. + return nil, currentDeploymentQueue, perTypeUserDataChanged, err } } @@ -987,8 +986,6 @@ func (pm *taskQueuePartitionManagerImpl) getPerTypeUserData() (*persistencespb.T if err != nil { return nil, nil, err } - if perType, ok := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())]; ok { - return perType, userDataChanged, nil - } - return nil, userDataChanged, nil + perType := userData.GetData().GetPerType()[int32(pm.Partition().TaskType())] + return perType, userDataChanged, nil } diff --git a/service/matching/task_reader.go b/service/matching/task_reader.go index 7117d859a69..7ae5c805f09 100644 --- a/service/matching/task_reader.go +++ b/service/matching/task_reader.go @@ -39,6 +39,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/metrics" "go.temporal.io/server/common/primitives/timestamp" + serviceerrors "go.temporal.io/server/common/serviceerror" "go.temporal.io/server/common/util" "go.temporal.io/server/internal/goro" ) @@ -155,10 +156,12 @@ dispatchLoop: continue dispatchLoop } + var stickyUnavailable *serviceerrors.StickyWorkerUnavailable // if task is still valid (truly valid or unable to verify if task is valid) metrics.BufferThrottlePerTaskQueueCounter.With(tr.taggedMetricsHandler()).Record(1) - if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) { - // Don't log here if encounters missing user data error when dispatch a versioned task. + if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) && + // StickyWorkerUnavailable is expected for versioned sticky queues + !errors.As(err, &stickyUnavailable) { tr.throttledLogger().Error("taskReader: unexpected error dispatching task", tag.Error(err)) } util.InterruptibleSleep(ctx, taskReaderOfferThrottleWait) diff --git a/service/matching/user_data_manager.go b/service/matching/user_data_manager.go index ccbb1615b4c..864cdfc8d0b 100644 --- a/service/matching/user_data_manager.go +++ b/service/matching/user_data_manager.go @@ -310,6 +310,7 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error { return nil } + fastResponseCounter := 0 minWaitTime := m.config.GetUserDataMinWaitTime for ctx.Err() == nil { @@ -322,11 +323,20 @@ func (m *userDataManagerImpl) fetchUserData(ctx context.Context) error { // spinning. So enforce a minimum wait time that increases as long as we keep getting // very fast replies. if elapsed < m.config.GetUserDataMinWaitTime { - util.InterruptibleSleep(ctx, minWaitTime-elapsed) - // Don't let this get near our call timeout, otherwise we can't tell the difference - // between a fast reply and a timeout. - minWaitTime = min(minWaitTime*2, m.config.GetUserDataLongPollTimeout()/2) + if fastResponseCounter >= 3 { + // 3 or more consecutive fast responses, let's throttle! + util.InterruptibleSleep(ctx, minWaitTime-elapsed) + // Don't let this get near our call timeout, otherwise we can't tell the difference + // between a fast reply and a timeout. + minWaitTime = min(minWaitTime*2, m.config.GetUserDataLongPollTimeout()/2) + } else { + // Not yet 3 consecutive fast responses. A few rapid refreshes for versioned queues + // is expected when the first poller arrives. We do not want to slow down the queue + // for that. + fastResponseCounter++ + } } else { + fastResponseCounter = 0 minWaitTime = m.config.GetUserDataMinWaitTime } } diff --git a/service/matching/user_data_manager_test.go b/service/matching/user_data_manager_test.go index 27ab3173e6b..b7d91ac92be 100644 --- a/service/matching/user_data_manager_test.go +++ b/service/matching/user_data_manager_test.go @@ -153,12 +153,25 @@ func TestUserData_FetchesOnInit(t *testing.T) { TaskQueue: defaultRootTqID, TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, LastKnownUserDataVersion: 0, - WaitNewData: false, // first fetch is not long poll + WaitNewData: false, // first is not long poll }). Return(&matchingservice.GetTaskQueueUserDataResponse{ UserData: data1, }, nil) + tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData( + gomock.Any(), + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: defaultNamespaceId, + TaskQueue: defaultRootTqID, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 1, + WaitNewData: true, // second is long poll + }). + Return(&matchingservice.GetTaskQueueUserDataResponse{ + UserData: data1, + }, nil).MaxTimes(3) + m := createUserDataManager(t, controller, tqCfg) m.config.GetUserDataMinWaitTime = 10 * time.Second // only one fetch @@ -283,6 +296,19 @@ func TestUserData_RetriesFetchOnUnavailable(t *testing.T) { }, nil }) + tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData( + gomock.Any(), + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: defaultNamespaceId, + TaskQueue: defaultRootTqID, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 1, + WaitNewData: true, // after first successful poll, there would be long polls + }). + Return(&matchingservice.GetTaskQueueUserDataResponse{ + UserData: data1, + }, nil).MaxTimes(3) + m := createUserDataManager(t, controller, tqCfg) m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success m.config.GetUserDataRetryPolicy = backoff.NewExponentialRetryPolicy(50 * time.Millisecond). @@ -354,6 +380,19 @@ func TestUserData_RetriesFetchOnUnImplemented(t *testing.T) { }, nil }) + tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData( + gomock.Any(), + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: defaultNamespaceId, + TaskQueue: defaultRootTqID, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 1, + WaitNewData: true, // after first successful poll, there would be long polls + }). + Return(&matchingservice.GetTaskQueueUserDataResponse{ + UserData: data1, + }, nil).MaxTimes(3) + m := createUserDataManager(t, controller, tqCfg) m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success m.config.GetUserDataRetryPolicy = backoff.NewExponentialRetryPolicy(50 * time.Millisecond). @@ -410,6 +449,19 @@ func TestUserData_FetchesUpTree(t *testing.T) { UserData: data1, }, nil) + tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData( + gomock.Any(), + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: defaultNamespaceId, + TaskQueue: taskQueue.NormalPartition(10).RpcName(), + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 1, + WaitNewData: true, // after first successful poll, there would be long polls + }). + Return(&matchingservice.GetTaskQueueUserDataResponse{ + UserData: data1, + }, nil).MaxTimes(3) + m := createUserDataManager(t, controller, tqCfg) m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success m.Start() @@ -448,6 +500,19 @@ func TestUserData_FetchesActivityToWorkflow(t *testing.T) { UserData: data1, }, nil) + tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData( + gomock.Any(), + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: defaultNamespaceId, + TaskQueue: defaultRootTqID, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 1, + WaitNewData: true, // after first successful poll, there would be long polls + }). + Return(&matchingservice.GetTaskQueueUserDataResponse{ + UserData: data1, + }, nil).MaxTimes(3) + m := createUserDataManager(t, controller, tqCfg) m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success m.Start() @@ -490,6 +555,19 @@ func TestUserData_FetchesStickyToNormal(t *testing.T) { UserData: data1, }, nil) + tqCfg.matchingClientMock.EXPECT().GetTaskQueueUserData( + gomock.Any(), + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: defaultNamespaceId, + TaskQueue: normalName, + TaskQueueType: enumspb.TASK_QUEUE_TYPE_WORKFLOW, + LastKnownUserDataVersion: 1, + WaitNewData: true, // after first successful poll, there would be long polls + }). + Return(&matchingservice.GetTaskQueueUserDataResponse{ + UserData: data1, + }, nil).MaxTimes(3) + m := createUserDataManager(t, controller, tqCfg) m.config.GetUserDataMinWaitTime = 10 * time.Second // wait on success m.Start() diff --git a/service/worker/deployment/fx.go b/service/worker/deployment/fx.go index 53405aeb0f4..046cf8a4680 100644 --- a/service/worker/deployment/fx.go +++ b/service/worker/deployment/fx.go @@ -93,7 +93,7 @@ func NewResult( return fxResult{ Component: &workerComponent{ activityDeps: params, - enabledForNs: dynamicconfig.WorkerEnableDeployment.Get(dc), + enabledForNs: dynamicconfig.EnableDeployments.Get(dc), }, } } diff --git a/tests/child_workflow_test.go b/tests/child_workflow_test.go index 1e4810cae0e..5d12d796899 100644 --- a/tests/child_workflow_test.go +++ b/tests/child_workflow_test.go @@ -25,15 +25,21 @@ package tests import ( + "context" "fmt" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/testing/taskpoller" + "go.temporal.io/server/common/testing/testvars" "sort" "testing" "time" "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" + deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" failurepb "go.temporal.io/api/failure/v1" filterpb "go.temporal.io/api/filter/v1" @@ -45,6 +51,7 @@ import ( "go.temporal.io/server/common/payload" "go.temporal.io/server/common/payloads" "go.temporal.io/server/tests/testcore" + "google.golang.org/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" ) @@ -53,30 +60,111 @@ type ChildWorkflowSuite struct { testcore.FunctionalSuite } +func (s *ChildWorkflowSuite) SetupSuite() { + dynamicConfigOverrides := map[dynamicconfig.Key]any{ + dynamicconfig.EnableDeployments.Key(): true, + dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Key(): true, + dynamicconfig.MatchingForwarderMaxChildrenPerNode.Key(): partitionTreeDegree, + + // Make sure we don't hit the rate limiter in tests + dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Key(): 1000, + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstRatioPerInstance.Key(): 1, + dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS.Key(): 1000, + + // this is overridden for tests using RunTestWithMatchingBehavior + dynamicconfig.MatchingNumTaskqueueReadPartitions.Key(): 4, + dynamicconfig.MatchingNumTaskqueueWritePartitions.Key(): 4, + } + s.SetDynamicConfigOverrides(dynamicConfigOverrides) + s.FunctionalTestBase.SetupSuite("testdata/es_cluster.yaml") +} + +func (s *ChildWorkflowSuite) TearDownSuite() { + s.FunctionalTestBase.TearDownSuite() +} + func TestChildWorkflowSuite(t *testing.T) { t.Parallel() suite.Run(t, new(ChildWorkflowSuite)) } -func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { - parentID := "functional-child-workflow-test-parent" - childID := "functional-child-workflow-test-child" - grandchildID := "functional-child-workflow-test-grandchild" - wtParent := "functional-child-workflow-test-parent-type" - wtChild := "functional-child-workflow-test-child-type" - wtGrandchild := "functional-child-workflow-test-grandchild-type" - tlParent := "functional-child-workflow-test-parent-taskqueue" - tlChild := "functional-child-workflow-test-child-taskqueue" - tlGrandchild := "functional-child-workflow-test-grandchild-taskqueue" +func (s *ChildWorkflowSuite) TestChildWorkflowExecution_NoOverride() { + s.testChildWorkflowExecution(nil) +} + +func (s *ChildWorkflowSuite) TestChildWorkflowExecution_WithVersioningOverride() { + deploymentA := &deploymentpb.Deployment{ + SeriesName: "seriesName", + BuildId: "A", + } + override := &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: deploymentA, + } + s.testChildWorkflowExecution(override) +} + +func (s *ChildWorkflowSuite) checkDescribeWorkflowAfterOverride( + wf *commonpb.WorkflowExecution, + expectedOverride *workflowpb.VersioningOverride, +) { + s.EventuallyWithT(func(t *assert.CollectT) { + a := assert.New(t) + resp, err := s.FrontendClient().DescribeWorkflowExecution(context.Background(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.Namespace(), + Execution: wf, + }) + a.NoError(err) + a.NotNil(resp) + a.NotNil(resp.GetWorkflowExecutionInfo()) + a.True(proto.Equal(expectedOverride, resp.GetWorkflowExecutionInfo().GetVersioningInfo().GetVersioningOverride())) + }, 5*time.Second, 50*time.Millisecond) +} + +func (s *ChildWorkflowSuite) testChildWorkflowExecution(override *workflowpb.VersioningOverride) { + var overrideDeployment *deploymentpb.Deployment + var overrideBehavior enumspb.VersioningBehavior + var versionCap *commonpb.WorkerVersionCapabilities + if override != nil { + overrideDeployment = override.GetDeployment() + overrideBehavior = override.GetBehavior() + versionCap = &commonpb.WorkerVersionCapabilities{ + BuildId: overrideDeployment.GetBuildId(), + UseVersioning: true, + DeploymentSeriesName: overrideDeployment.GetSeriesName(), + } + } + parentTV := testvars.New(s.T()) + childTV := testvars.New(s.T()) + grandchildTV := testvars.New(s.T()) + + parentID := parentTV.String("functional-child-workflow-test-parent") + childID := childTV.String("functional-child-workflow-test-child") + grandchildID := grandchildTV.String("functional-child-workflow-test-grandchild") + wtParent := parentTV.String("functional-child-workflow-test-parent-type") + wtChild := childTV.String("functional-child-workflow-test-child-type") + wtGrandchild := grandchildTV.String("functional-child-workflow-test-grandchild-type") + tlParent := parentTV.String("functional-child-workflow-test-parent-taskqueue") + tlChild := childTV.String("functional-child-workflow-test-child-taskqueue") + tlGrandchild := grandchildTV.String("functional-child-workflow-test-grandchild-taskqueue") identity := "worker1" saName := "CustomKeywordField" // Uncomment this line to test with mapper. // saName = "AliasForCustomKeywordField" + parentTV = parentTV.WithTaskQueue(tlParent) + childTV = childTV.WithTaskQueue(tlChild) + grandchildTV = grandchildTV.WithTaskQueue(tlGrandchild) parentWorkflowType := &commonpb.WorkflowType{Name: wtParent} childWorkflowType := &commonpb.WorkflowType{Name: wtChild} grandchildWorkflowType := &commonpb.WorkflowType{Name: wtGrandchild} + if overrideDeployment != nil { + parentTV = parentTV.WithBuildId(overrideDeployment.GetBuildId()) + childTV = childTV.WithBuildId(overrideDeployment.GetBuildId()) + grandchildTV = grandchildTV.WithBuildId(overrideDeployment.GetBuildId()) + } + taskQueueParent := &taskqueuepb.TaskQueue{Name: tlParent, Kind: enumspb.TASK_QUEUE_KIND_NORMAL} taskQueueChild := &taskqueuepb.TaskQueue{Name: tlChild, Kind: enumspb.TASK_QUEUE_KIND_NORMAL} taskQueueGrandchild := &taskqueuepb.TaskQueue{Name: tlGrandchild, Kind: enumspb.TASK_QUEUE_KIND_NORMAL} @@ -96,6 +184,7 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { WorkflowRunTimeout: durationpb.New(100 * time.Second), WorkflowTaskTimeout: durationpb.New(1 * time.Second), Identity: identity, + VersioningOverride: override, } we, err0 := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) @@ -125,7 +214,7 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { // Parent workflow logic wtHandlerParent := func( - task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { + task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { s.Logger.Info("Processing workflow task for Parent", tag.WorkflowID(task.WorkflowExecution.WorkflowId)) parentStartedEvent = task.History.Events[0] @@ -134,7 +223,7 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { s.Logger.Info("Starting child execution") childExecutionStarted = true - return []*commandpb.Command{{ + commands := []*commandpb.Command{{ CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{ StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{ @@ -150,24 +239,46 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { SearchAttributes: searchAttr, }, }, - }}, nil + }} + + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil } else if task.PreviousStartedEventId > 0 { for _, event := range task.History.Events[task.PreviousStartedEventId:] { if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED { childStartedEventFromParent = event - return []*commandpb.Command{}, nil + commands := []*commandpb.Command{} + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil } if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED { childCompletedEventFromParent = event - return []*commandpb.Command{{ + commands := []*commandpb.Command{{ CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ Result: payloads.EncodeString("Done"), }, }, - }}, nil + }} + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil } } } @@ -179,7 +290,7 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { var childStartedEvent *historypb.HistoryEvent var childRunID string // Child workflow logic - wtHandlerChild := func(task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { + wtHandlerChild := func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { if task.PreviousStartedEventId <= 0 { childStartedEvent = task.History.Events[0] childRunID = task.WorkflowExecution.GetRunId() @@ -190,7 +301,7 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { s.Logger.Info("Starting grandchild execution") grandchildExecutionStarted = true - return []*commandpb.Command{{ + commands := []*commandpb.Command{{ CommandType: enumspb.COMMAND_TYPE_START_CHILD_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_StartChildWorkflowExecutionCommandAttributes{ StartChildWorkflowExecutionCommandAttributes: &commandpb.StartChildWorkflowExecutionCommandAttributes{ @@ -206,24 +317,46 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { SearchAttributes: searchAttr, }, }, - }}, nil + }} + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil } if task.PreviousStartedEventId > 0 { for _, event := range task.History.Events[task.PreviousStartedEventId:] { if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_STARTED { - return []*commandpb.Command{}, nil + commands := []*commandpb.Command{} + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil + } if event.GetEventType() == enumspb.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_COMPLETED { childComplete = true - return []*commandpb.Command{{ + commands := []*commandpb.Command{{ CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ Result: payloads.EncodeString("Child Done"), }, }, - }}, nil + }} + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil } } } @@ -234,56 +367,39 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { var grandchildStartedEvent *historypb.HistoryEvent // Grandchild workflow logic to check root workflow execution is carried correctly wtHandlerGrandchild := func( - task *workflowservice.PollWorkflowTaskQueueResponse) ([]*commandpb.Command, error) { + task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { if task.PreviousStartedEventId <= 0 { grandchildStartedEvent = task.History.Events[0] } s.Logger.Info("Processing workflow task for Grandchild", tag.WorkflowID(task.WorkflowExecution.WorkflowId)) grandchildComplete = true - return []*commandpb.Command{{ + commands := []*commandpb.Command{{ CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ Result: payloads.EncodeString("Grandchild Done"), }, }, - }}, nil + }} + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + Identity: identity, + Namespace: s.Namespace(), + Deployment: overrideDeployment, + VersioningBehavior: overrideBehavior, + }, nil } - pollerParent := &testcore.TaskPoller{ - Client: s.FrontendClient(), - Namespace: s.Namespace(), - TaskQueue: taskQueueParent, - Identity: identity, - WorkflowTaskHandler: wtHandlerParent, - Logger: s.Logger, - T: s.T(), - } - - pollerChild := &testcore.TaskPoller{ - Client: s.FrontendClient(), - Namespace: s.Namespace(), - TaskQueue: taskQueueChild, - Identity: identity, - WorkflowTaskHandler: wtHandlerChild, - Logger: s.Logger, - T: s.T(), - } - - pollerGrandchild := &testcore.TaskPoller{ - Client: s.FrontendClient(), - Namespace: s.Namespace(), - TaskQueue: taskQueueGrandchild, - Identity: identity, - WorkflowTaskHandler: wtHandlerGrandchild, - Logger: s.Logger, - T: s.T(), - } + pollerParent := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) // taskQueueParent + pollerChild := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) // taskQueueChild + pollerGrandchild := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) // taskQueueGrandchild // Make first workflow task to start child execution - _, err := pollerParent.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err := pollerParent.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(parentTV, wtHandlerParent) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) s.True(childExecutionStarted) s.NotNil(parentStartedEvent) @@ -293,13 +409,17 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { s.Nil(parentStartedEventAttrs.GetRootWorkflowExecution()) // Process ChildExecution Started event and Process Child Execution and complete it - _, err = pollerParent.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err = pollerParent.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(parentTV, wtHandlerParent) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) // Process Child workflow to start grandchild execution - _, err = pollerChild.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err = pollerChild.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(childTV, wtHandlerChild) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) s.NotNil(childStartedEventFromParent) s.NotNil(childStartedEvent) @@ -332,15 +452,22 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { ) s.Equal(time.Duration(0), childStartedEventAttrs.GetWorkflowExecutionTimeout().AsDuration()) s.Equal(200*time.Second, childStartedEventAttrs.GetWorkflowRunTimeout().AsDuration()) + // check versioning override was inherited + s.ProtoEqual(override, childStartedEventAttrs.GetVersioningOverride()) + s.checkDescribeWorkflowAfterOverride(&commonpb.WorkflowExecution{WorkflowId: childID, RunId: childRunID}, override) // Process GrandchildExecution Started event and Process Grandchild Execution and complete it - _, err = pollerChild.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err = pollerChild.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(childTV, wtHandlerChild) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) // Process Grandchild workflow - _, err = pollerGrandchild.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err = pollerGrandchild.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(grandchildTV, wtHandlerGrandchild) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) s.True(grandchildComplete) s.NotNil(grandchildStartedEvent) @@ -354,16 +481,22 @@ func (s *ChildWorkflowSuite) TestChildWorkflowExecution() { s.NotNil(grandchildStartedEventAttrs.GetRootWorkflowExecution()) s.Equal(parentID, grandchildStartedEventAttrs.RootWorkflowExecution.GetWorkflowId()) s.Equal(we.GetRunId(), grandchildStartedEventAttrs.RootWorkflowExecution.GetRunId()) + // check versioning override was inherited + s.ProtoEqual(override, grandchildStartedEventAttrs.GetVersioningOverride()) // Process GrandchildExecution completed event and complete child execution - _, err = pollerChild.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err = pollerChild.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(childTV, wtHandlerChild) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) s.True(childComplete) // Process ChildExecution completed event and complete parent execution - _, err = pollerParent.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + _, err = pollerParent.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{WorkerVersionCapabilities: versionCap}, + ).HandleTask(parentTV, wtHandlerParent) + s.Logger.Info("PollAndHandleWorkflowTask", tag.Error(err)) s.NoError(err) s.NotNil(childCompletedEventFromParent) completedAttributes := childCompletedEventFromParent.GetChildWorkflowExecutionCompletedEventAttributes() @@ -733,7 +866,7 @@ func (s *ChildWorkflowSuite) TestRetryChildWorkflowExecution() { // Process ChildExecution Started event _, err = pollerParent.PollAndProcessWorkflowTask() - s.Logger.Info("PollAndProcessWorkflowTask", tag.Error(err)) + s.Logger.Info("PollAndHWorkflowTask", tag.Error(err)) s.NoError(err) s.NotNil(startedEvent) diff --git a/tests/deployment_test.go b/tests/deployment_test.go index f125b77fcd1..d22598c907f 100644 --- a/tests/deployment_test.go +++ b/tests/deployment_test.go @@ -28,6 +28,8 @@ import ( "context" "errors" "fmt" + "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" "testing" "time" @@ -48,6 +50,7 @@ import ( "go.temporal.io/server/common/log/tag" "go.temporal.io/server/common/payload" "go.temporal.io/server/common/testing/testvars" + "go.temporal.io/server/common/tqid" deploymentwf "go.temporal.io/server/service/worker/deployment" "go.temporal.io/server/tests/testcore" "google.golang.org/protobuf/proto" @@ -83,13 +86,11 @@ func TestDeploymentSuite(t *testing.T) { func (s *DeploymentSuite) SetupSuite() { s.setAssertions() dynamicConfigOverrides := map[dynamicconfig.Key]any{ - dynamicconfig.FrontendEnableDeployments.Key(): true, + dynamicconfig.EnableDeployments.Key(): true, dynamicconfig.FrontendEnableWorkerVersioningDataAPIs.Key(): true, dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Key(): true, dynamicconfig.FrontendEnableWorkerVersioningRuleAPIs.Key(): true, dynamicconfig.FrontendEnableExecuteMultiOperation.Key(): true, - dynamicconfig.MatchingEnableDeployments.Key(): true, - dynamicconfig.WorkerEnableDeployment.Key(): true, // Reachability dynamicconfig.ReachabilityCacheOpenWFsTTL.Key(): testReachabilityCacheOpenWFsTTL, @@ -186,6 +187,58 @@ func (s *DeploymentSuite) TestDescribeDeployment_RegisterTaskQueue() { }, time.Second*5, time.Millisecond*200) } +func (s *DeploymentSuite) TestDescribeDeployment_RegisterTaskQueue_ConcurrentPollers() { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + // presence of internally used delimiters (:) or escape + // characters shouldn't break functionality + seriesName := testcore.RandomizeStr("my-series|:|:") + buildID := testcore.RandomizeStr("bgt:|") + + taskQueue := &taskqueuepb.TaskQueue{Name: "deployment-test", Kind: enumspb.TASK_QUEUE_KIND_NORMAL} + workerDeployment := &deploymentpb.Deployment{ + SeriesName: seriesName, + BuildId: buildID, + } + numberOfDeployments := 1 + + root, err := tqid.PartitionFromProto(taskQueue, s.Namespace(), enumspb.TASK_QUEUE_TYPE_WORKFLOW) + s.NoError(err) + // Making concurrent polls to 4 partitions, 3 polls to each + for p := 0; p < 4; p++ { + for i := 0; i < 3; i++ { + tq := &taskqueuepb.TaskQueue{Name: root.TaskQueue().NormalPartition(p).RpcName(), Kind: enumspb.TASK_QUEUE_KIND_NORMAL} + s.pollFromDeployment(ctx, tq, workerDeployment) + } + } + + // Querying the Deployment + s.EventuallyWithT(func(t *assert.CollectT) { + a := assert.New(t) + + resp, err := s.FrontendClient().DescribeDeployment(ctx, &workflowservice.DescribeDeploymentRequest{ + Namespace: s.Namespace(), + Deployment: workerDeployment, + }) + if !a.NoError(err) { + return + } + a.NotNil(resp.GetDeploymentInfo()) + a.NotNil(resp.GetDeploymentInfo().GetDeployment()) + + a.Equal(seriesName, resp.GetDeploymentInfo().GetDeployment().GetSeriesName()) + a.Equal(buildID, resp.GetDeploymentInfo().GetDeployment().GetBuildId()) + + if !a.Equal(numberOfDeployments, len(resp.GetDeploymentInfo().GetTaskQueueInfos())) { + return + } + a.Equal(taskQueue.Name, resp.GetDeploymentInfo().GetTaskQueueInfos()[0].Name) + a.Equal(false, resp.GetDeploymentInfo().GetIsCurrent()) + // todo (Shivam) - please add a check for current time + }, time.Second*5, time.Millisecond*1000) +} + func (s *DeploymentSuite) TestGetCurrentDeployment_NoCurrentDeployment() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -478,6 +531,22 @@ func (s *DeploymentSuite) checkDeploymentReachability( }, 5*time.Second, 50*time.Millisecond) } +// SDK will have a GetWorkflowExecutionOptions method that sends an empty mask and a default +// WorkflowExecutionOptions and expects to read the workflow execution's existing options with no write +func (s *DeploymentSuite) checkSDKGetWorkflowExecutionOptions(ctx context.Context, + wf *commonpb.WorkflowExecution, + expectedOpts *workflowpb.WorkflowExecutionOptions, +) { + getResp, err := s.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: s.Namespace(), + WorkflowExecution: wf, + WorkflowExecutionOptions: &workflowpb.WorkflowExecutionOptions{}, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{}}, + }) + s.NoError(err) + s.True(proto.Equal(getResp.GetWorkflowExecutionOptions(), expectedOpts)) +} + func (s *DeploymentSuite) createDeploymentAndWaitForExist( ctx context.Context, deployment *deploymentpb.Deployment, @@ -529,6 +598,7 @@ func (s *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetUnpinnedThenUnse s.NoError(err) s.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), unpinnedOpts)) s.checkDescribeWorkflowAfterOverride(ctx, unversionedWFExec, unpinnedOpts.GetVersioningOverride()) + s.checkSDKGetWorkflowExecutionOptions(ctx, unversionedWFExec, unpinnedOpts) // 2. Unset using empty update opts with mutation mask --> describe workflow shows no more override updateResp, err = s.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ @@ -585,6 +655,7 @@ func (s *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetPinnedThenUnset( s.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOpts)) s.checkDescribeWorkflowAfterOverride(ctx, unversionedWFExec, pinnedOpts.GetVersioningOverride()) s.checkDeploymentReachability(ctx, workerDeployment, enumspb.DEPLOYMENT_REACHABILITY_REACHABLE) + s.checkSDKGetWorkflowExecutionOptions(ctx, unversionedWFExec, pinnedOpts) // 2. Unset with empty update opts with mutation mask --> describe workflow shows no more override + deployment is unreachable updateResp, err = s.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ @@ -637,6 +708,7 @@ func (s *DeploymentSuite) TestUpdateWorkflowExecutionOptions_EmptyFields() { s.NoError(err) s.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), &workflowpb.WorkflowExecutionOptions{})) s.checkDescribeWorkflowAfterOverride(ctx, unversionedWFExec, nil) + s.checkSDKGetWorkflowExecutionOptions(ctx, unversionedWFExec, &workflowpb.WorkflowExecutionOptions{}) } func (s *DeploymentSuite) TestUpdateWorkflowExecutionOptions_SetPinnedSetPinned() { @@ -993,6 +1065,103 @@ func (s *DeploymentSuite) checkListAndWaitForBatchCompletion(ctx context.Context } } +func (s *DeploymentSuite) waitForChan(ctx context.Context, ch chan struct{}) { + s.T().Helper() + select { + case <-ch: + case <-ctx.Done(): + s.FailNow("context timeout") + } +} + +func (s *DeploymentSuite) TestUpdateWorkflowExecutionOptions_ChildWorkflowWithSDK() { + s.T().Skip("needs new sdk with deployment pollers to work") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + deploymentA := &deploymentpb.Deployment{ + SeriesName: "seriesName", + BuildId: "A", + } + override := &workflowpb.VersioningOverride{ + Behavior: enumspb.VERSIONING_BEHAVIOR_PINNED, + Deployment: deploymentA, + } + pinnedOptsA := &workflowpb.WorkflowExecutionOptions{ + VersioningOverride: override, + } + tqName := "test-tq-child-override" + + // create deployment so that GetDeploymentReachability doesn't error + s.createDeploymentAndWaitForExist(context.Background(), deploymentA, &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL}) + + // define parent and child worfklows + parentStarted := make(chan struct{}, 1) + childOverrideValidated := make(chan struct{}, 1) + child := func(cctx workflow.Context) (string, error) { + // no worker will take this, since we can't make a pinned worker with the old sdk + return "hello", nil + } + parent := func(ctx workflow.Context) error { + parentStarted <- struct{}{} + // after the test receives "parentStarted", we set the pinned override, and this workflow will + // make no more progress by itself, since we have no sdk workers that can handle this + // wait for signal + workflow.GetSignalChannel(ctx, "wait").Receive(ctx, nil) + + // check that parent's override is set + parentWE := workflow.GetInfo(ctx).WorkflowExecution + s.checkDescribeWorkflowAfterOverride( + context.Background(), + &commonpb.WorkflowExecution{WorkflowId: parentWE.ID, RunId: parentWE.RunID}, + override) + + // run child workflow + fut := workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, workflow.ChildWorkflowOptions{ + TaskQueue: tqName, + }), "child") + + // check that child's override is set + var childWE workflow.Execution + s.NoError(fut.GetChildWorkflowExecution().Get(ctx, &childWE)) + s.checkDescribeWorkflowAfterOverride( + context.Background(), + &commonpb.WorkflowExecution{WorkflowId: childWE.ID, RunId: childWE.RunID}, + override) + childOverrideValidated <- struct{}{} + return nil + } + + unversionedWorker := worker.New(s.sdkClient, tqName, worker.Options{MaxConcurrentWorkflowTaskPollers: numPollers}) + unversionedWorker.RegisterWorkflowWithOptions(parent, workflow.RegisterOptions{Name: "parent"}) + unversionedWorker.RegisterWorkflowWithOptions(child, workflow.RegisterOptions{Name: "child"}) + s.NoError(unversionedWorker.Start()) + defer unversionedWorker.Stop() + + run, err := s.sdkClient.ExecuteWorkflow(ctx, sdkclient.StartWorkflowOptions{TaskQueue: tqName}, "parent") + s.NoError(err) + // wait for parent to start + s.waitForChan(ctx, parentStarted) + close(parentStarted) // force panic if replayed + + // set override on parent + updateResp, err := s.FrontendClient().UpdateWorkflowExecutionOptions(ctx, &workflowservice.UpdateWorkflowExecutionOptionsRequest{ + Namespace: s.Namespace(), + WorkflowExecution: &commonpb.WorkflowExecution{WorkflowId: run.GetID(), RunId: run.GetRunID()}, + WorkflowExecutionOptions: pinnedOptsA, + UpdateMask: &fieldmaskpb.FieldMask{Paths: []string{"versioning_override"}}, + }) + s.NoError(err) + s.True(proto.Equal(updateResp.GetWorkflowExecutionOptions(), pinnedOptsA)) + + // unblock the parent workflow so it will start its child + s.NoError(s.sdkClient.SignalWorkflow(ctx, run.GetID(), run.GetRunID(), "wait", nil)) + + // wait for child override to be validated (parent workflow might not complete, because no worker is polling + // that matches the child) + s.waitForChan(ctx, childOverrideValidated) + close(childOverrideValidated) // force panic if replayed +} + func (s *DeploymentSuite) TestStartWorkflowExecution_WithPinnedOverride() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) defer cancel() @@ -1316,5 +1485,5 @@ func (s *DeploymentSuite) TestSetCurrent_UpdateMetadata() { // do not grow larger that DB column limit (currently as low as 272 chars). func (s *DeploymentSuite) Name() string { fullName := s.T().Name() - return fullName[len(fullName)-20:] + return fullName[len(fullName)-30:] } diff --git a/tests/testcore/functional_test_base.go b/tests/testcore/functional_test_base.go index 6d57aa59847..95d640cfaca 100644 --- a/tests/testcore/functional_test_base.go +++ b/tests/testcore/functional_test_base.go @@ -184,14 +184,14 @@ func (s *FunctionalTestBase) SetupSuite(defaultClusterConfigFile string, options s.operatorClient = s.testCluster.OperatorClient() s.httpAPIAddress = cluster.Host().FrontendHTTPAddress() - s.namespace = RandomizeStr("functional-test-namespace") + s.namespace = RandomizeStr("namespace") s.Require().NoError(s.registerNamespaceWithDefaults(s.namespace)) - s.foreignNamespace = RandomizeStr("functional-foreign-test-namespace") + s.foreignNamespace = RandomizeStr("foreign-namespace") s.Require().NoError(s.registerNamespaceWithDefaults(s.foreignNamespace)) if clusterConfig.EnableArchival { - s.archivalNamespace = RandomizeStr("functional-archival-enabled-namespace") + s.archivalNamespace = RandomizeStr("archival-enabled-namespace") s.Require().NoError(s.registerArchivalNamespace(s.archivalNamespace)) } } diff --git a/tests/versioning_3_test.go b/tests/versioning_3_test.go index 1c6620767e3..aca0efef3dc 100644 --- a/tests/versioning_3_test.go +++ b/tests/versioning_3_test.go @@ -25,27 +25,557 @@ package tests import ( + "context" + "fmt" + "sync/atomic" + "testing" "time" + "github.com/dgryski/go-farm" "github.com/pborman/uuid" + "github.com/stretchr/testify/suite" + commandpb "go.temporal.io/api/command/v1" commonpb "go.temporal.io/api/common/v1" + deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflow/v1" "go.temporal.io/api/workflowservice/v1" + deploymentspb "go.temporal.io/server/api/deployment/v1" + "go.temporal.io/server/api/historyservice/v1" + "go.temporal.io/server/api/matchingservice/v1" + "go.temporal.io/server/common" + "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/payloads" + "go.temporal.io/server/common/primitives/timestamp" + "go.temporal.io/server/common/testing/taskpoller" + "go.temporal.io/server/common/testing/testvars" + "go.temporal.io/server/common/tqid" "go.temporal.io/server/tests/testcore" "google.golang.org/protobuf/types/known/durationpb" ) -func (s *VersioningIntegSuite) TestPinnedWorkflow() { - // TODO (shahab) implement this with TaskPoller etc. - s.startWorkflow() +const ( + tqTypeWf = enumspb.TASK_QUEUE_TYPE_WORKFLOW + tqTypeAct = enumspb.TASK_QUEUE_TYPE_ACTIVITY + vbUnspecified = enumspb.VERSIONING_BEHAVIOR_UNSPECIFIED + vbPinned = enumspb.VERSIONING_BEHAVIOR_PINNED + vbUnpinned = enumspb.VERSIONING_BEHAVIOR_AUTO_UPGRADE + ver3MinPollTime = common.MinLongPollTimeout + time.Millisecond*200 +) + +type Versioning3Suite struct { + // override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test, + // not merely log an error + testcore.FunctionalTestBase +} + +func TestVersioning3FunctionalSuite(t *testing.T) { + t.Parallel() + suite.Run(t, new(Versioning3Suite)) +} + +func (s *Versioning3Suite) SetupSuite() { + dynamicConfigOverrides := map[dynamicconfig.Key]any{ + dynamicconfig.EnableDeployments.Key(): true, + dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Key(): true, + dynamicconfig.MatchingForwarderMaxChildrenPerNode.Key(): partitionTreeDegree, + + // Make sure we don't hit the rate limiter in tests + dynamicconfig.FrontendGlobalNamespaceNamespaceReplicationInducingAPIsRPS.Key(): 1000, + dynamicconfig.FrontendMaxNamespaceNamespaceReplicationInducingAPIsBurstRatioPerInstance.Key(): 1, + dynamicconfig.FrontendNamespaceReplicationInducingAPIsRPS.Key(): 1000, + + // this is overridden for tests using RunTestWithMatchingBehavior + dynamicconfig.MatchingNumTaskqueueReadPartitions.Key(): 4, + dynamicconfig.MatchingNumTaskqueueWritePartitions.Key(): 4, + } + s.SetDynamicConfigOverrides(dynamicConfigOverrides) + s.FunctionalTestBase.SetupSuite("testdata/es_cluster.yaml") +} + +func (s *Versioning3Suite) TearDownSuite() { + s.FunctionalTestBase.TearDownSuite() +} + +func (s *Versioning3Suite) SetupTest() { + s.FunctionalTestBase.SetupTest() +} + +func (s *Versioning3Suite) TestPinnedTask_NoProperPoller() { + s.RunTestWithMatchingBehavior( + func() { + tv := testvars.New(s) + + other := tv.WithBuildId("other") + go s.idlePollWorkflow(other, true, ver3MinPollTime, "other deployment should not receive pinned task") + s.waitForDeploymentDataPropagation(other, tqTypeWf) + + s.startWorkflow(tv, makePinnedOverride(tv.Deployment())) + s.idlePollWorkflow(tv, false, ver3MinPollTime, "unversioned worker should not receive pinned task") + }) +} + +func (s *Versioning3Suite) TestUnpinnedTask_NonCurrentDeployment() { + s.RunTestWithMatchingBehavior( + func() { + tv := testvars.New(s) + go s.idlePollWorkflow(tv, true, ver3MinPollTime, "non-current versioned poller should not receive unpinned task") + s.waitForDeploymentDataPropagation(tv, tqTypeWf) + + s.startWorkflow(tv, nil) + }) +} + +func (s *Versioning3Suite) TestUnpinnedTask_OldDeployment() { + s.RunTestWithMatchingBehavior( + func() { + tv := testvars.New(s) + // previous current deployment + s.updateTaskQueueDeploymentData(tv.WithBuildId("older"), time.Minute, tqTypeWf) + // current deployment + s.updateTaskQueueDeploymentData(tv, 0, tqTypeWf) + + s.startWorkflow(tv, nil) + + s.idlePollWorkflow( + tv.WithBuildId("older"), + true, + ver3MinPollTime, + "old deployment should not receive unpinned task", + ) + }, + ) +} + +func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_Sticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testWorkflowWithPinnedOverride(true) + }, + ) +} + +func (s *Versioning3Suite) TestWorkflowWithPinnedOverride_NoSticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testWorkflowWithPinnedOverride(false) + }, + ) +} + +func (s *Versioning3Suite) testWorkflowWithPinnedOverride(sticky bool) { + tv := testvars.New(s) + + if sticky { + s.warmUpSticky(tv) + } + + wftCompleted := make(chan interface{}) + s.pollWftAndHandle(tv, false, wftCompleted, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondWftWithActivities(tv, sticky, vbUnpinned, "5"), nil + }) + s.waitForDeploymentDataPropagation(tv, tqTypeWf) + + actCompleted := make(chan interface{}) + s.pollActivityAndHandle(tv, actCompleted, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + return respondActivity(), nil + }) + s.waitForDeploymentDataPropagation(tv, tqTypeAct) + + override := makePinnedOverride(tv.Deployment()) + we := s.startWorkflow(tv, override) + + <-wftCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } + + <-actCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) + + s.pollWftAndHandle(tv, sticky, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tv, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), override, nil) +} + +func (s *Versioning3Suite) TestUnpinnedWorkflow_Sticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testUnpinnedWorkflow(true) + }, + ) +} + +func (s *Versioning3Suite) TestUnpinnedWorkflow_NoSticky() { + s.RunTestWithMatchingBehavior( + func() { + s.testUnpinnedWorkflow(false) + }, + ) +} + +func (s *Versioning3Suite) testUnpinnedWorkflow(sticky bool) { + tv := testvars.New(s) + d := tv.Deployment() + + if sticky { + s.warmUpSticky(tv) + } + + wftCompleted := make(chan interface{}) + s.pollWftAndHandle(tv, false, wftCompleted, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tv, vbUnspecified, nil, nil, transitionTo(d)) + return respondWftWithActivities(tv, sticky, vbUnpinned, "5"), nil + }) + s.waitForDeploymentDataPropagation(tv, tqTypeWf) + + actCompleted := make(chan interface{}) + s.pollActivityAndHandle(tv, actCompleted, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + return respondActivity(), nil + }) + s.waitForDeploymentDataPropagation(tv, tqTypeAct) + + s.updateTaskQueueDeploymentData(tv, 0, tqTypeWf, tqTypeAct) + + we := s.startWorkflow(tv, nil) + + <-wftCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tv.StickyTaskQueue()) + } + + <-actCompleted + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) + + s.pollWftAndHandle(tv, sticky, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tv, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tv, vbUnpinned, tv.Deployment(), nil, nil) +} + +func (s *Versioning3Suite) TestTransitionFromWft_Sticky() { + s.testTransitionFromWft(true) +} + +func (s *Versioning3Suite) TestTransitionFromWft_NoSticky() { + s.testTransitionFromWft(false) +} + +func (s *Versioning3Suite) testTransitionFromWft(sticky bool) { + // Wf runs one TWF and one AC on dA, then the second WFT is redirected to dB and + // transitions the wf with it. + + tvA := testvars.New(s).WithBuildId("A") + tvB := tvA.WithBuildId("B") + dA := tvA.Deployment() + dB := tvB.Deployment() + + if sticky { + s.warmUpSticky(tvA) + } + + s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) + we := s.startWorkflow(tvA, nil) + + s.pollWftAndHandle(tvA, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) + return respondWftWithActivities(tvA, sticky, vbUnpinned, "5"), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tvA.StickyTaskQueue()) + } + + s.pollActivityAndHandle(tvA, nil, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.NotNil(task) + return respondActivity(), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + + // Set B as the current deployment + s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) + + s.pollWftAndHandle(tvB, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) + return respondCompleteWorkflow(tvB, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) +} + +func (s *Versioning3Suite) TestTransitionFromActivity_Sticky() { + s.testTransitionFromActivity(true) +} + +func (s *Versioning3Suite) TestTransitionFromActivity_NoSticky() { + s.testTransitionFromActivity(false) +} + +func (s *Versioning3Suite) testTransitionFromActivity(sticky bool) { + // Wf runs one TWF on dA and schedules four activities, then: + // 1. The first and second activities starts on dA + // 2. Current deployment becomes dB + // 3. The third activity is redirected to dB and starts a transition in the wf, without being + // dispatched. + // 4. The 4th activity also does not start on any of the builds although there are pending + // pollers on both. + // 5. The transition generates a WFT and it is started in dB. + // 6. The 1st act is completed here while the transition is going on. + // 7. The 2nd act fails and makes another attempt. But it is not dispatched. + // 8. WFT completes and the transition completes. + // 9. All the 3 remaining activities are now dispatched and completed. + + tvA := testvars.New(s).WithBuildId("A") + tvB := tvA.WithBuildId("B") + dA := tvA.Deployment() + dB := tvB.Deployment() + + if sticky { + s.warmUpSticky(tvA) + } + + s.updateTaskQueueDeploymentData(tvA, 0, tqTypeWf, tqTypeAct) + we := s.startWorkflow(tvA, nil) + + s.pollWftAndHandle(tvA, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnspecified, nil, nil, transitionTo(dA)) + return respondWftWithActivities(tvA, sticky, vbUnpinned, "5", "6", "7", "8"), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tvA.StickyTaskQueue()) + } + + // Set B as the current deployment + s.updateTaskQueueDeploymentData(tvB, 0, tqTypeWf, tqTypeAct) + + // The poller should be present to the activity task is redirected, but it should not receive a + // task until transition completes in the next wft. + transitionCompleted := atomic.Bool{} + actCompleted := make(chan interface{}) + s.pollActivityAndHandle(tvB, actCompleted, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + // Activity should not start until the transition is completed + s.True(transitionCompleted.Load()) + s.NotNil(task) + return respondActivity(), nil + }) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, nil) + + // The transition should create a new WFT to be sent to dB. Poller responds with empty wft complete. + s.pollWftAndHandle(tvB, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + s.verifyWorkflowVersioning(tvA, vbUnpinned, dA, nil, transitionTo(dB)) + transitionCompleted.Store(true) + return respondEmptyWft(tvB, sticky, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) + if sticky { + s.verifyWorkflowStickyQueue(we, tvB.StickyTaskQueue()) + } + + s.pollWftAndHandle(tvB, false, nil, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.NotNil(task) + return respondCompleteWorkflow(tvB, vbUnpinned), nil + }) + s.verifyWorkflowVersioning(tvB, vbUnpinned, dB, nil, nil) +} + +func transitionTo(d *deploymentpb.Deployment) *workflow.DeploymentTransition { + return &workflow.DeploymentTransition{ + Deployment: d, + } +} + +func (s *Versioning3Suite) updateTaskQueueDeploymentData( + tv *testvars.TestVars, + timeSinceCurrent time.Duration, + tqTypes ...enumspb.TaskQueueType, +) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + + lastBecameCurrent := time.Now().Add(-timeSinceCurrent) + for _, t := range tqTypes { + _, err := s.GetTestCluster().MatchingClient().SyncDeploymentUserData( + ctx, &matchingservice.SyncDeploymentUserDataRequest{ + NamespaceId: s.GetNamespaceID(s.Namespace()), + TaskQueue: tv.TaskQueue().GetName(), + TaskQueueType: t, + Deployment: tv.Deployment(), + Data: &deploymentspb.TaskQueueData{ + FirstPollerTime: timestamp.TimePtr(lastBecameCurrent), + LastBecameCurrentTime: timestamp.TimePtr(lastBecameCurrent), + }, + }, + ) + s.NoError(err) + } + s.waitForDeploymentDataPropagation(tv, tqTypes...) +} + +func (s *Versioning3Suite) verifyWorkflowVersioning( + tv *testvars.TestVars, + behavior enumspb.VersioningBehavior, + deployment *deploymentpb.Deployment, + override *workflow.VersioningOverride, + transition *workflow.DeploymentTransition, +) { + dwf, err := s.FrontendClient().DescribeWorkflowExecution( + context.Background(), &workflowservice.DescribeWorkflowExecutionRequest{ + Namespace: s.Namespace(), + Execution: &commonpb.WorkflowExecution{ + WorkflowId: tv.WorkflowID(), + }, + }, + ) + s.NoError(err) + + versioningInfo := dwf.WorkflowExecutionInfo.GetVersioningInfo() + s.Equal(behavior.String(), versioningInfo.GetBehavior().String()) + if !deployment.Equal(versioningInfo.GetDeployment()) { + s.Fail(fmt.Sprintf("deployment mismatch. expected: {%s}, actual: {%s}", + deployment, + versioningInfo.GetDeployment(), + )) + } + + s.Equal(override.GetBehavior().String(), versioningInfo.GetVersioningOverride().GetBehavior().String()) + if actualOverrideDeployment := versioningInfo.GetVersioningOverride().GetDeployment(); !override.GetDeployment().Equal(actualOverrideDeployment) { + s.Fail(fmt.Sprintf("deployment override mismatch. expected: {%s}, actual: {%s}", + override.GetDeployment(), + actualOverrideDeployment, + )) + } + + if !versioningInfo.GetDeploymentTransition().Equal(transition) { + s.Fail(fmt.Sprintf("deployment override mismatch. expected: {%s}, actual: {%s}", + transition, + versioningInfo.GetDeploymentTransition(), + )) + } } -func (s *VersioningIntegSuite) startWorkflow() *commonpb.WorkflowExecution { - id := s.randomizeName("my-wf") +func respondActivity() *workflowservice.RespondActivityTaskCompletedRequest { + return &workflowservice.RespondActivityTaskCompletedRequest{} +} + +func respondWftWithActivities( + tv *testvars.TestVars, + sticky bool, + behavior enumspb.VersioningBehavior, + activityIds ...string, +) *workflowservice.RespondWorkflowTaskCompletedRequest { + var stickyAttr *taskqueuepb.StickyExecutionAttributes + if sticky { + stickyAttr = &taskqueuepb.StickyExecutionAttributes{ + WorkerTaskQueue: tv.StickyTaskQueue(), + ScheduleToStartTimeout: durationpb.New(5 * time.Second), + } + } + var commands []*commandpb.Command + for _, a := range activityIds { + commands = append(commands, &commandpb.Command{ + CommandType: enumspb.COMMAND_TYPE_SCHEDULE_ACTIVITY_TASK, + Attributes: &commandpb.Command_ScheduleActivityTaskCommandAttributes{ + ScheduleActivityTaskCommandAttributes: &commandpb.ScheduleActivityTaskCommandAttributes{ + ActivityId: a, + ActivityType: &commonpb.ActivityType{Name: "act"}, + TaskQueue: tv.TaskQueue(), + Input: payloads.EncodeString("input"), + // TODO (shahab): tests with forced task forward take multiple seconds. Need to know why? + ScheduleToCloseTimeout: durationpb.New(10 * time.Second), + ScheduleToStartTimeout: durationpb.New(10 * time.Second), + StartToCloseTimeout: durationpb.New(1 * time.Second), + HeartbeatTimeout: durationpb.New(1 * time.Second), + RequestEagerExecution: false, + }, + }, + }) + } + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: commands, + StickyAttributes: stickyAttr, + ReturnNewWorkflowTask: false, + ForceCreateNewWorkflowTask: false, + Deployment: tv.Deployment(), + VersioningBehavior: behavior, + } +} + +func respondEmptyWft( + tv *testvars.TestVars, + sticky bool, + behavior enumspb.VersioningBehavior, +) *workflowservice.RespondWorkflowTaskCompletedRequest { + var stickyAttr *taskqueuepb.StickyExecutionAttributes + if sticky { + stickyAttr = &taskqueuepb.StickyExecutionAttributes{ + WorkerTaskQueue: tv.StickyTaskQueue(), + ScheduleToStartTimeout: durationpb.New(5 * time.Second), + } + } + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + StickyAttributes: stickyAttr, + ReturnNewWorkflowTask: false, + ForceCreateNewWorkflowTask: false, + Deployment: tv.Deployment(), + VersioningBehavior: behavior, + } +} + +func respondCompleteWorkflow( + tv *testvars.TestVars, + behavior enumspb.VersioningBehavior, +) *workflowservice.RespondWorkflowTaskCompletedRequest { + return &workflowservice.RespondWorkflowTaskCompletedRequest{ + Commands: []*commandpb.Command{ + { + CommandType: enumspb.COMMAND_TYPE_COMPLETE_WORKFLOW_EXECUTION, + Attributes: &commandpb.Command_CompleteWorkflowExecutionCommandAttributes{ + CompleteWorkflowExecutionCommandAttributes: &commandpb.CompleteWorkflowExecutionCommandAttributes{ + Result: payloads.EncodeString("done"), + }, + }, + }, + }, + ReturnNewWorkflowTask: false, + ForceCreateNewWorkflowTask: false, + Deployment: tv.Deployment(), + VersioningBehavior: behavior, + } +} + +func (s *Versioning3Suite) startWorkflow( + tv *testvars.TestVars, + override *workflow.VersioningOverride, +) *commonpb.WorkflowExecution { + id := tv.WorkflowID() wt := "MyWfType" - tqName := "my-tq" - tq := &taskqueuepb.TaskQueue{Name: tqName, Kind: enumspb.TASK_QUEUE_KIND_NORMAL} identity := "worker1" request := &workflowservice.StartWorkflowExecutionRequest{ @@ -53,11 +583,12 @@ func (s *VersioningIntegSuite) startWorkflow() *commonpb.WorkflowExecution { Namespace: s.Namespace(), WorkflowId: id, WorkflowType: &commonpb.WorkflowType{Name: wt}, - TaskQueue: tq, + TaskQueue: tv.TaskQueue(), Input: nil, WorkflowRunTimeout: durationpb.New(100 * time.Second), - WorkflowTaskTimeout: durationpb.New(1 * time.Second), + WorkflowTaskTimeout: durationpb.New(10 * time.Second), Identity: identity, + VersioningOverride: override, } we, err0 := s.FrontendClient().StartWorkflowExecution(testcore.NewContext(), request) @@ -68,7 +599,225 @@ func (s *VersioningIntegSuite) startWorkflow() *commonpb.WorkflowExecution { } } -// Adds the test name and a random string as postfix to the given name -func (s *VersioningIntegSuite) randomizeName(name string) string { - return testcore.RandomizeStr(name + "_" + s.T().Name()) +// Name is used by testvars. We use a shorten test name in variables so that physical task queue IDs +// do not grow larger that DB column limit (currently as low as 272 chars). +func (s *Versioning3Suite) Name() string { + fullName := s.T().Name() + if len(fullName) <= 30 { + return fullName + } + return fmt.Sprintf("%s-%08x", + fullName[len(fullName)-21:], + farm.Fingerprint32([]byte(fullName)), + ) +} + +// pollWftAndHandle can be used in sync and async mode. For async mode pass the async channel. It +// will be closed when the task is handled. +func (s *Versioning3Suite) pollWftAndHandle( + tv *testvars.TestVars, + sticky bool, + async chan<- interface{}, + handler func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error), +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + tq := tv.TaskQueue() + if sticky { + tq = tv.StickyTaskQueue() + } + f := func() { + _, err := poller.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: true, + }, + TaskQueue: tq, + }, + ).HandleTask(tv, handler) + s.NoError(err) + } + if async == nil { + f() + } else { + go func() { + f() + close(async) + }() + } +} + +func (s *Versioning3Suite) pollActivityAndHandle( + tv *testvars.TestVars, + async chan<- interface{}, + handler func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error), +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + f := func() { + _, err := poller.PollActivityTask( + &workflowservice.PollActivityTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: true, + }, + }, + ).HandleTask(tv, handler, taskpoller.WithTimeout(time.Minute)) + s.NoError(err) + } + if async == nil { + f() + } else { + go func() { + f() + close(async) + }() + } +} + +func (s *Versioning3Suite) idlePollWorkflow( + tv *testvars.TestVars, + versioned bool, + timeout time.Duration, + unexpectedTaskMessage string, +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + _, _ = poller.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: versioned, + }, + }, + ).HandleTask( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail(unexpectedTaskMessage) + return nil, nil + }, + taskpoller.WithTimeout(timeout), + ) +} + +func (s *Versioning3Suite) idlePollActivity( + tv *testvars.TestVars, + versioned bool, + timeout time.Duration, + unexpectedTaskMessage string, +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + d := tv.Deployment() + _, _ = poller.PollActivityTask( + &workflowservice.PollActivityTaskQueueRequest{ + WorkerVersionCapabilities: &commonpb.WorkerVersionCapabilities{ + BuildId: d.BuildId, + DeploymentSeriesName: d.SeriesName, + UseVersioning: versioned, + }, + }, + ).HandleTask( + tv, + func(task *workflowservice.PollActivityTaskQueueResponse) (*workflowservice.RespondActivityTaskCompletedRequest, error) { + s.Fail(unexpectedTaskMessage) + return nil, nil + }, + taskpoller.WithTimeout(timeout), + ) +} + +func (s *Versioning3Suite) verifyWorkflowStickyQueue( + we *commonpb.WorkflowExecution, + stickyQ *taskqueuepb.TaskQueue, +) { + ms, err := s.GetTestCluster().HistoryClient().GetMutableState( + context.Background(), &historyservice.GetMutableStateRequest{ + NamespaceId: s.GetNamespaceID(s.Namespace()), + Execution: we, + }, + ) + s.NoError(err) + s.Equal(stickyQ.GetName(), ms.StickyTaskQueue.GetName()) +} + +// Sticky queue needs to be created in server before tasks can schedule in it. Call to this method +// create the sticky queue by polling it. +func (s *Versioning3Suite) warmUpSticky( + tv *testvars.TestVars, +) { + poller := taskpoller.New(s.T(), s.FrontendClient(), s.Namespace()) + _, _ = poller.PollWorkflowTask( + &workflowservice.PollWorkflowTaskQueueRequest{ + TaskQueue: tv.StickyTaskQueue(), + }, + ).HandleTask( + tv, + func(task *workflowservice.PollWorkflowTaskQueueResponse) (*workflowservice.RespondWorkflowTaskCompletedRequest, error) { + s.Fail("sticky task is not expected") + return nil, nil + }, + taskpoller.WithTimeout(ver3MinPollTime), + ) +} + +func (s *Versioning3Suite) waitForDeploymentDataPropagation( + tv *testvars.TestVars, + tqTypes ...enumspb.TaskQueueType, +) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + v := s.GetTestCluster().Host().DcClient().GetValue(dynamicconfig.MatchingNumTaskqueueReadPartitions.Key()) + s.NotEmpty(v, "versioning tests require setting explicit number of partitions") + count, ok := v[0].Value.(int) + s.True(ok, "partition count is not an int") + partitionCount := count + + type partAndType struct { + part int + tp enumspb.TaskQueueType + } + remaining := make(map[partAndType]struct{}) + for i := 0; i < partitionCount; i++ { + for _, tqt := range tqTypes { + remaining[partAndType{i, tqt}] = struct{}{} + } + } + nsId := s.GetNamespaceID(s.Namespace()) + f, err := tqid.NewTaskQueueFamily(nsId, tv.TaskQueue().GetName()) + deployment := tv.Deployment() + s.Eventually(func() bool { + for pt := range remaining { + s.NoError(err) + partition := f.TaskQueue(pt.tp).NormalPartition(pt.part) + // Use lower-level GetTaskQueueUserData instead of GetWorkerBuildIdCompatibility + // here so that we can target activity queues. + res, err := s.GetTestCluster().MatchingClient().GetTaskQueueUserData( + ctx, + &matchingservice.GetTaskQueueUserDataRequest{ + NamespaceId: nsId, + TaskQueue: partition.RpcName(), + TaskQueueType: partition.TaskType(), + }) + s.NoError(err) + perTypes := res.GetUserData().GetData().GetPerType() + if perTypes != nil { + deps := perTypes[int32(pt.tp)].GetDeploymentData().GetDeployments() + for _, d := range deps { + if d.GetDeployment().Equal(deployment) { + delete(remaining, pt) + } + } + } + } + return len(remaining) == 0 + }, 10*time.Second, 100*time.Millisecond) +} + +func makePinnedOverride(d *deploymentpb.Deployment) *workflow.VersioningOverride { + return &workflow.VersioningOverride{Behavior: vbPinned, Deployment: d} } diff --git a/tests/versioning_test.go b/tests/versioning_test.go index fcead1ca710..0f09fda385f 100644 --- a/tests/versioning_test.go +++ b/tests/versioning_test.go @@ -86,6 +86,7 @@ func TestVersioningFunctionalSuite(t *testing.T) { func (s *VersioningIntegSuite) SetupSuite() { dynamicConfigOverrides := map[dynamicconfig.Key]any{ + dynamicconfig.EnableDeployments.Key(): true, dynamicconfig.FrontendEnableWorkerVersioningDataAPIs.Key(): true, dynamicconfig.FrontendEnableWorkerVersioningWorkflowAPIs.Key(): true, dynamicconfig.FrontendEnableWorkerVersioningRuleAPIs.Key(): true,