Skip to content

Commit

Permalink
Switch force-replication to scan over visibility (#2386)
Browse files Browse the repository at this point in the history
* Switch force-replication to scan over visibility
  • Loading branch information
yiminc authored Jan 20, 2022
1 parent e16c038 commit b8a36d6
Show file tree
Hide file tree
Showing 2 changed files with 228 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ import (
const (
forceReplicationWorkflowName = "force-replication"
namespaceHandoverWorkflowName = "namespace-handover"
listExecutionPageSize = 1000

defaultListWorkflowsPageSize = 1000
defaultPageCountPerExecution = 200
maxPageCountPerExecution = 1000

minimumAllowedLaggingSeconds = 5
minimumHandoverTimeoutSeconds = 30
Expand All @@ -62,10 +65,12 @@ const (
type (
ForceReplicationParams struct {
Namespace string
SkipAfterTime time.Time // skip workflows that are updated after this time
ConcurrentActivityCount int32
RemoteCluster string // remote cluster name
Query string // query to list workflows for replication
ConcurrentActivityCount int
RpsPerActivity int // RPS per each activity
ListWorkflowsPageSize int // PageSize of ListWorkflow, will paginate through results.
PageCountPerExecution int // number of pages to be processed before continue as new, max is 1000.
NextPageToken []byte // used by continue as new
}

NamespaceHandoverParams struct {
Expand All @@ -88,27 +93,15 @@ type (
metricsClient metrics.Client
}

genReplicationForShardRange struct {
BeginShardID int32 // inclusive
EndShardID int32 // inclusive
NamespaceID string // only generate replication tasks for workflows in this namespace
SkipAfterTime time.Time // skip workflows whose LastUpdateTime is after this time
RpsPerActivity int // RPS per activity
}

genReplicationForShard struct {
ShardID int32
NamespaceID string
SkipAfterTime time.Time
PageToken []byte
Index int
RPS int
listWorkflowsResponse struct {
Executions []commonpb.WorkflowExecution
NextPageToken []byte
}

heartbeatProgress struct {
ShardID int32
PageToken []byte
Index int
generateReplicationTasksRequest struct {
NamespaceID string
Executions []commonpb.WorkflowExecution
RPS int
}

metadataRequest struct {
Expand Down Expand Up @@ -150,81 +143,102 @@ type (

var (
// historyServiceRetryPolicy is the retry policy returned by
// common.CreateHistoryServiceRetryPolicy.
historyServiceRetryPolicy = common.CreateHistoryServiceRetryPolicy()
// persistenceRetryPolicy is the policy handed to backoff.RetryContext when
// listing concrete executions from persistence.
// NOTE(review): this is built with the history-service policy constructor —
// confirm that is intended rather than a persistence-specific policy.
persistenceRetryPolicy = common.CreateHistoryServiceRetryPolicy()
)

func ForceReplicationWorkflow(ctx workflow.Context, params ForceReplicationParams) error {
if len(params.Namespace) == 0 {
return errors.New("InvalidArgument: Namespace is required")
}
if len(params.RemoteCluster) == 0 {
return errors.New("InvalidArgument: RemoteCluster is required")
}
if params.ConcurrentActivityCount <= 0 {
params.ConcurrentActivityCount = 1
}
if params.RpsPerActivity <= 0 {
params.RpsPerActivity = 1
}
if params.ListWorkflowsPageSize <= 0 {
params.ListWorkflowsPageSize = defaultListWorkflowsPageSize
}
if params.PageCountPerExecution <= 0 {
params.PageCountPerExecution = defaultPageCountPerExecution
}
if params.PageCountPerExecution > maxPageCountPerExecution {
params.PageCountPerExecution = maxPageCountPerExecution
}

retryPolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
MaximumInterval: time.Second * 10,
}

// ** Step 1, Get cluster metadata **
ao := workflow.ActivityOptions{
// Get cluster metadata; we need the namespace ID for the history API call.
// TODO: remove this step.
lao := workflow.LocalActivityOptions{
StartToCloseTimeout: time.Second * 10,
RetryPolicy: retryPolicy,
}
ctx1 := workflow.WithActivityOptions(ctx, ao)
ctx1 := workflow.WithLocalActivityOptions(ctx, lao)
var a *activities
var metadataResp metadataResponse
metadataRequest := metadataRequest{Namespace: params.Namespace}
err := workflow.ExecuteActivity(ctx1, a.GetMetadata, metadataRequest).Get(ctx1, &metadataResp)
err := workflow.ExecuteLocalActivity(ctx1, a.GetMetadata, metadataRequest).Get(ctx1, &metadataResp)
if err != nil {
return err
}

// ** Step 2, Force replication **
ao2 := workflow.ActivityOptions{
StartToCloseTimeout: time.Hour * 10,
selector := workflow.NewSelector(ctx)
pendingActivities := 0

ao := workflow.ActivityOptions{
StartToCloseTimeout: time.Hour,
HeartbeatTimeout: time.Second * 30,
RetryPolicy: retryPolicy,
}
ctx2 := workflow.WithActivityOptions(ctx, ao2)
ctx2 := workflow.WithActivityOptions(ctx, ao)

concurrentCount := params.ConcurrentActivityCount
shardCount := metadataResp.ShardCount
skipAfter := params.SkipAfterTime
if skipAfter.IsZero() {
skipAfter = workflow.Now(ctx2)
}
var futures []workflow.Future
batchSize := (shardCount + concurrentCount - 1) / concurrentCount
for beginShardID := int32(1); beginShardID <= shardCount; beginShardID += batchSize {
endShardID := beginShardID + batchSize - 1
if endShardID > shardCount {
endShardID = shardCount
for i := 0; i < params.PageCountPerExecution; i++ {
listFuture := workflow.ExecuteLocalActivity(ctx1, a.ListWorkflows, &workflowservice.ListWorkflowExecutionsRequest{
Namespace: params.Namespace,
PageSize: int32(params.ListWorkflowsPageSize),
NextPageToken: params.NextPageToken,
Query: params.Query,
})
var listResp listWorkflowsResponse
err = listFuture.Get(ctx1, &listResp)
if err != nil {
return err
}
rangeRequest := genReplicationForShardRange{
BeginShardID: beginShardID,
EndShardID: endShardID,
NamespaceID: metadataResp.NamespaceID,
SkipAfterTime: skipAfter,
RpsPerActivity: params.RpsPerActivity,

workerFuture := workflow.ExecuteActivity(ctx2, a.GenerateReplicationTasks, &generateReplicationTasksRequest{
NamespaceID: metadataResp.NamespaceID,
Executions: listResp.Executions,
RPS: params.RpsPerActivity,
})
pendingActivities++
selector.AddFuture(workerFuture, func(f workflow.Future) {
pendingActivities--
})

if pendingActivities >= params.ConcurrentActivityCount {
selector.Select(ctx) // this will block until one of the pending activities complete
}
future := workflow.ExecuteActivity(ctx2, a.GenerateReplicationTasks, rangeRequest)
futures = append(futures, future)
}

for _, f := range futures {
if err := f.Get(ctx2, nil); err != nil {
return err
params.NextPageToken = listResp.NextPageToken
if params.NextPageToken == nil {
break
}
}
// wait until all pending activities are done
for pendingActivities > 0 {
selector.Select(ctx)
}

return nil
if params.NextPageToken == nil {
// we are all done
return nil
}

// more pages remain than PageCountPerExecution allows, so continue as new to process the rest
return workflow.NewContinueAsNewError(ctx, ForceReplicationWorkflow, params)
}

func NamespaceHandoverWorkflow(ctx workflow.Context, params NamespaceHandoverParams) error {
Expand Down Expand Up @@ -350,30 +364,39 @@ func (a *activities) GetMetadata(ctx context.Context, request metadataRequest) (
}, nil
}

// GenerateReplicationTasks generates replication task for last history event for each workflow.
func (a *activities) GenerateReplicationTasks(ctx context.Context, request genReplicationForShardRange) error {
perShard := genReplicationForShard{
ShardID: request.BeginShardID,
NamespaceID: request.NamespaceID,
SkipAfterTime: request.SkipAfterTime,
RPS: request.RpsPerActivity,
func (a *activities) ListWorkflows(ctx context.Context, request *workflowservice.ListWorkflowExecutionsRequest) (*listWorkflowsResponse, error) {
resp, err := a.frontendClient.ListWorkflowExecutions(ctx, request)
if err != nil {
return nil, err
}
executions := make([]commonpb.WorkflowExecution, len(resp.Executions))
for i, e := range resp.Executions {
executions[i] = *e.Execution
}
var progress heartbeatProgress
return &listWorkflowsResponse{Executions: executions, NextPageToken: resp.NextPageToken}, nil
}

func (a *activities) GenerateReplicationTasks(ctx context.Context, request *generateReplicationTasksRequest) error {
rateLimiter := quotas.NewRateLimiter(float64(request.RPS), request.RPS)

startIndex := 0
if activity.HasHeartbeatDetails(ctx) {
if err := activity.GetHeartbeatDetails(ctx, &progress); err == nil {
perShard.ShardID = progress.ShardID
perShard.PageToken = progress.PageToken
perShard.Index = progress.Index
var finishedIndex int
if err := activity.GetHeartbeatDetails(ctx, &finishedIndex); err == nil {
startIndex = finishedIndex + 1 // start from next one
}
}
for ; perShard.ShardID <= request.EndShardID; perShard.ShardID++ {
if err := a.genReplicationTasks(ctx, perShard); err != nil {

for i := startIndex; i < len(request.Executions); i++ {
rateLimiter.Wait(ctx)
we := request.Executions[i]
err := a.generateWorkflowReplicationTask(ctx, definition.NewWorkflowKey(request.NamespaceID, we.WorkflowId, we.RunId))
if err != nil {
return err
}
// heartbeat progress only applies to the first shard
perShard.PageToken = nil
perShard.Index = 0
activity.RecordHeartbeat(ctx, i)
}

return nil
}

Expand Down Expand Up @@ -533,63 +556,7 @@ func (a *activities) checkHandoverOnce(ctx context.Context, waitRequest waitHand
return readyShardCount == len(resp.Shards), nil
}

// genReplicationTasks pages through the concrete executions of one shard and
// generates a replication task for every workflow that belongs to
// request.NamespaceID and was not updated after request.SkipAfterTime.
//
// request.PageToken and request.Index give the position to resume from: the
// page to list first and the index within that page. The index applies to the
// first listed page only; later pages start at index 0.
//
// Progress (shard, page token, index) is heartbeated before each execution is
// examined, so the recorded position is the execution about to be processed.
//
// It returns the first error from listing executions (after the retries
// allowed by persistenceRetryPolicy for errors accepted by
// common.IsPersistenceTransientError) or from generating a replication task,
// and nil once the last page has been processed.
func (a *activities) genReplicationTasks(ctx context.Context, request genReplicationForShard) error {
	pageToken := request.PageToken
	startIndex := request.Index
	// A single limiter throttles both the list calls and the per-workflow calls.
	rateLimiter := quotas.NewRateLimiter(float64(request.RPS), request.RPS)

	for {
		var listResult *persistence.ListConcreteExecutionsResponse
		op := func(ctx context.Context) error {
			var err error
			listResult, err = a.executionManager.ListConcreteExecutions(&persistence.ListConcreteExecutionsRequest{
				ShardID:   request.ShardID,
				PageSize:  listExecutionPageSize,
				PageToken: pageToken,
			})
			return err
		}

		// NOTE(review): whatever Wait reports is discarded here, as in the
		// original — confirm whether a failed wait (e.g. cancelled ctx)
		// should abort the activity instead.
		rateLimiter.Wait(ctx)
		if err := backoff.RetryContext(ctx, op, persistenceRetryPolicy, common.IsPersistenceTransientError); err != nil {
			return err
		}

		for i := startIndex; i < len(listResult.States); i++ {
			// Heartbeat first so a retried activity resumes at this execution.
			activity.RecordHeartbeat(ctx, heartbeatProgress{
				ShardID:   request.ShardID,
				PageToken: pageToken,
				Index:     i,
			})

			ms := listResult.States[i]
			if ms.ExecutionInfo.LastUpdateTime != nil && ms.ExecutionInfo.LastUpdateTime.After(request.SkipAfterTime) {
				// workflow was updated after SkipAfterTime, no need to generate replication task
				continue
			}
			if ms.ExecutionInfo.NamespaceId != request.NamespaceID {
				// skip if not target namespace
				continue
			}

			rateLimiter.Wait(ctx)
			wKey := definition.NewWorkflowKey(request.NamespaceID, ms.ExecutionInfo.WorkflowId, ms.ExecutionState.RunId)
			if err := a.genReplicationTaskForOneWorkflow(ctx, wKey); err != nil {
				return err
			}
		}

		pageToken = listResult.PageToken
		startIndex = 0
		// Treat an empty but non-nil token the same as nil. Looping again
		// with an empty token would presumably list the shard from its
		// first page and never terminate.
		if len(pageToken) == 0 {
			break
		}
	}

	return nil
}

func (a *activities) genReplicationTaskForOneWorkflow(ctx context.Context, wKey definition.WorkflowKey) error {
func (a *activities) generateWorkflowReplicationTask(ctx context.Context, wKey definition.WorkflowKey) error {
// will generate replication task
op := func(ctx context.Context) error {
var err error
Expand Down
Loading

0 comments on commit b8a36d6

Please sign in to comment.