diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go index 33d3d5ad3e4..fdb36958a4f 100644 --- a/common/dynamicconfig/constants.go +++ b/common/dynamicconfig/constants.go @@ -1844,12 +1844,6 @@ const ( // Default value: true // Allowed filters: N/A EnableFailoverManager - // EnableWorkflowShadower indicates if workflow shadower is enabled - // KeyName: system.enableWorkflowShadower - // Value type: Bool - // Default value: true - // Allowed filters: N/A - EnableWorkflowShadower // ConcreteExecutionFixerDomainAllow is which domains are allowed to be fixed by concrete fixer workflow // KeyName: worker.concreteExecutionFixerDomainAllow // Value type: Bool @@ -4085,11 +4079,6 @@ var BoolKeys = map[BoolKey]DynamicBool{ Description: "EnableFailoverManager indicates if failover manager is enabled", DefaultValue: true, }, - EnableWorkflowShadower: DynamicBool{ - KeyName: "system.enableWorkflowShadower", - Description: "EnableWorkflowShadower indicates if workflow shadower is enabled", - DefaultValue: true, - }, ConcreteExecutionFixerDomainAllow: DynamicBool{ KeyName: "worker.concreteExecutionFixerDomainAllow", Filters: []Filter{DomainName}, diff --git a/service/worker/service.go b/service/worker/service.go index f15599631fb..6092af0d74b 100644 --- a/service/worker/service.go +++ b/service/worker/service.go @@ -49,7 +49,6 @@ import ( "github.com/uber/cadence/service/worker/scanner/shardscanner" "github.com/uber/cadence/service/worker/scanner/tasklist" "github.com/uber/cadence/service/worker/scanner/timers" - "github.com/uber/cadence/service/worker/shadower" ) type ( @@ -81,10 +80,8 @@ type ( EnableParentClosePolicyWorker dynamicconfig.BoolPropertyFn NumParentClosePolicySystemWorkflows dynamicconfig.IntPropertyFn EnableFailoverManager dynamicconfig.BoolPropertyFn - EnableWorkflowShadower dynamicconfig.BoolPropertyFn DomainReplicationMaxRetryDuration dynamicconfig.DurationPropertyFn EnableESAnalyzer dynamicconfig.BoolPropertyFn - EnableWatchDog dynamicconfig.BoolPropertyFn EnableAsyncWorkflowConsumption dynamicconfig.BoolPropertyFn HostName string } @@ -177,7 +174,6 @@ func NewConfig(params *resource.Params) *Config { NumParentClosePolicySystemWorkflows: dc.GetIntProperty(dynamicconfig.NumParentClosePolicySystemWorkflows), EnableESAnalyzer: dc.GetBoolProperty(dynamicconfig.EnableESAnalyzer), EnableFailoverManager: dc.GetBoolProperty(dynamicconfig.EnableFailoverManager), - EnableWorkflowShadower: dc.GetBoolProperty(dynamicconfig.EnableWorkflowShadower), ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.WorkerThrottledLogRPS), PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceGlobalMaxQPS), PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS), @@ -238,10 +234,6 @@ func (s *Service) Start() { if s.config.EnableFailoverManager() { s.startFailoverManager() } - if s.config.EnableWorkflowShadower() { - s.ensureDomainExists(common.ShadowerLocalDomainName) - s.startWorkflowShadower() - } if s.config.EnableAsyncWorkflowConsumption() { cm := s.startAsyncWorkflowConsumerManager() @@ -407,18 +399,6 @@ func (s *Service) startFailoverManager() { } } -func (s *Service) startWorkflowShadower() { - params := &shadower.BootstrapParams{ - ServiceClient: s.params.PublicClient, - DomainCache: s.GetDomainCache(), - TallyScope: s.params.MetricScope, - } - if err := shadower.New(params).Start(); err != nil { - s.Stop() - s.GetLogger().Fatal("error starting workflow shadower", tag.Error(err)) - } -} - func (s *Service) startAsyncWorkflowConsumerManager() common.Daemon { cm := asyncworkflow.NewConsumerManager( s.GetLogger(), diff --git a/service/worker/shadower/profile.go b/service/worker/shadower/profile.go deleted file mode 100644 index 8280901cf7a..00000000000 --- a/service/worker/shadower/profile.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package shadower - -import ( - "time" - - "github.com/uber-go/tally" - "go.uber.org/cadence/workflow" - "go.uber.org/zap" - - "github.com/uber/cadence/.gen/go/shadower" -) - -type ( - workflowProfile struct { - ctx workflow.Context - startTime time.Time - scope tally.Scope - logger *zap.Logger - } -) - -const ( - tagShadowDomain = "shadow-domain" - tagShadowTaskList = "shadow-tasklist" -) - -const ( - shadowWorkflowLatency = "shadow-workflow-latency" - shadowWorkflowStarted = "shadow-workflow-started" - shadowWorkflowCompleted = "shadow-workflow-completed" - shadowWorkflowContinueAsNew = "shadow-workflow-continueasnew" - shadowWorkflowFailed = "shadow-workflow-failed" -) - -func beginWorkflow( - ctx workflow.Context, - params *shadower.WorkflowParams, -) *workflowProfile { - taggedScope := workflow.GetMetricsScope(ctx).Tagged(map[string]string{ - tagShadowDomain: params.GetDomain(), - tagShadowTaskList: params.GetTaskList(), - }) - taggedLogger := workflow.GetLogger(ctx).With( - zap.String(tagShadowDomain, params.GetDomain()), - zap.String(tagShadowTaskList, params.GetTaskList()), - ) - if params.LastRunResult == nil { - taggedScope.Counter(shadowWorkflowStarted).Inc(1) - taggedLogger.Info("Shadow workflow started") - } - return &workflowProfile{ - ctx: ctx, - startTime: workflow.Now(ctx), - scope: taggedScope, - logger: taggedLogger, - } -} - -func (p *workflowProfile) endWorkflow( - err error, -) error { - now := workflow.Now(p.ctx) - p.scope.Timer(shadowWorkflowLatency).Record(now.Sub(p.startTime)) - switch err.(type) { - case nil: - p.scope.Counter(shadowWorkflowCompleted).Inc(1) - p.logger.Info("Shadow workflow completed") - case *workflow.ContinueAsNewError: - p.scope.Counter(shadowWorkflowContinueAsNew).Inc(1) - p.logger.Info("Shadow workflow continued as new") - default: - p.scope.Counter(shadowWorkflowFailed).Inc(1) - p.logger.With(zap.Error(err)).Error("Shadow workflow failed") - } - return err -} diff --git a/service/worker/shadower/worker.go b/service/worker/shadower/worker.go deleted file mode 100644 index 08cab5150fa..00000000000 --- a/service/worker/shadower/worker.go +++ /dev/null @@ -1,83 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package shadower - -import ( - "context" - - "github.com/uber-go/tally" - "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" - "go.uber.org/cadence/worker" - - "github.com/uber/cadence/.gen/go/shadower" - "github.com/uber/cadence/common" - "github.com/uber/cadence/common/cache" -) - -type ( - // BootstrapParams contains the set of params needed to bootstrap workflow shadower worker - BootstrapParams struct { - ServiceClient workflowserviceclient.Interface - DomainCache cache.DomainCache - TallyScope tally.Scope - } - - // Worker is for executing decision task generated by shadowing workflows - Worker struct { - decisionWorker worker.Worker - domainCache cache.DomainCache - } - - contextKey string -) - -const ( - workerContextKey contextKey = "shadower-worker-context" -) - -// New creates a new worker for processing decision tasks from shadow workflow -func New(params *BootstrapParams) *Worker { - w := &Worker{ - domainCache: params.DomainCache, - } - ctx := context.WithValue(context.Background(), workerContextKey, w) - w.decisionWorker = worker.New( - params.ServiceClient, - common.ShadowerLocalDomainName, - shadower.TaskList, - worker.Options{ - BackgroundActivityContext: ctx, - DisableActivityWorker: true, - MetricsScope: params.TallyScope, - }, - ) - register(w.decisionWorker) - return w -} - -// Start starts the decision worker -func (w *Worker) Start() error { - if err := w.decisionWorker.Start(); err != nil { - w.decisionWorker.Stop() - return err - } - return nil -} diff --git a/service/worker/shadower/workflow.go b/service/worker/shadower/workflow.go deleted file mode 100644 index 1e2bf87cfb0..00000000000 --- a/service/worker/shadower/workflow.go +++ /dev/null @@ -1,308 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package shadower - -import ( - "errors" - "time" - - "go.uber.org/cadence" - "go.uber.org/cadence/worker" - "go.uber.org/cadence/workflow" - - "github.com/uber/cadence/.gen/go/shadower" - "github.com/uber/cadence/.gen/go/shared" - "github.com/uber/cadence/common" -) - -const ( - defaultScanWorkflowPageSize = 1000 - defaultSamplingRate = 1.0 - defaultReplayConcurrency = 1 - defaultMaxReplayConcurrency = 50 - defaultMaxShadowCountPerRun = 20000 - defaultWaitDurationPerIteration = 5 * time.Minute -) - -type ( - workflowConfig struct { - ScanWorkflowPageSize int32 - DefaultSamplingRate float64 - DefaultReplayConcurrency int32 - MaxReplayConcurrency int32 - MaxShadowCountPerRun int32 - WaitDurationPerIteration time.Duration - } -) - -func register(worker worker.Worker) { - worker.RegisterWorkflowWithOptions( - shadowWorkflow, - workflow.RegisterOptions{Name: shadower.WorkflowName}, - ) -} - -func shadowWorkflow( - ctx workflow.Context, - params shadower.WorkflowParams, -) (shadower.WorkflowResult, error) { - profile := beginWorkflow(ctx, ¶ms) - - var config workflowConfig - config, err := getWorkflowConfig(ctx) - if err != nil { - return shadower.WorkflowResult{}, profile.endWorkflow(err) - } - - if err := validateAndFillWorkflowParams(¶ms, &config); err != nil { - return shadower.WorkflowResult{}, profile.endWorkflow(err) - } - - workflowTimeout := time.Duration(workflow.GetInfo(ctx).ExecutionStartToCloseTimeoutSeconds) * time.Second - retryPolicy := &cadence.RetryPolicy{ - InitialInterval: time.Second, - BackoffCoefficient: 2, - MaximumInterval: time.Minute, - ExpirationInterval: workflowTimeout, // retry until workflow timeout - NonRetriableErrorReasons: []string{ - shadower.ErrReasonDomainNotExists, - shadower.ErrReasonInvalidQuery, - shadower.ErrReasonWorkflowTypeNotRegistered, - shadower.ErrNonRetryableType, // java non-retryable error type - }, - } - scanWorkflowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - TaskList: params.GetTaskList(), - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Minute, - RetryPolicy: retryPolicy, - }) - replayWorkflowCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - TaskList: params.GetTaskList(), - ScheduleToStartTimeout: time.Minute, - StartToCloseTimeout: time.Duration(config.ScanWorkflowPageSize/params.GetConcurrency()+1) * time.Minute, - // do not use a short heartbeat timeout here, - // as replay may take some time if workflow history is large or retrying due to some transient errors - // this is mainly for java, go replay activity can enable auto heartbeating - HeartbeatTimeout: 2 * time.Minute, - RetryPolicy: retryPolicy, - }) - - shadowResult := shadower.WorkflowResult{ - Succeeded: common.Int32Ptr(0), - Skipped: common.Int32Ptr(0), - Failed: common.Int32Ptr(0), - } - scanParams := shadower.ScanWorkflowActivityParams{ - Domain: params.Domain, - WorkflowQuery: params.WorkflowQuery, - NextPageToken: params.NextPageToken, - PageSize: common.Int32Ptr(config.ScanWorkflowPageSize), - SamplingRate: params.SamplingRate, - } - for { - var scanResult shadower.ScanWorkflowActivityResult - if err := workflow.ExecuteActivity(scanWorkflowCtx, shadower.ScanWorkflowActivityName, scanParams).Get(scanWorkflowCtx, &scanResult); err != nil { - return shadower.WorkflowResult{}, profile.endWorkflow(err) - } - - replayFutures := make([]workflow.Future, 0, params.GetConcurrency()) - for _, executions := range splitExecutions(scanResult.Executions, int(params.GetConcurrency())) { - replayParams := shadower.ReplayWorkflowActivityParams{ - Domain: params.Domain, - Executions: executions, - } - future := workflow.ExecuteActivity(replayWorkflowCtx, shadower.ReplayWorkflowActivityName, replayParams) - replayFutures = append(replayFutures, future) - } - - for _, future := range replayFutures { - var replayResult shadower.ReplayWorkflowActivityResult - if err := future.Get(replayWorkflowCtx, &replayResult); err != nil { - return shadower.WorkflowResult{}, profile.endWorkflow(err) - } - *shadowResult.Succeeded += replayResult.GetSucceeded() - *shadowResult.Skipped += replayResult.GetSkipped() - *shadowResult.Failed += replayResult.GetFailed() - - if exitConditionMet(ctx, params.GetExitCondition(), profile.startTime, shadowResult) { - return combineShadowResults(shadowResult, params.GetLastRunResult()), profile.endWorkflow(nil) - } - } - - scanParams.NextPageToken = scanResult.NextPageToken - if len(scanParams.NextPageToken) == 0 { - break - } - - if shouldContinueAsNew(shadowResult, &config) { - continueAsNewErr := getContinueAsNewError(ctx, params, profile.startTime, params.GetLastRunResult(), shadowResult, scanParams.NextPageToken) - return shadower.WorkflowResult{}, profile.endWorkflow(continueAsNewErr) - } - } - - if params.GetShadowMode() == shadower.ModeContinuous { - if err := workflow.Sleep(ctx, config.WaitDurationPerIteration); err != nil { - return shadower.WorkflowResult{}, profile.endWorkflow(err) - } - continueAsNewErr := getContinueAsNewError(ctx, params, profile.startTime, params.GetLastRunResult(), shadowResult, nil) - return shadower.WorkflowResult{}, profile.endWorkflow(continueAsNewErr) - } - - return combineShadowResults(shadowResult, params.GetLastRunResult()), profile.endWorkflow(nil) -} - -func getWorkflowConfig( - ctx workflow.Context, -) (workflowConfig, error) { - var config workflowConfig - if err := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { - return workflowConfig{ - ScanWorkflowPageSize: defaultScanWorkflowPageSize, - DefaultSamplingRate: defaultSamplingRate, - DefaultReplayConcurrency: defaultReplayConcurrency, - MaxReplayConcurrency: defaultMaxReplayConcurrency, - MaxShadowCountPerRun: defaultMaxShadowCountPerRun, - WaitDurationPerIteration: defaultWaitDurationPerIteration, - } - }).Get(&config); err != nil { - return workflowConfig{}, err - } - return config, nil -} - -func validateAndFillWorkflowParams( - params *shadower.WorkflowParams, - config *workflowConfig, -) error { - if len(params.GetDomain()) == 0 { - return errors.New("domain is not set on shadower workflow params") - } - - if len(params.GetTaskList()) == 0 { - return errors.New("TaskList is not set on shadower workflow params") - } - - if params.GetSamplingRate() == 0 { - params.SamplingRate = common.Float64Ptr(config.DefaultSamplingRate) - } - - if params.GetConcurrency() == 0 { - params.Concurrency = common.Int32Ptr(config.DefaultReplayConcurrency) - } - - if params.GetConcurrency() > config.MaxReplayConcurrency { - params.Concurrency = common.Int32Ptr(config.MaxReplayConcurrency) - } - - return nil -} - -func splitExecutions( - executions []*shared.WorkflowExecution, - concurrency int, -) [][]*shared.WorkflowExecution { - var result [][]*shared.WorkflowExecution - size := (len(executions) + concurrency - 1) / concurrency - for start := 0; start < len(executions); start += size { - end := start + size - if end > len(executions) { - end = len(executions) - } - result = append(result, executions[start:end]) - } - return result -} - -func exitConditionMet( - ctx workflow.Context, - exitCondition *shadower.ExitCondition, - startTime time.Time, - currentResult shadower.WorkflowResult, -) bool { - if exitCondition == nil { - return false - } - - expirationInterval := time.Duration(exitCondition.GetExpirationIntervalInSeconds()) * time.Second - if expirationInterval != 0 && - workflow.Now(ctx).Sub(startTime) > expirationInterval { - return true - } - - shadowCount := exitCondition.GetShadowCount() - if shadowCount != 0 && - currentResult.GetSucceeded()+currentResult.GetFailed() >= shadowCount { - return true - } - - return false -} - -func shouldContinueAsNew( - currentResult shadower.WorkflowResult, - config *workflowConfig, -) bool { - return currentResult.GetSucceeded()+currentResult.GetSkipped()+currentResult.GetFailed() >= config.MaxShadowCountPerRun -} - -func getContinueAsNewError( - ctx workflow.Context, - params shadower.WorkflowParams, - startTime time.Time, - lastRunResult *shadower.WorkflowResult, - currentResult shadower.WorkflowResult, - nextPageToken []byte, -) error { - params.NextPageToken = nextPageToken - if params.GetExitCondition() != nil { - if expirationInterval := params.ExitCondition.GetExpirationIntervalInSeconds(); expirationInterval != 0 { - params.ExitCondition.ExpirationIntervalInSeconds = common.Int32Ptr(expirationInterval - int32(workflow.Now(ctx).Sub(startTime).Seconds())) - } - - if shadowCount := params.ExitCondition.GetShadowCount(); shadowCount != 0 { - params.ExitCondition.ShadowCount = common.Int32Ptr(shadowCount - (currentResult.GetSucceeded() + currentResult.GetFailed())) - } - } - - combineShadowResults(currentResult, lastRunResult) - params.LastRunResult = ¤tResult - - return workflow.NewContinueAsNewError( - ctx, - shadower.WorkflowName, - params, - ) -} - -func combineShadowResults( - currentResult shadower.WorkflowResult, - lastRunResult *shadower.WorkflowResult, -) shadower.WorkflowResult { - if lastRunResult == nil { - return currentResult - } - - *currentResult.Succeeded += lastRunResult.GetSucceeded() - *currentResult.Skipped += lastRunResult.GetSkipped() - *currentResult.Failed += lastRunResult.GetFailed() - return currentResult -} diff --git a/service/worker/shadower/workflow_test.go b/service/worker/shadower/workflow_test.go deleted file mode 100644 index 9d2ab66334b..00000000000 --- a/service/worker/shadower/workflow_test.go +++ /dev/null @@ -1,438 +0,0 @@ -// Copyright (c) 2017-2021 Uber Technologies, Inc. -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package shadower - -import ( - "context" - "fmt" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.uber.org/cadence" - "go.uber.org/cadence/activity" - "go.uber.org/cadence/testsuite" - "go.uber.org/cadence/worker" - "go.uber.org/cadence/workflow" - - "github.com/uber/cadence/.gen/go/shadower" - "github.com/uber/cadence/.gen/go/shared" - "github.com/uber/cadence/common" - "github.com/uber/cadence/common/cache" - "github.com/uber/cadence/common/cluster" - "github.com/uber/cadence/common/persistence" -) - -const ( - testActiveDomainName = "active-domain" - testStandbyDomainName = "standby-domain" - - testTaskListName = "test-tl" - testWorkflowQuery = "some random workflow query" -) - -type workflowSuite struct { - *require.Assertions - suite.Suite - testsuite.WorkflowTestSuite - - controller *gomock.Controller - mockDomainCache *cache.MockDomainCache - - env *testsuite.TestWorkflowEnvironment -} - -func TestWorkflowSuite(t *testing.T) { - s := new(workflowSuite) - suite.Run(t, s) -} - -func (s *workflowSuite) SetupTest() { - s.Assertions = require.New(s.T()) - - s.controller = gomock.NewController(s.T()) - s.mockDomainCache = cache.NewMockDomainCache(s.controller) - - activeDomainCache := cache.NewGlobalDomainCacheEntryForTest( - &persistence.DomainInfo{ID: "random domainID", Name: testActiveDomainName}, - &persistence.DomainConfig{Retention: 1}, - &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestCurrentClusterName, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: cluster.TestCurrentClusterName}, - {ClusterName: cluster.TestAlternativeClusterName}, - }, - }, - 1234, - ) - s.mockDomainCache.EXPECT().GetDomain(testActiveDomainName).Return(activeDomainCache, nil).AnyTimes() - - standbyDomainCache := cache.NewGlobalDomainCacheEntryForTest( - &persistence.DomainInfo{ID: "random domainID", Name: testStandbyDomainName}, - &persistence.DomainConfig{Retention: 1}, - &persistence.DomainReplicationConfig{ - ActiveClusterName: cluster.TestAlternativeClusterName, - Clusters: []*persistence.ClusterReplicationConfig{ - {ClusterName: cluster.TestCurrentClusterName}, - {ClusterName: cluster.TestAlternativeClusterName}, - }, - }, - 1234, - ) - s.mockDomainCache.EXPECT().GetDomain(testStandbyDomainName).Return(standbyDomainCache, nil).AnyTimes() - - activityContext := context.Background() - activityContext = context.WithValue(activityContext, workerContextKey, &Worker{ - domainCache: s.mockDomainCache, - }) - - s.env = s.NewTestWorkflowEnvironment() - s.env.SetWorkerOptions(worker.Options{ - BackgroundActivityContext: activityContext, - }) - s.env.RegisterWorkflowWithOptions( - shadowWorkflow, - workflow.RegisterOptions{Name: shadower.WorkflowName}, - ) - s.env.RegisterActivityWithOptions( - testScanWorkflowActivity, - activity.RegisterOptions{Name: shadower.ScanWorkflowActivityName}, - ) - s.env.RegisterActivityWithOptions( - testReplayWorkflowActivity, - activity.RegisterOptions{Name: shadower.ReplayWorkflowActivityName}, - ) -} - -func (s *workflowSuite) TearDownTest() { - s.controller.Finish() - s.env.AssertExpectations(s.T()) -} - -func (s *workflowSuite) TestShadowWorkflow_DomainNotSpecified() { - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - TaskList: common.StringPtr(testTaskListName), - }) - - s.True(s.env.IsWorkflowCompleted()) - s.Error(s.env.GetWorkflowError()) -} - -func (s *workflowSuite) TestShadowWorkflow_TaskListNotSpecified() { - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testStandbyDomainName), - }) - - s.True(s.env.IsWorkflowCompleted()) - s.Error(s.env.GetWorkflowError()) -} - -func (s *workflowSuite) TestShadowWorkflow_StandbyDomain() { - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testStandbyDomainName), - TaskList: common.StringPtr(testTaskListName), - }) - - s.True(s.env.IsWorkflowCompleted()) - s.NoError(s.env.GetWorkflowError()) - - var result shadower.WorkflowResult - err := s.env.GetWorkflowResult(&result) - s.NoError(err) -} - -func (s *workflowSuite) TestShadowWorkflow_ScanWorkflowNonRetryableError() { - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{}, - cadence.NewCustomError(shadower.ErrReasonInvalidQuery, "invalid query"), - ).Once() - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr("invalid workflow query"), - }) - - s.True(s.env.IsWorkflowCompleted()) - s.Error(s.env.GetWorkflowError()) -} - -func (s *workflowSuite) TestShadowWorkflow_ReplayWorkflowNonRetryableError() { - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, 10), - }, - nil, - ).Once() - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ReplayWorkflowActivityResult{Succeeded: common.Int32Ptr(0)}, - nil, - ).Once() - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ReplayWorkflowActivityResult{}, - cadence.NewCustomError(shadower.ErrReasonWorkflowTypeNotRegistered, "workflow not registered"), - ).Once() - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr(testWorkflowQuery), - Concurrency: common.Int32Ptr(3), - }) - - s.True(s.env.IsWorkflowCompleted()) - s.Error(s.env.GetWorkflowError()) -} - -func (s *workflowSuite) TestShadowWorkflow_ExitCondition_ShadowCount_NoLastResult() { - shadowCount := 10 - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, shadowCount/2+1), - NextPageToken: []byte{1, 2, 3}, - }, - nil, - ).Times(2) - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ReplayWorkflowActivityResult{Succeeded: common.Int32Ptr(int32(shadowCount)/2 + 1)}, - nil, - ).Times(2) - - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr(testWorkflowQuery), - ExitCondition: &shadower.ExitCondition{ - ShadowCount: common.Int32Ptr(int32(shadowCount)), - }, - }) - - s.True(s.env.IsWorkflowCompleted()) - s.NoError(s.env.GetWorkflowError()) - - var result shadower.WorkflowResult - err := s.env.GetWorkflowResult(&result) - s.NoError(err) - s.True(result.GetSucceeded() >= int32(shadowCount)) -} - -func (s *workflowSuite) TestShadowWorkflow_ExitCondition_ShadowCount_WithLastResult() { - shadowCount := 10 - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, shadowCount/2+1), - NextPageToken: []byte{1, 2, 3}, - }, - nil, - ).Once() - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ReplayWorkflowActivityResult{Succeeded: common.Int32Ptr(int32(shadowCount)/2 + 1)}, - nil, - ).Once() - - lastFailed := shadowCount / 2 - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr(testWorkflowQuery), - ExitCondition: &shadower.ExitCondition{ - ShadowCount: common.Int32Ptr(int32(shadowCount - lastFailed)), - }, - LastRunResult: &shadower.WorkflowResult{ - Failed: common.Int32Ptr(int32(lastFailed)), - }, - }) - - s.True(s.env.IsWorkflowCompleted()) - s.NoError(s.env.GetWorkflowError()) - - var result shadower.WorkflowResult - err := s.env.GetWorkflowResult(&result) - s.NoError(err) - s.True(result.GetSucceeded()+result.GetFailed() >= int32(shadowCount)) -} - -func (s *workflowSuite) TestShadowWorkflow_ExitCondition_ExpirationInterval() { - expirationInterval := time.Minute - numExecutions := 10 - now := time.Now() - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, numExecutions), - NextPageToken: []byte{1, 2, 3}, - }, - nil, - ).Once() - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - func(_ context.Context, params shadower.ReplayWorkflowActivityParams) (shadower.ReplayWorkflowActivityResult, error) { - s.env.SetStartTime(now.Add(2 * expirationInterval)) - return shadower.ReplayWorkflowActivityResult{Succeeded: common.Int32Ptr(int32(len(params.Executions)))}, nil - }, - ).Once() - - s.env.SetStartTime(now) - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr(testWorkflowQuery), - ExitCondition: &shadower.ExitCondition{ - ExpirationIntervalInSeconds: common.Int32Ptr(int32(expirationInterval.Seconds())), - }, - }) - - s.True(s.env.IsWorkflowCompleted()) - s.NoError(s.env.GetWorkflowError()) - - var result shadower.WorkflowResult - err := s.env.GetWorkflowResult(&result) - s.NoError(err) - s.Equal(int32(numExecutions), result.GetSucceeded()) -} - -func (s *workflowSuite) TestShadowWorkflow_ContinueAsNew_MaxShadowCount() { - pages := 100 - pageSize := defaultMaxShadowCountPerRun / pages - startTime := time.Now() - replayTimePerPage := time.Second - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, pageSize), - NextPageToken: []byte{1, 2, 3}, - }, - nil, - ).Times(pages) - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - func(_ context.Context, params shadower.ReplayWorkflowActivityParams) (shadower.ReplayWorkflowActivityResult, error) { - startTime = startTime.Add(replayTimePerPage) - s.env.SetStartTime(startTime) - return shadower.ReplayWorkflowActivityResult{Succeeded: common.Int32Ptr(int32(len(params.Executions)))}, nil - }, - ).Times(pages) - - s.env.SetStartTime(startTime) - exitCondition := &shadower.ExitCondition{ - ShadowCount: common.Int32Ptr(defaultMaxShadowCountPerRun * 10), - ExpirationIntervalInSeconds: common.Int32Ptr(1 * int32(pages) * 10), - } - timerFired := 0 - s.env.SetOnTimerFiredListener(func(_ string) { - timerFired++ - }) - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr(testWorkflowQuery), - ExitCondition: exitCondition, - }) - - s.True(s.env.IsWorkflowCompleted()) - fmt.Print("error", s.env.GetWorkflowError()) - continueAsNewErr, ok := s.env.GetWorkflowError().(*workflow.ContinueAsNewError) - s.True(ok) - s.Equal(shadower.WorkflowName, continueAsNewErr.WorkflowType().Name) - shadowParams, ok := continueAsNewErr.Args()[0].(shadower.WorkflowParams) - s.True(ok) - s.Equal(testActiveDomainName, shadowParams.GetDomain()) - s.Equal(testTaskListName, shadowParams.GetTaskList()) - s.Equal(testWorkflowQuery, shadowParams.GetWorkflowQuery()) - s.NotNil(shadowParams.NextPageToken) - s.Equal(1.0, shadowParams.GetSamplingRate()) - s.Equal(shadower.ModeNormal, shadowParams.GetShadowMode()) - shadowedWorkflows := shadowParams.GetLastRunResult().GetSucceeded() + shadowParams.GetLastRunResult().GetFailed() - s.Equal(exitCondition.GetShadowCount()-shadowedWorkflows, shadowParams.ExitCondition.GetShadowCount()) - s.Equal(int32(pages)*int32(replayTimePerPage.Seconds()), exitCondition.GetExpirationIntervalInSeconds()-shadowParams.GetExitCondition().GetExpirationIntervalInSeconds()) - s.Equal(int32(defaultReplayConcurrency), shadowParams.GetConcurrency()) - totalWorkflows := shadowedWorkflows + shadowParams.GetLastRunResult().GetSkipped() - s.GreaterOrEqual(totalWorkflows, int32(defaultMaxShadowCountPerRun)) - s.Equal(0, timerFired) -} - -func (s *workflowSuite) TestShadowWorkflow_ContinueAsNew_ContinuousShadowing() { - pageSize := 10 - pages := 5 - samplingRate := 0.5 - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, pageSize), - NextPageToken: []byte{1, 2, 3}, - }, - nil, - ).Times(pages - 1) - s.env.OnActivity(shadower.ScanWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ScanWorkflowActivityResult{ - Executions: make([]*shared.WorkflowExecution, pageSize), - }, - nil, - ).Times(1) - s.env.OnActivity(shadower.ReplayWorkflowActivityName, mock.Anything, mock.Anything).Return( - shadower.ReplayWorkflowActivityResult{Succeeded: common.Int32Ptr(int32(pageSize))}, - nil, - ).Times(pages) - - timerFired := 0 - s.env.SetOnTimerFiredListener(func(_ string) { - timerFired++ - }) - - s.env.ExecuteWorkflow(shadowWorkflow, shadower.WorkflowParams{ - Domain: common.StringPtr(testActiveDomainName), - TaskList: common.StringPtr(testTaskListName), - WorkflowQuery: common.StringPtr("some random workflow query"), - ShadowMode: shadower.ModeContinuous.Ptr(), - SamplingRate: common.Float64Ptr(samplingRate), - }) - - s.True(s.env.IsWorkflowCompleted()) - fmt.Print("error", s.env.GetWorkflowError()) - continueAsNewErr, ok := s.env.GetWorkflowError().(*workflow.ContinueAsNewError) - s.True(ok) - s.Equal(shadower.WorkflowName, continueAsNewErr.WorkflowType().Name) - shadowParams, ok := continueAsNewErr.Args()[0].(shadower.WorkflowParams) - s.True(ok) - s.Equal(testActiveDomainName, shadowParams.GetDomain()) - s.Equal(testTaskListName, shadowParams.GetTaskList()) - s.Equal(testWorkflowQuery, shadowParams.GetWorkflowQuery()) - s.Nil(shadowParams.NextPageToken) - s.Equal(samplingRate, shadowParams.GetSamplingRate()) - s.Equal(shadower.ModeContinuous, shadowParams.GetShadowMode()) - s.Empty(shadowParams.ExitCondition) - s.Equal(int32(defaultReplayConcurrency), shadowParams.GetConcurrency()) - s.Equal(int32(pages*pageSize), shadowParams.LastRunResult.GetSucceeded()) - s.Equal(1, timerFired) -} - -// dummy activity implementations for test -// so that we can register and mock the implementation/result - -func testScanWorkflowActivity( - ctx context.Context, - params shadower.ScanWorkflowActivityParams, -) (shadower.ScanWorkflowActivityResult, error) { - return shadower.ScanWorkflowActivityResult{}, nil -} - -func testReplayWorkflowActivity( - ctx context.Context, - params shadower.ReplayWorkflowActivityParams, -) (shadower.ReplayWorkflowActivityResult, error) { - return shadower.ReplayWorkflowActivityResult{}, nil -}