diff --git a/service/worker/migration/replication.go b/service/worker/migration/workflow.go similarity index 78% rename from service/worker/migration/replication.go rename to service/worker/migration/workflow.go index ca57632c45c..b51c980ddda 100644 --- a/service/worker/migration/replication.go +++ b/service/worker/migration/workflow.go @@ -53,7 +53,10 @@ import ( const ( forceReplicationWorkflowName = "force-replication" namespaceHandoverWorkflowName = "namespace-handover" - listExecutionPageSize = 1000 + + defaultListWorkflowsPageSize = 1000 + defaultPageCountPerExecution = 200 + maxPageCountPerExecution = 1000 minimumAllowedLaggingSeconds = 5 minimumHandoverTimeoutSeconds = 30 @@ -62,10 +65,12 @@ const ( type ( ForceReplicationParams struct { Namespace string - SkipAfterTime time.Time // skip workflows that are updated after this time - ConcurrentActivityCount int32 - RemoteCluster string // remote cluster name + Query string // query to list workflows for replication + ConcurrentActivityCount int RpsPerActivity int // RPS per each activity + ListWorkflowsPageSize int // PageSize of ListWorkflow, will paginate through results. + PageCountPerExecution int // number of pages to be processed before continue as new, max is 1000. + NextPageToken []byte // used by continue as new } NamespaceHandoverParams struct { @@ -88,27 +93,15 @@ type ( metricsClient metrics.Client } - genReplicationForShardRange struct { - BeginShardID int32 // inclusive - EndShardID int32 // inclusive - NamespaceID string // only generate replication tasks for workflows in this namespace - SkipAfterTime time.Time // skip workflows whose LastUpdateTime is after this time - RpsPerActivity int // RPS per activity - } - - genReplicationForShard struct { - ShardID int32 - NamespaceID string - SkipAfterTime time.Time - PageToken []byte - Index int - RPS int + listWorkflowsResponse struct { + Executions []commonpb.WorkflowExecution + NextPageToken []byte } - heartbeatProgress struct { - ShardID int32 - PageToken []byte - Index int + generateReplicationTasksRequest struct { + NamespaceID string + Executions []commonpb.WorkflowExecution + RPS int } metadataRequest struct { @@ -150,81 +143,102 @@ type ( var ( historyServiceRetryPolicy = common.CreateHistoryServiceRetryPolicy() - persistenceRetryPolicy = common.CreateHistoryServiceRetryPolicy() ) func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParams) error { if len(params.Namespace) == 0 { return errors.New("InvalidArgument: Namespace is required") } - if len(params.RemoteCluster) == 0 { - return errors.New("InvalidArgument: RemoteCluster is required") - } if params.ConcurrentActivityCount <= 0 { params.ConcurrentActivityCount = 1 } if params.RpsPerActivity <= 0 { params.RpsPerActivity = 1 } + if params.ListWorkflowsPageSize <= 0 { + params.ListWorkflowsPageSize = defaultListWorkflowsPageSize + } + if params.PageCountPerExecution <= 0 { + params.PageCountPerExecution = defaultPageCountPerExecution + } + if params.PageCountPerExecution > maxPageCountPerExecution { + params.PageCountPerExecution = maxPageCountPerExecution + } retryPolicy := &temporal.RetryPolicy{ InitialInterval: time.Second, MaximumInterval: time.Second * 10, } - // ** Step 1, Get cluster metadata ** - ao := workflow.ActivityOptions{ + // Get cluster metadata, we need namespace ID for history API call. + // TODO: remove this step. + lao := workflow.LocalActivityOptions{ StartToCloseTimeout: time.Second * 10, RetryPolicy: retryPolicy, } - ctx1 := workflow.WithActivityOptions(ctx, ao) + ctx1 := workflow.WithLocalActivityOptions(ctx, lao) var a *activities var metadataResp metadataResponse metadataRequest := metadataRequest{Namespace: params.Namespace} - err := workflow.ExecuteActivity(ctx1, a.GetMetadata, metadataRequest).Get(ctx1, &metadataResp) + err := workflow.ExecuteLocalActivity(ctx1, a.GetMetadata, metadataRequest).Get(ctx1, &metadataResp) if err != nil { return err } - // ** Step 2, Force replication ** - ao2 := workflow.ActivityOptions{ - StartToCloseTimeout: time.Hour * 10, + selector := workflow.NewSelector(ctx) + pendingActivities := 0 + + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour, HeartbeatTimeout: time.Second * 30, RetryPolicy: retryPolicy, } - ctx2 := workflow.WithActivityOptions(ctx, ao2) + ctx2 := workflow.WithActivityOptions(ctx, ao) - concurrentCount := params.ConcurrentActivityCount - shardCount := metadataResp.ShardCount - skipAfter := params.SkipAfterTime - if skipAfter.IsZero() { - skipAfter = workflow.Now(ctx2) - } - var futures []workflow.Future - batchSize := (shardCount + concurrentCount - 1) / concurrentCount - for beginShardID := int32(1); beginShardID <= shardCount; beginShardID += batchSize { - endShardID := beginShardID + batchSize - 1 - if endShardID > shardCount { - endShardID = shardCount + for i := 0; i < params.PageCountPerExecution; i++ { + listFuture := workflow.ExecuteLocalActivity(ctx1, a.ListWorkflows, &workflowservice.ListWorkflowExecutionsRequest{ + Namespace: params.Namespace, + PageSize: int32(params.ListWorkflowsPageSize), + NextPageToken: params.NextPageToken, + Query: params.Query, + }) + var listResp listWorkflowsResponse + err = listFuture.Get(ctx1, &listResp) + if err != nil { + return err } - rangeRequest := genReplicationForShardRange{ - BeginShardID: beginShardID, - EndShardID: endShardID, - NamespaceID: metadataResp.NamespaceID, - SkipAfterTime: skipAfter, - RpsPerActivity: params.RpsPerActivity, + + workerFuture := workflow.ExecuteActivity(ctx2, a.GenerateReplicationTasks, &generateReplicationTasksRequest{ + NamespaceID: metadataResp.NamespaceID, + Executions: listResp.Executions, + RPS: params.RpsPerActivity, + }) + pendingActivities++ + selector.AddFuture(workerFuture, func(f workflow.Future) { + pendingActivities-- + }) + + if pendingActivities >= params.ConcurrentActivityCount { + selector.Select(ctx) // this will block until one of the pending activities complete } - future := workflow.ExecuteActivity(ctx2, a.GenerateReplicationTasks, rangeRequest) - futures = append(futures, future) - } - for _, f := range futures { - if err := f.Get(ctx2, nil); err != nil { - return err + params.NextPageToken = listResp.NextPageToken + if params.NextPageToken == nil { + break } } + // wait until all pending activities are done + for pendingActivities > 0 { + selector.Select(ctx) + } - return nil + if params.NextPageToken == nil { + // we are all done + return nil + } + + // too many pages, and we exceed PageCountPerExecution, so move on to next execution + return workflow.NewContinueAsNewError(ctx, ForceReplicationWorkflow, params) } func NamespaceHandoverWorkflow(ctx workflow.Context, params NamespaceHandoverParams) error { @@ -350,30 +364,39 @@ func (a *activities) GetMetadata(ctx context.Context, request metadataRequest) ( }, nil } -// GenerateReplicationTasks generates replication task for last history event for each workflow. -func (a *activities) GenerateReplicationTasks(ctx context.Context, request genReplicationForShardRange) error { - perShard := genReplicationForShard{ - ShardID: request.BeginShardID, - NamespaceID: request.NamespaceID, - SkipAfterTime: request.SkipAfterTime, - RPS: request.RpsPerActivity, +func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) { + resp, err := a.frontendClient.ListWorkflowExecutions(ctx, request) + if err != nil { + return nil, err + } + executions := make([]commonpb.WorkflowExecution, len(resp.Executions)) + for i, e := range resp.Executions { + executions[i] = *e.Execution } - var progress heartbeatProgress + return &listWorkflowsResponse{Executions: executions, NextPageToken: resp.NextPageToken}, nil +} + +func (a *activities) GenerateReplicationTasks(ctx context.Context, request *generateReplicationTasksRequest) error { + rateLimiter := quotas.NewRateLimiter(float64(request.RPS), request.RPS) + + startIndex := 0 if activity.HasHeartbeatDetails(ctx) { - if err := activity.GetHeartbeatDetails(ctx, &progress); err == nil { - perShard.ShardID = progress.ShardID - perShard.PageToken = progress.PageToken - perShard.Index = progress.Index + var finishedIndex int + if err := activity.GetHeartbeatDetails(ctx, &finishedIndex); err == nil { + startIndex = finishedIndex + 1 // start from next one } } - for ; perShard.ShardID <= request.EndShardID; perShard.ShardID++ { - if err := a.genReplicationTasks(ctx, perShard); err != nil { + + for i := startIndex; i < len(request.Executions); i++ { + rateLimiter.Wait(ctx) + we := request.Executions[i] + err := a.generateWorkflowReplicationTask(ctx, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId)) + if err != nil { return err } - // heartbeat progress only apply for first shard - perShard.PageToken = nil - perShard.Index = 0 + activity.RecordHeartbeat(ctx, i) } + return nil } @@ -533,63 +556,7 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand return readyShardCount == len(resp.Shards), nil } -func (a *activities) genReplicationTasks(ctx context.Context, request genReplicationForShard) error { - pageToken := request.PageToken - startIndex := request.Index - rateLimiter := quotas.NewRateLimiter(float64(request.RPS), request.RPS) - - for { - var listResult *persistence.ListConcreteExecutionsResponse - op := func(ctx context.Context) error { - var err error - listResult, err = a.executionManager.ListConcreteExecutions(&persistence.ListConcreteExecutionsRequest{ - ShardID: request.ShardID, - PageSize: listExecutionPageSize, - PageToken: pageToken, - }) - return err - } - - rateLimiter.Wait(ctx) - err := backoff.RetryContext(ctx, op, persistenceRetryPolicy, common.IsPersistenceTransientError) - if err != nil { - return err - } - - for i := startIndex; i < len(listResult.States); i++ { - activity.RecordHeartbeat(ctx, heartbeatProgress{ - ShardID: request.ShardID, - PageToken: pageToken, - Index: i, - }) - - ms := listResult.States[i] - if ms.ExecutionInfo.LastUpdateTime != nil && ms.ExecutionInfo.LastUpdateTime.After(request.SkipAfterTime) { - // workflow was updated after SkipAfterTime, no need to generate replication task - continue - } - if ms.ExecutionInfo.NamespaceId != request.NamespaceID { - // skip if not target namespace - continue - } - rateLimiter.Wait(ctx) - err := a.genReplicationTaskForOneWorkflow(ctx, definition.NewWorkflowKey(request.NamespaceID, ms.ExecutionInfo.WorkflowId, ms.ExecutionState.RunId)) - if err != nil { - return err - } - } - - pageToken = listResult.PageToken - startIndex = 0 - if pageToken == nil { - break - } - } - - return nil -} - -func (a *activities) genReplicationTaskForOneWorkflow(ctx context.Context, wKey definition.WorkflowKey) error { +func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey definition.WorkflowKey) error { // will generate replication task op := func(ctx context.Context) error { var err error diff --git a/service/worker/migration/workflow_test.go b/service/worker/migration/workflow_test.go new file mode 100644 index 00000000000..e7e4fc41441 --- /dev/null +++ b/service/worker/migration/workflow_test.go @@ -0,0 +1,127 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package migration + +import ( + "context" + "testing" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + commonpb "go.temporal.io/api/common/v1" + "go.temporal.io/api/workflowservice/v1" + "go.temporal.io/sdk/testsuite" +) + +func TestForceReplicationWorkflow(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + namespaceID := uuid.New() + + var a *activities + env.OnActivity(a.GetMetadata, mock.Anything, metadataRequest{Namespace: "test-ns"}).Return(&metadataResponse{ShardCount: 4, NamespaceID: namespaceID}, nil) + + totalPageCount := 4 + currentPageCount := 0 + env.OnActivity(a.ListWorkflows, mock.Anything, mock.Anything).Return(func(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) { + assert.Equal(t, "test-ns", request.Namespace) + currentPageCount++ + if currentPageCount < totalPageCount { + return &listWorkflowsResponse{ + Executions: []commonpb.WorkflowExecution{}, + NextPageToken: []byte("fake-page-token"), + }, nil + } + // your mock function implementation + return &listWorkflowsResponse{ + Executions: []commonpb.WorkflowExecution{}, + NextPageToken: nil, // last page + }, nil + }).Times(totalPageCount) + + env.OnActivity(a.GenerateReplicationTasks, mock.Anything, mock.Anything).Return(nil).Times(totalPageCount) + + env.ExecuteWorkflow(ForceReplicationWorkflow, ForceReplicationParams{ + Namespace: "test-ns", + Query: "", + ConcurrentActivityCount: 2, + RpsPerActivity: 10, + ListWorkflowsPageSize: 1, + PageCountPerExecution: 4, + }) + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + env.AssertExpectations(t) +} + +func TestForceReplicationWorkflow_ContinueAsNew(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + namespaceID := uuid.New() + + var a *activities + env.OnActivity(a.GetMetadata, mock.Anything, metadataRequest{Namespace: "test-ns"}).Return(&metadataResponse{ShardCount: 4, NamespaceID: namespaceID}, nil) + + totalPageCount := 4 + currentPageCount := 0 + maxPageCountPerExecution := 2 + env.OnActivity(a.ListWorkflows, mock.Anything, mock.Anything).Return(func(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) { + assert.Equal(t, "test-ns", request.Namespace) + currentPageCount++ + if currentPageCount < totalPageCount { + return &listWorkflowsResponse{ + Executions: []commonpb.WorkflowExecution{}, + NextPageToken: []byte("fake-page-token"), + }, nil + } + // your mock function implementation + return &listWorkflowsResponse{ + Executions: []commonpb.WorkflowExecution{}, + NextPageToken: nil, // last page + }, nil + }).Times(maxPageCountPerExecution) + + env.OnActivity(a.GenerateReplicationTasks, mock.Anything, mock.Anything).Return(nil).Times(maxPageCountPerExecution) + + env.ExecuteWorkflow(ForceReplicationWorkflow, ForceReplicationParams{ + Namespace: "test-ns", + Query: "", + ConcurrentActivityCount: 2, + RpsPerActivity: 10, + ListWorkflowsPageSize: 1, + PageCountPerExecution: maxPageCountPerExecution, + }) + + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.Error(t, err) + require.Contains(t, err.Error(), "continue as new") + env.AssertExpectations(t) +}