
Commit

Continue VerifyReplicationTasks if there is any new workflow being verified (#4791)

**What changed?**
VerifyReplicationTasks now verifies a batch of executions with look-ahead: verified results are cached, and any newly verified execution in the batch counts as progress, instead of requiring the first execution to be verified before moving on.

**Why?**
Currently VerifyReplicationTasks checks one workflow at a time.
Workflows are replicated shard by shard, but force replication is not shard-aware. Given a batch of workflows to be replicated, if the first workflow happens to be the last one replicated, VerifyReplicationTasks makes no progress until that first workflow arrives, which can cause VerifyReplicationTasks to fail prematurely.
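For illustration, here is a minimal, self-contained sketch of the look-ahead idea (the `verifyBatch` function and its `verify` callback are illustrative names, not the actual activity code): verified results are cached, and any newly verified execution in the batch counts as progress even while the head execution is still waiting on a slow shard.

```go
package main

import "fmt"

// Illustrative sketch only (not the actual activity code): verified results
// are cached, and a look-ahead pass lets any newly verified execution count
// as progress even when the head of the batch is still pending.
func verifyBatch(executions []string, next int, cache map[int]bool, verify func(string) bool) (newNext int, done, progress bool) {
	// Advance past every execution at the head of the batch that is verified.
	for ; next < len(executions); next++ {
		ok := cache[next]
		if !ok {
			ok = verify(executions[next])
			if ok {
				cache[next] = true
			}
		}
		if !ok {
			break
		}
		progress = true
	}
	if next >= len(executions) {
		return next, true, progress // whole batch verified
	}
	// The head execution may sit on a slow shard; look ahead so the batch
	// still reports progress if any later execution has already replicated.
	for i := next + 1; i < len(executions); i++ {
		if cache[i] {
			continue // previously cached results don't count as new progress
		}
		if verify(executions[i]) {
			cache[i] = true
			progress = true
		}
	}
	return next, false, progress
}

func main() {
	cache := map[int]bool{}
	// "wf-0" lands on a slow shard and is not replicated yet, but "wf-1" is,
	// so the batch still reports progress instead of timing out.
	next, done, progress := verifyBatch([]string{"wf-0", "wf-1"}, 0, cache,
		func(wf string) bool { return wf != "wf-0" })
	fmt.Println(next, done, progress) // 0 false true
}
```

In the actual change below, these roles are played by `verifySingleReplicationTask`, the `cachedResults` map, and `replicationTasksHeartbeatDetails.LastVerifiedIndex`.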


**How did you test it?**
Unit tests; more cluster tests to follow.

**Potential risks**


**Is hotfix candidate?**
hehaifengcn committed Aug 24, 2023
1 parent 5795ad2 commit 46bbbf7
Showing 3 changed files with 255 additions and 92 deletions.
192 changes: 127 additions & 65 deletions service/worker/migration/activities.go
@@ -61,28 +61,31 @@ type (
}

replicationTasksHeartbeatDetails struct {
NextIndex int
CheckPoint time.Time
LastNotFoundWorkflowExecution commonpb.WorkflowExecution
NextIndex int
CheckPoint time.Time
LastNotVerifiedWorkflowExecution commonpb.WorkflowExecution
LastVerifiedIndex int
}

verifyReplicationTasksTimeoutErr struct {
timeout time.Duration
details replicationTasksHeartbeatDetails
verifyStatus int
verifyResult struct {
status verifyStatus
reason string
}
)

const (
reasonZombieWorkflow = "Zombie workflow"
reasonWorkflowNotFound = "Workflow not found"
reasonWorkflowCloseToRetention = "Workflow close to retention"

notVerified verifyStatus = 0
verified verifyStatus = 1
skipped verifyStatus = 2
)

func (e verifyReplicationTasksTimeoutErr) Error() string {
return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,",
e.timeout,
e.details.LastNotFoundWorkflowExecution,
)
func (r verifyResult) isVerified() bool {
return r.status == verified || r.status == skipped
}

// TODO: CallerTypePreemptablee should be set in activity background context for all migration activities.
@@ -540,12 +543,12 @@ func isCloseToCurrentTime(t time.Time, duration time.Duration) bool {
return true
}

func (a *activities) canSkipWorkflowExecution(
func (a *activities) checkSkipWorkflowExecution(
ctx context.Context,
request *verifyReplicationTasksRequest,
we *commonpb.WorkflowExecution,
ns *namespace.Namespace,
) (bool, string, error) {
) (verifyResult, error) {
namespaceID := request.NamespaceID
tags := []tag.Tag{tag.WorkflowNamespaceID(namespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}
resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
@@ -558,30 +561,98 @@ func (a *activities) canSkipWorkflowExecution(
// The outstanding workflow execution may be deleted (due to retention) on source cluster after replication tasks were generated.
// Since retention runs on both source/target clusters, such execution may also be deleted (hence not found) from target cluster.
a.forceReplicationMetricsHandler.Counter(metrics.EncounterNotFoundWorkflowCount.GetMetricName()).Record(1)
return true, reasonWorkflowNotFound, nil
return verifyResult{
status: skipped,
reason: reasonWorkflowNotFound,
}, nil
}

return false, "", err
return verifyResult{
status: notVerified,
}, err
}

// Zombie workflow should be a transient state. However, if there is Zombie workflow on the source cluster,
// it is skipped to avoid such workflow being processed on the target cluster.
if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)
return true, reasonZombieWorkflow, nil
return verifyResult{
status: skipped,
reason: reasonZombieWorkflow,
}, nil
}

// Skip verifying workflow which has already passed retention time.
if closeTime := resp.GetDatabaseMutableState().GetExecutionInfo().GetCloseTime(); closeTime != nil && ns != nil && ns.Retention() > 0 {
deleteTime := closeTime.Add(ns.Retention())
if deleteTime.Before(time.Now()) {
a.forceReplicationMetricsHandler.Counter(metrics.EncounterPassRetentionWorkflowCount.GetMetricName()).Record(1)
return true, reasonWorkflowCloseToRetention, nil
return verifyResult{
status: skipped,
reason: reasonWorkflowCloseToRetention,
}, nil
}
}

return false, "", nil
return verifyResult{
status: notVerified,
}, nil
}

func (a *activities) verifySingleReplicationTask(
ctx context.Context,
request *verifyReplicationTasksRequest,
remoteClient adminservice.AdminServiceClient,
ns *namespace.Namespace,
cachedResults map[int]verifyResult,
idx int,
) (result verifyResult, rerr error) {
if r, ok := cachedResults[idx]; ok {
return r, nil
}

defer func() {
if result.isVerified() {
cachedResults[idx] = result
}
}()

we := request.Executions[idx]
s := time.Now()
// Check if execution exists on remote cluster
_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
Namespace: request.Namespace,
Execution: &we,
})
a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s))

switch err.(type) {
case nil:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
return verifyResult{
status: verified,
}, nil

case *serviceerror.NotFound:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1)
// Calling checkSkipWorkflowExecution for every NotFound is sub-optimal as the most common case to skip is a workflow being deleted due to retention.
// A better solution is to only check existence for workflows that are close to the retention period.
return a.checkSkipWorkflowExecution(ctx, request, &we, ns)

case *serviceerror.NamespaceNotFound:
return verifyResult{
status: notVerified,
}, temporal.NewNonRetryableApplicationError("remoteClient.DescribeMutableState call failed", "NamespaceNotFound", err)

default:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)).
Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1)

return verifyResult{
status: notVerified,
}, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
}
}

func (a *activities) verifyReplicationTasks(
@@ -590,8 +661,9 @@ func (a *activities) verifyReplicationTasks(
details *replicationTasksHeartbeatDetails,
remoteClient adminservice.AdminServiceClient,
ns *namespace.Namespace,
cachedResults map[int]verifyResult,
heartbeat func(details replicationTasksHeartbeatDetails),
) (bool, []SkippedWorkflowExecution, error) {
) (bool, error) {
start := time.Now()
progress := false
defer func() {
@@ -604,55 +676,49 @@ func (a *activities) verifyReplicationTasks(
a.forceReplicationMetricsHandler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
}()

var skippedList []SkippedWorkflowExecution
for ; details.NextIndex < len(request.Executions); details.NextIndex++ {
we := request.Executions[details.NextIndex]
s := time.Now()
// Check if execution exists on remote cluster
_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
Namespace: request.Namespace,
Execution: &we,
})
a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s))

switch err.(type) {
case nil:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)

case *serviceerror.NotFound:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1)
// Calling canSkipWorkflowExecution for every NotFound is sub-optimal as the most common case to skip is a workflow being deleted due to retention.
// A better solution is to only check existence for workflows that are close to the retention period.
canSkip, reason, err := a.canSkipWorkflowExecution(ctx, request, &we, ns)
if err != nil {
return false, skippedList, err
}
r, err := a.verifySingleReplicationTask(ctx, request, remoteClient, ns, cachedResults, details.NextIndex)
if err != nil {
return false, err
}

if !canSkip {
details.LastNotFoundWorkflowExecution = we
return false, skippedList, nil
}
if !r.isVerified() {
details.LastNotVerifiedWorkflowExecution = request.Executions[details.NextIndex]
break
}

skippedList = append(skippedList, SkippedWorkflowExecution{
WorkflowExecution: we,
Reason: reason,
})
details.LastVerifiedIndex = details.NextIndex
heartbeat(*details)
progress = true
}

if details.NextIndex >= len(request.Executions) {
// Done with verification.
return true, nil
}

case *serviceerror.NamespaceNotFound:
return false, skippedList, temporal.NewNonRetryableApplicationError("remoteClient.DescribeMutableState call failed", "NamespaceNotFound", err)
// Look ahead and see if there is any new workflow being replicated on the target cluster. If yes, consider it progress.
// This prevents verifyReplicationTasks from failing prematurely because LastNotVerifiedWorkflowExecution is slow to replicate.
for idx := details.NextIndex + 1; idx < len(request.Executions); idx++ {
// Cached results don't count as progress.
if _, ok := cachedResults[idx]; ok {
continue
}

default:
a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)).
Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1)
r, err := a.verifySingleReplicationTask(ctx, request, remoteClient, ns, cachedResults, idx)
if err != nil {
return false, err
}

return false, skippedList, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
if r.isVerified() {
details.LastVerifiedIndex = idx
progress = true
}

heartbeat(*details)
progress = true
}

return true, skippedList, nil
return false, nil
}

const (
@@ -684,6 +750,8 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
return response, err
}

cachedResults := make(map[int]verifyResult)

// Verify if replication tasks exist on target cluster. There are several cases where execution was not found on target cluster.
// 1. replication lag
// 2. Zombie workflow execution
@@ -700,20 +768,14 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
// Since replication has a lag, sleep first.
time.Sleep(request.VerifyInterval)

verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry,
verified, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry, cachedResults,
func(d replicationTasksHeartbeatDetails) {
activity.RecordHeartbeat(ctx, d)
})

if err != nil {
return response, err
}

if len(skippedList) > 0 {
response.SkippedWorkflowExecutions = append(response.SkippedWorkflowExecutions, skippedList...)
response.SkippedWorkflowCount = len(response.SkippedWorkflowExecutions)
}

if verified == true {
return response, nil
}
@@ -724,7 +786,7 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
return response, temporal.NewNonRetryableApplicationError(
fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (not retryable). Not found WorkflowExecution: %v, Checkpoint: %v",
diff.Minutes(),
details.LastNotFoundWorkflowExecution, details.CheckPoint),
details.LastNotVerifiedWorkflowExecution, details.CheckPoint),
"", nil)
}
}