diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go
index f706e94dff8..cd960fff316 100644
--- a/service/worker/migration/activities.go
+++ b/service/worker/migration/activities.go
@@ -61,14 +61,16 @@ type (
 	}
 
 	replicationTasksHeartbeatDetails struct {
-		NextIndex                     int
-		CheckPoint                    time.Time
-		LastNotFoundWorkflowExecution commonpb.WorkflowExecution
+		NextIndex                        int
+		CheckPoint                       time.Time
+		LastNotVerifiedWorkflowExecution commonpb.WorkflowExecution
+		LastVerifiedIndex                int
 	}
 
-	verifyReplicationTasksTimeoutErr struct {
-		timeout time.Duration
-		details replicationTasksHeartbeatDetails
+	verifyStatus int
+	verifyResult struct {
+		status verifyStatus
+		reason string
 	}
 )
 
@@ -76,13 +78,14 @@ const (
 	reasonZombieWorkflow           = "Zombie workflow"
 	reasonWorkflowNotFound         = "Workflow not found"
 	reasonWorkflowCloseToRetention = "Workflow close to retention"
+
+	notVerified verifyStatus = 0
+	verified    verifyStatus = 1
+	skipped     verifyStatus = 2
 )
 
-func (e verifyReplicationTasksTimeoutErr) Error() string {
-	return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,",
-		e.timeout,
-		e.details.LastNotFoundWorkflowExecution,
-	)
+func (r verifyResult) isVerified() bool {
+	return r.status == verified || r.status == skipped
 }
 
 // TODO: CallerTypePreemptablee should be set in activity background context for all migration activities.
@@ -540,12 +543,12 @@ func isCloseToCurrentTime(t time.Time, duration time.Duration) bool {
 	return true
 }
 
-func (a *activities) canSkipWorkflowExecution(
+func (a *activities) checkSkipWorkflowExecution(
 	ctx context.Context,
 	request *verifyReplicationTasksRequest,
 	we *commonpb.WorkflowExecution,
 	ns *namespace.Namespace,
-) (bool, string, error) {
+) (verifyResult, error) {
 	namespaceID := request.NamespaceID
 	tags := []tag.Tag{tag.WorkflowNamespaceID(namespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)}
 	resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{
@@ -558,10 +561,15 @@ func (a *activities) canSkipWorkflowExecution(
 		// The outstanding workflow execution may be deleted (due to retention) on source cluster after replication tasks were generated.
 		// Since retention runs on both source/target clusters, such execution may also be deleted (hence not found) from target cluster.
 		a.forceReplicationMetricsHandler.Counter(metrics.EncounterNotFoundWorkflowCount.GetMetricName()).Record(1)
-		return true, reasonWorkflowNotFound, nil
+		return verifyResult{
+			status: skipped,
+			reason: reasonWorkflowNotFound,
+		}, nil
 		}
 
-		return false, "", err
+		return verifyResult{
+			status: notVerified,
+		}, err
 	}
 
 	// Zombie workflow should be a transient state. However, if there is Zombie workflow on the source cluster,
@@ -569,7 +577,10 @@ func (a *activities) canSkipWorkflowExecution(
 	if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE {
 		a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1)
 		a.logger.Info("createReplicationTasks skip Zombie workflow", tags...)
-		return true, reasonZombieWorkflow, nil
+		return verifyResult{
+			status: skipped,
+			reason: reasonZombieWorkflow,
+		}, nil
 	}
 
 	// Skip verifying workflow which has already passed retention time.
@@ -577,11 +588,71 @@ func (a *activities) canSkipWorkflowExecution(
 		deleteTime := closeTime.Add(ns.Retention())
 		if deleteTime.Before(time.Now()) {
 			a.forceReplicationMetricsHandler.Counter(metrics.EncounterPassRetentionWorkflowCount.GetMetricName()).Record(1)
-			return true, reasonWorkflowCloseToRetention, nil
+			return verifyResult{
+				status: skipped,
+				reason: reasonWorkflowCloseToRetention,
+			}, nil
 		}
 	}
 
-	return false, "", nil
+	return verifyResult{
+		status: notVerified,
+	}, nil
+}
+
+func (a *activities) verifySingleReplicationTask(
+	ctx context.Context,
+	request *verifyReplicationTasksRequest,
+	remoteClient adminservice.AdminServiceClient,
+	ns *namespace.Namespace,
+	cachedResults map[int]verifyResult,
+	idx int,
+) (result verifyResult, rerr error) {
+	if r, ok := cachedResults[idx]; ok {
+		return r, nil
+	}
+
+	defer func() {
+		if result.isVerified() {
+			cachedResults[idx] = result
+		}
+	}()
+
+	we := request.Executions[idx]
+	s := time.Now()
+	// Check if the execution exists on the remote cluster.
+	_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
+		Namespace: request.Namespace,
+		Execution: &we,
+	})
+	a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s))
+
+	switch err.(type) {
+	case nil:
+		a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
+		return verifyResult{
+			status: verified,
+		}, nil
+
+	case *serviceerror.NotFound:
+		a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1)
+		// Calling checkSkipWorkflowExecution for every NotFound is sub-optimal, as the most common reason to skip is a workflow having been deleted due to retention.
+		// A better solution is to only check existence for workflows that are close to the retention period.
+		return a.checkSkipWorkflowExecution(ctx, request, &we, ns)
+
+	case *serviceerror.NamespaceNotFound:
+		return verifyResult{
+			status: notVerified,
+		}, temporal.NewNonRetryableApplicationError("remoteClient.DescribeMutableState call failed", "NamespaceNotFound", err)
+
+	default:
+		a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)).
+			Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1)
+
+		return verifyResult{
+			status: notVerified,
+		}, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
+	}
 }
 
 func (a *activities) verifyReplicationTasks(
@@ -590,8 +661,9 @@ func (a *activities) verifyReplicationTasks(
 	details *replicationTasksHeartbeatDetails,
 	remoteClient adminservice.AdminServiceClient,
 	ns *namespace.Namespace,
+	cachedResults map[int]verifyResult,
 	heartbeat func(details replicationTasksHeartbeatDetails),
-) (bool, []SkippedWorkflowExecution, error) {
+) (bool, error) {
 	start := time.Now()
 	progress := false
 	defer func() {
@@ -604,55 +676,49 @@ func (a *activities) verifyReplicationTasks(
 		a.forceReplicationMetricsHandler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start))
 	}()
 
-	var skippedList []SkippedWorkflowExecution
 	for ; details.NextIndex < len(request.Executions); details.NextIndex++ {
-		we := request.Executions[details.NextIndex]
-		s := time.Now()
-		// Check if execution exists on remote cluster
-		_, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{
-			Namespace: request.Namespace,
-			Execution: &we,
-		})
-		a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s))
-
-		switch err.(type) {
-		case nil:
-			a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1)
-
-		case *serviceerror.NotFound:
-			a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1)
-			// Calling canSkipWorkflowExecution for every NotFound is sub-optimal as most common case to skip is workfow being deleted due to retention.
-			// A better solution is to only check the existence for workflow which is close to retention period.
-			canSkip, reason, err := a.canSkipWorkflowExecution(ctx, request, &we, ns)
-			if err != nil {
-				return false, skippedList, err
-			}
+		r, err := a.verifySingleReplicationTask(ctx, request, remoteClient, ns, cachedResults, details.NextIndex)
+		if err != nil {
+			return false, err
+		}
 
-			if !canSkip {
-				details.LastNotFoundWorkflowExecution = we
-				return false, skippedList, nil
-			}
+		if !r.isVerified() {
+			details.LastNotVerifiedWorkflowExecution = request.Executions[details.NextIndex]
+			break
+		}
 
-			skippedList = append(skippedList, SkippedWorkflowExecution{
-				WorkflowExecution: we,
-				Reason:            reason,
-			})
+		details.LastVerifiedIndex = details.NextIndex
+		heartbeat(*details)
+		progress = true
+	}
+
+	if details.NextIndex >= len(request.Executions) {
+		// Done with verification.
+		return true, nil
+	}
 
-		case *serviceerror.NamespaceNotFound:
-			return false, skippedList, temporal.NewNonRetryableApplicationError("remoteClient.DescribeMutableState call failed", "NamespaceNotFound", err)
+	// Look ahead to see if any new workflow has been replicated on the target cluster. If so, count it as progress.
+	// This prevents verifyReplicationTasks from failing just because replication of LastNotVerifiedWorkflowExecution is slow.
+	for idx := details.NextIndex + 1; idx < len(request.Executions); idx++ {
+		// Cached results don't count as progress.
+		if _, ok := cachedResults[idx]; ok {
+			continue
+		}
 
-		default:
-			a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)).
-			Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1)
+		r, err := a.verifySingleReplicationTask(ctx, request, remoteClient, ns, cachedResults, idx)
+		if err != nil {
+			return false, err
+		}
 
-			return false, skippedList, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed")
+		if r.isVerified() {
+			details.LastVerifiedIndex = idx
+			progress = true
 		}
 
 		heartbeat(*details)
-		progress = true
 	}
 
-	return true, skippedList, nil
+	return false, nil
 }
 
 const (
@@ -684,6 +750,8 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
 		return response, err
 	}
 
+	cachedResults := make(map[int]verifyResult)
+
 	// Verify if replication tasks exist on target cluster. There are several cases where execution was not found on target cluster.
 	// 1. replication lag
 	// 2. Zombie workflow execution
@@ -700,20 +768,14 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
 		// Since replication has a lag, sleep first.
 		time.Sleep(request.VerifyInterval)
 
-		verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry,
+		verified, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry, cachedResults,
 			func(d replicationTasksHeartbeatDetails) {
 				activity.RecordHeartbeat(ctx, d)
 			})
-
 		if err != nil {
 			return response, err
 		}
 
-		if len(skippedList) > 0 {
-			response.SkippedWorkflowExecutions = append(response.SkippedWorkflowExecutions, skippedList...)
-			response.SkippedWorkflowCount = len(response.SkippedWorkflowExecutions)
-		}
-
 		if verified == true {
 			return response, nil
 		}
@@ -724,7 +786,7 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify
 			return response, temporal.NewNonRetryableApplicationError(
 				fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (not retryable). Not found WorkflowExecution: %v, Checkpoint: %v", diff.Minutes(),
-					details.LastNotFoundWorkflowExecution, details.CheckPoint),
+					details.LastNotVerifiedWorkflowExecution, details.CheckPoint),
 				"", nil)
 		}
 	}
diff --git a/service/worker/migration/activities_test.go b/service/worker/migration/activities_test.go
index 15bc8929c67..d13834c0513 100644
--- a/service/worker/migration/activities_test.go
+++ b/service/worker/migration/activities_test.go
@@ -205,16 +205,13 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_Success() {
 		Execution: &execution2,
 	}).Return(&completeState, nil).Times(2)
 
-	r, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request)
+	_, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request)
 	s.NoError(err)
-	var response verifyReplicationTasksResponse
-	s.NoError(r.Get(&response))
-	s.Empty(response.SkippedWorkflowExecutions)
 	s.Greater(len(iceptor.replicationRecordedHeartbeats), 0)
 	lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1]
 	s.Equal(len(request.Executions), lastHeartBeat.NextIndex)
-	s.Equal(execution2, lastHeartBeat.LastNotFoundWorkflowExecution)
+	s.Equal(execution2, lastHeartBeat.LastNotVerifiedWorkflowExecution)
 }
 
 func (s *activitiesSuite) TestVerifyReplicationTasks_SkipWorkflowExecution() {
@@ -254,7 +251,6 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_SkipWorkflowExecution() {
 	for _, t := range testcases {
 		env, iceptor := s.initEnv()
 
-		// Call DescribeMutableState for enough times to trigger trySkipWorkflowExecution
 		s.mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{
 			Namespace: mockedNamespace,
 			Execution: &execution1,
@@ -265,23 +261,18 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_SkipWorkflowExecution() {
 			Execution: &execution1,
 		}).Return(t.resp, t.err).Times(1)
 
-		r, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request)
+		_, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request)
 		s.Greater(len(iceptor.replicationRecordedHeartbeats), 0)
 		lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1]
 		if t.expectedErr == nil {
 			s.NoError(err)
 			s.Equal(len(request.Executions), lastHeartBeat.NextIndex)
-
-			var response verifyReplicationTasksResponse
-			s.NoError(r.Get(&response))
-			s.Equal(request.Executions[0], response.SkippedWorkflowExecutions[0].WorkflowExecution)
-			s.Equal(t.expectedReason, response.SkippedWorkflowExecutions[0].Reason)
 		} else {
 			s.ErrorContains(err, "mock error")
 			s.Equal(0, lastHeartBeat.NextIndex)
 		}
-		s.Equal(emptyExecutions, lastHeartBeat.LastNotFoundWorkflowExecution)
+		s.Equal(emptyExecutions, lastHeartBeat.LastNotVerifiedWorkflowExecution)
 		s.True(lastHeartBeat.CheckPoint.After(start))
 	}
 }
@@ -319,7 +310,7 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_FailedNotFound() {
 	s.Greater(len(iceptor.replicationRecordedHeartbeats), 0)
 	lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1]
 	s.Equal(0, lastHeartBeat.NextIndex)
-	s.Equal(execution1, lastHeartBeat.LastNotFoundWorkflowExecution)
+	s.Equal(execution1, lastHeartBeat.LastNotVerifiedWorkflowExecution)
 }
 
 func (s *activitiesSuite) TestVerifyReplicationTasks_AlreadyVerified() {
@@ -343,6 +334,50 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_AlreadyVerified() {
 	s.Equal(len(iceptor.replicationRecordedHeartbeats), 1)
 }
 
+func (s *activitiesSuite) Test_verifySingleReplicationTask() {
+	request := verifyReplicationTasksRequest{
+		Namespace:             mockedNamespace,
+		NamespaceID:           mockedNamespaceID,
+		TargetClusterEndpoint: remoteRpcAddress,
+		Executions:            []commonpb.WorkflowExecution{execution1, execution2},
+	}
+	ctx := context.TODO()
+
+	cachedResults := make(map[int]verifyResult)
+
+	mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller)
+	mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{
+		Namespace: mockedNamespace,
+		Execution: &execution1,
+	}).Return(&adminservice.DescribeMutableStateResponse{}, nil).Times(1)
+	result, err := s.a.verifySingleReplicationTask(ctx, &request, mockRemoteAdminClient, &testNamespace, cachedResults, 0)
+	s.NoError(err)
+	s.True(result.isVerified())
+	s.Equal(result, cachedResults[0])
+
+	// The second call should hit the cache, so no mock is needed.
+	result, err = s.a.verifySingleReplicationTask(ctx, &request, mockRemoteAdminClient, &testNamespace, cachedResults, 0)
+	s.NoError(err)
+	s.True(result.isVerified())
+
+	// Test a workflow that cannot be verified yet.
+	mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{
+		Namespace: mockedNamespace,
+		Execution: &execution2,
+	}).Return(&adminservice.DescribeMutableStateResponse{}, serviceerror.NewNotFound("")).Times(1)
+
+	s.mockHistoryClient.EXPECT().DescribeMutableState(gomock.Any(), &historyservice.DescribeMutableStateRequest{
+		NamespaceId: mockedNamespaceID,
+		Execution:   &execution2,
+	}).Return(&completeState, nil).AnyTimes()
+
+	result, err = s.a.verifySingleReplicationTask(ctx, &request, mockRemoteAdminClient, &testNamespace, cachedResults, 1)
+	s.NoError(err)
+	s.False(result.isVerified())
+	_, ok := cachedResults[1]
+	s.False(ok)
+}
+
 type executionState int
 
 const (
@@ -403,7 +438,8 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() {
 		nextIndex             int
 		expectedVerified      bool
 		expectedErr           error
-		expectedIndex         int
+		expectedNextIndex     int
+		expectedVerifiedIndex int
 	}{
 		{
 			expectedVerified: true,
@@ -414,21 +450,32 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() {
 			nextIndex:        0,
 			expectedVerified: true,
 			expectedErr:      nil,
-			expectedIndex:    4,
+			expectedNextIndex:     4,
+			expectedVerifiedIndex: 3,
 		},
 		{
 			remoteExecutionStates: []executionState{executionFound, executionFound, executionFound, executionFound},
 			nextIndex:             2,
 			expectedVerified:      true,
 			expectedErr:           nil,
-			expectedIndex:         4,
+			expectedNextIndex:     4,
+			expectedVerifiedIndex: 3,
 		},
 		{
 			remoteExecutionStates: []executionState{executionFound, executionFound, executionNotfound},
 			nextIndex:             0,
 			expectedVerified:      false,
 			expectedErr:           nil,
-			expectedIndex:         2,
+			expectedNextIndex:     2,
+			expectedVerifiedIndex: 1,
+		},
+		{
+			remoteExecutionStates: []executionState{executionFound, executionFound, executionNotfound, executionFound, executionNotfound},
+			nextIndex:             0,
+			expectedVerified:      false,
+			expectedErr:           nil,
+			expectedNextIndex:     2,
+			expectedVerifiedIndex: 3,
 		},
 	}
 
@@ -437,28 +484,84 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() {
 		Execution: &execution1,
 	}).Return(&completeState, nil).AnyTimes()
 
+	checkPointTime := time.Now()
 	for _, tc := range tests {
 		var recorder mockHeartBeatRecorder
 		mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller)
 		request.Executions = createExecutions(mockRemoteAdminClient, tc.remoteExecutionStates, tc.nextIndex)
 		details := replicationTasksHeartbeatDetails{
-			NextIndex: tc.nextIndex,
+			NextIndex:  tc.nextIndex,
+			CheckPoint: checkPointTime,
 		}
-		verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, recorder.hearbeat)
+		cachedResults := make(map[int]verifyResult)
+		verified, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, cachedResults, recorder.hearbeat)
 		if tc.expectedErr == nil {
 			s.NoError(err)
 		}
 		s.Equal(tc.expectedVerified, verified)
-		s.Equal(tc.expectedIndex, details.NextIndex)
+		s.Equal(tc.expectedNextIndex, details.NextIndex)
+		s.Equal(tc.expectedVerifiedIndex, details.LastVerifiedIndex)
 		s.GreaterOrEqual(len(tc.remoteExecutionStates), details.NextIndex)
 		s.Equal(recorder.lastHeartBeat, details)
 		if details.NextIndex < len(tc.remoteExecutionStates) && tc.remoteExecutionStates[details.NextIndex] == executionNotfound {
-			s.Equal(execution1, details.LastNotFoundWorkflowExecution)
+			s.Equal(execution1, details.LastNotVerifiedWorkflowExecution)
+		}
+
+		if len(request.Executions) > 0 {
+			// Except for the empty-Executions case, every test case should set a new CheckPoint to indicate progress.
+			s.True(checkPointTime.Before(details.CheckPoint))
 		}
 	}
 }
 
+func (s *activitiesSuite) Test_verifyReplicationTasksNoProgress() {
+	var recorder mockHeartBeatRecorder
+	mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller)
+
+	request := verifyReplicationTasksRequest{
+		Namespace:             mockedNamespace,
+		NamespaceID:           mockedNamespaceID,
+		TargetClusterEndpoint: remoteRpcAddress,
+		Executions:            createExecutions(mockRemoteAdminClient, []executionState{executionFound, executionFound, executionNotfound, executionFound}, 0),
+	}
+
+	s.mockHistoryClient.EXPECT().DescribeMutableState(gomock.Any(), &historyservice.DescribeMutableStateRequest{
+		NamespaceId: mockedNamespaceID,
+		Execution:   &execution1,
+	}).Return(&completeState, nil).AnyTimes()
+
+	checkPointTime := time.Now()
+	details := replicationTasksHeartbeatDetails{
+		NextIndex:  0,
+		CheckPoint: checkPointTime,
+	}
+
+	ctx := context.TODO()
+	cachedResults := make(map[int]verifyResult)
+	verified, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, cachedResults, recorder.hearbeat)
+	s.NoError(err)
+	s.False(verified)
+	// Verify that progress was made.
+	s.True(checkPointTime.Before(details.CheckPoint))
+	s.Equal(3, details.LastVerifiedIndex)
+
+	prevCheckPoint := details.CheckPoint
+
+	// Mock one more NotFound call.
+	mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{
+		Namespace: mockedNamespace,
+		Execution: &execution1,
+	}).Return(nil, serviceerror.NewNotFound("")).Times(1)
+
+	// All results should now be either NotFound or served from the cache, so no progress should be made.
+	verified, err = s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, cachedResults, recorder.hearbeat)
+	s.NoError(err)
+	s.False(verified)
+	s.Equal(prevCheckPoint, details.CheckPoint)
+	s.Equal(3, details.LastVerifiedIndex)
+}
+
 func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() {
 	bias := time.Minute
 	request := verifyReplicationTasksRequest{
@@ -521,7 +624,8 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() {
 		details := replicationTasksHeartbeatDetails{}
 		ctx := context.TODO()
-		verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns, recorder.hearbeat)
+		cachedResults := make(map[int]verifyResult)
+		verified, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns, cachedResults, recorder.hearbeat)
 		s.NoError(err)
 		s.Equal(tc.verified, verified)
 		s.Equal(recorder.lastHeartBeat, details)
diff --git a/service/worker/migration/force_replication_workflow.go b/service/worker/migration/force_replication_workflow.go
index 1aeab7aed8c..fbf9efaa31c 100644
--- a/service/worker/migration/force_replication_workflow.go
+++ b/service/worker/migration/force_replication_workflow.go
@@ -112,10 +112,7 @@ type (
 		Executions []commonpb.WorkflowExecution
 	}
 
-	verifyReplicationTasksResponse struct {
-		SkippedWorkflowExecutions []SkippedWorkflowExecution
-		SkippedWorkflowCount      int
-	}
+	verifyReplicationTasksResponse struct{}
 
 	metadataRequest struct {
 		Namespace string
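
The heart of this patch is the interaction between result caching and the look-ahead pass in verifyReplicationTasks, which is easiest to see in isolation. Below is a minimal, self-contained Go sketch of that pattern, not part of the patch itself: verifyAll, checkExists, and onProgress are illustrative stand-ins rather than Temporal APIs, and the verified/skipped distinction is collapsed into a simple existence check.

```go
package main

import (
	"fmt"
	"time"
)

// status mirrors the verifyStatus values introduced by the patch.
type status int

const (
	notVerified status = iota
	verifiedOK
	skippedOK
)

func isVerified(s status) bool { return s == verifiedOK || s == skippedOK }

// verifyAll sketches the two-pass loop in verifyReplicationTasks: a forward pass
// that advances nextIndex past verified executions, then a look-ahead pass in
// which any execution verifying for the first time still counts as progress.
// Only verified results are cached, so a retry after a heartbeat resume never
// re-queries them; cache hits do not count as progress.
// checkExists stands in for remoteClient.DescribeMutableState.
func verifyAll(
	executions []string,
	nextIndex *int,
	cached map[int]status,
	checkExists func(string) bool,
	onProgress func(),
) bool {
	verifyOne := func(idx int) status {
		if s, ok := cached[idx]; ok {
			return s
		}
		if checkExists(executions[idx]) {
			cached[idx] = verifiedOK
			return verifiedOK
		}
		return notVerified
	}

	for ; *nextIndex < len(executions); *nextIndex++ {
		if !isVerified(verifyOne(*nextIndex)) {
			break // stalled here; fall through to the look-ahead pass
		}
		onProgress()
	}
	if *nextIndex >= len(executions) {
		return true // every execution verified
	}

	// Look ahead: a later execution replicating for the first time is progress,
	// so one slow workflow cannot trip the no-progress timeout by itself.
	for idx := *nextIndex + 1; idx < len(executions); idx++ {
		if _, ok := cached[idx]; ok {
			continue // cached results don't count as progress
		}
		if isVerified(verifyOne(idx)) {
			onProgress()
		}
	}
	return false
}

func main() {
	replicated := map[string]bool{"wf-0": true, "wf-1": true, "wf-3": true}
	cached := make(map[int]status)
	next := 0
	progressAt := time.Time{}

	done := verifyAll(
		[]string{"wf-0", "wf-1", "wf-2", "wf-3"},
		&next, cached,
		func(wf string) bool { return replicated[wf] },
		func() { progressAt = time.Now() },
	)
	// wf-2 is still replicating: not done, nextIndex stalls at 2, but the
	// look-ahead verified wf-3, so progress was still recorded.
	fmt.Println(done, next, len(cached), !progressAt.IsZero()) // false 2 3 true
}
```

The design point the new tests pin down is the same one this sketch shows: a fresh CheckPoint is only recorded when a previously unseen execution verifies, so a retry whose results are all NotFound or served from the cache leaves CheckPoint unchanged and lets the non-retryable no-progress timeout in VerifyReplicationTasks eventually fire.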