From 46bbbf74317769eb911f455749e00c1d4fa402de Mon Sep 17 00:00:00 2001 From: Haifeng He Date: Thu, 24 Aug 2023 09:06:19 -0700 Subject: [PATCH] Continue VerifyReplicationTasks if there is any new workflow being verified (#4791) **What changed?** **Why?** Currently VerifyReplicationTasks checks one workflow at a time. Workflows are replicated by shards but force replication isn't aware of shards. Given a batch of workflows to be replicated, if the first workflow is the last one to be replicated, VerifyReplicationTasks will not make progress until the first one is replicated, which can lead VerifyReplicationTasks to to fail prematurely. **How did you test it?** Unit tests. will do more cluster tests. **Potential risks** **Is hotfix candidate?** --- service/worker/migration/activities.go | 192 ++++++++++++------ service/worker/migration/activities_test.go | 150 +++++++++++--- .../migration/force_replication_workflow.go | 5 +- 3 files changed, 255 insertions(+), 92 deletions(-) diff --git a/service/worker/migration/activities.go b/service/worker/migration/activities.go index f706e94dff8..cd960fff316 100644 --- a/service/worker/migration/activities.go +++ b/service/worker/migration/activities.go @@ -61,14 +61,16 @@ type ( } replicationTasksHeartbeatDetails struct { - NextIndex int - CheckPoint time.Time - LastNotFoundWorkflowExecution commonpb.WorkflowExecution + NextIndex int + CheckPoint time.Time + LastNotVerifiedWorkflowExecution commonpb.WorkflowExecution + LastVerifiedIndex int } - verifyReplicationTasksTimeoutErr struct { - timeout time.Duration - details replicationTasksHeartbeatDetails + verifyStatus int + verifyResult struct { + status verifyStatus + reason string } ) @@ -76,13 +78,14 @@ const ( reasonZombieWorkflow = "Zombie workflow" reasonWorkflowNotFound = "Workflow not found" reasonWorkflowCloseToRetention = "Workflow close to retention" + + notVerified verifyStatus = 0 + verified verifyStatus = 1 + skipped verifyStatus = 2 ) -func (e verifyReplicationTasksTimeoutErr) Error() string { - return fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (retryable). Not found WorkflowExecution: %v,", - e.timeout, - e.details.LastNotFoundWorkflowExecution, - ) +func (r verifyResult) isVerified() bool { + return r.status == verified || r.status == skipped } // TODO: CallerTypePreemptablee should be set in activity background context for all migration activities. @@ -540,12 +543,12 @@ func isCloseToCurrentTime(t time.Time, duration time.Duration) bool { return true } -func (a *activities) canSkipWorkflowExecution( +func (a *activities) checkSkipWorkflowExecution( ctx context.Context, request *verifyReplicationTasksRequest, we *commonpb.WorkflowExecution, ns *namespace.Namespace, -) (bool, string, error) { +) (verifyResult, error) { namespaceID := request.NamespaceID tags := []tag.Tag{tag.WorkflowNamespaceID(namespaceID), tag.WorkflowID(we.WorkflowId), tag.WorkflowRunID(we.RunId)} resp, err := a.historyClient.DescribeMutableState(ctx, &historyservice.DescribeMutableStateRequest{ @@ -558,10 +561,15 @@ func (a *activities) canSkipWorkflowExecution( // The outstanding workflow execution may be deleted (due to retention) on source cluster after replication tasks were generated. // Since retention runs on both source/target clusters, such execution may also be deleted (hence not found) from target cluster. a.forceReplicationMetricsHandler.Counter(metrics.EncounterNotFoundWorkflowCount.GetMetricName()).Record(1) - return true, reasonWorkflowNotFound, nil + return verifyResult{ + status: skipped, + reason: reasonWorkflowNotFound, + }, nil } - return false, "", err + return verifyResult{ + status: notVerified, + }, err } // Zombie workflow should be a transient state. However, if there is Zombie workflow on the source cluster, @@ -569,7 +577,10 @@ func (a *activities) canSkipWorkflowExecution( if resp.GetDatabaseMutableState().GetExecutionState().GetState() == enumsspb.WORKFLOW_EXECUTION_STATE_ZOMBIE { a.forceReplicationMetricsHandler.Counter(metrics.EncounterZombieWorkflowCount.GetMetricName()).Record(1) a.logger.Info("createReplicationTasks skip Zombie workflow", tags...) - return true, reasonZombieWorkflow, nil + return verifyResult{ + status: skipped, + reason: reasonZombieWorkflow, + }, nil } // Skip verifying workflow which has already passed retention time. @@ -577,11 +588,71 @@ func (a *activities) canSkipWorkflowExecution( deleteTime := closeTime.Add(ns.Retention()) if deleteTime.Before(time.Now()) { a.forceReplicationMetricsHandler.Counter(metrics.EncounterPassRetentionWorkflowCount.GetMetricName()).Record(1) - return true, reasonWorkflowCloseToRetention, nil + return verifyResult{ + status: skipped, + reason: reasonWorkflowCloseToRetention, + }, nil } } - return false, "", nil + return verifyResult{ + status: notVerified, + }, nil +} + +func (a *activities) verifySingleReplicationTask( + ctx context.Context, + request *verifyReplicationTasksRequest, + remoteClient adminservice.AdminServiceClient, + ns *namespace.Namespace, + cachedResults map[int]verifyResult, + idx int, +) (result verifyResult, rerr error) { + if r, ok := cachedResults[idx]; ok { + return r, nil + } + + defer func() { + if result.isVerified() { + cachedResults[idx] = result + } + }() + + we := request.Executions[idx] + s := time.Now() + // Check if execution exists on remote cluster + _, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{ + Namespace: request.Namespace, + Execution: &we, + }) + a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s)) + + switch err.(type) { + case nil: + a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1) + return verifyResult{ + status: verified, + }, nil + + case *serviceerror.NotFound: + a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1) + // Calling checkSkipWorkflowExecution for every NotFound is sub-optimal as most common case to skip is workfow being deleted due to retention. + // A better solution is to only check the existence for workflow which is close to retention period. + return a.checkSkipWorkflowExecution(ctx, request, &we, ns) + + case *serviceerror.NamespaceNotFound: + return verifyResult{ + status: notVerified, + }, temporal.NewNonRetryableApplicationError("remoteClient.DescribeMutableState call failed", "NamespaceNotFound", err) + + default: + a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)). + Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1) + + return verifyResult{ + status: notVerified, + }, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed") + } } func (a *activities) verifyReplicationTasks( @@ -590,8 +661,9 @@ func (a *activities) verifyReplicationTasks( details *replicationTasksHeartbeatDetails, remoteClient adminservice.AdminServiceClient, ns *namespace.Namespace, + cachedResults map[int]verifyResult, heartbeat func(details replicationTasksHeartbeatDetails), -) (bool, []SkippedWorkflowExecution, error) { +) (bool, error) { start := time.Now() progress := false defer func() { @@ -604,55 +676,49 @@ func (a *activities) verifyReplicationTasks( a.forceReplicationMetricsHandler.Timer(metrics.VerifyReplicationTasksLatency.GetMetricName()).Record(time.Since(start)) }() - var skippedList []SkippedWorkflowExecution for ; details.NextIndex < len(request.Executions); details.NextIndex++ { - we := request.Executions[details.NextIndex] - s := time.Now() - // Check if execution exists on remote cluster - _, err := remoteClient.DescribeMutableState(ctx, &adminservice.DescribeMutableStateRequest{ - Namespace: request.Namespace, - Execution: &we, - }) - a.forceReplicationMetricsHandler.Timer(metrics.VerifyDescribeMutableStateLatency.GetMetricName()).Record(time.Since(s)) - - switch err.(type) { - case nil: - a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskSuccess.GetMetricName()).Record(1) - - case *serviceerror.NotFound: - a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace)).Counter(metrics.VerifyReplicationTaskNotFound.GetMetricName()).Record(1) - // Calling canSkipWorkflowExecution for every NotFound is sub-optimal as most common case to skip is workfow being deleted due to retention. - // A better solution is to only check the existence for workflow which is close to retention period. - canSkip, reason, err := a.canSkipWorkflowExecution(ctx, request, &we, ns) - if err != nil { - return false, skippedList, err - } + r, err := a.verifySingleReplicationTask(ctx, request, remoteClient, ns, cachedResults, details.NextIndex) + if err != nil { + return false, err + } - if !canSkip { - details.LastNotFoundWorkflowExecution = we - return false, skippedList, nil - } + if !r.isVerified() { + details.LastNotVerifiedWorkflowExecution = request.Executions[details.NextIndex] + break + } - skippedList = append(skippedList, SkippedWorkflowExecution{ - WorkflowExecution: we, - Reason: reason, - }) + details.LastVerifiedIndex = details.NextIndex + heartbeat(*details) + progress = true + } + + if details.NextIndex >= len(request.Executions) { + // Done with verification. + return true, nil + } - case *serviceerror.NamespaceNotFound: - return false, skippedList, temporal.NewNonRetryableApplicationError("remoteClient.DescribeMutableState call failed", "NamespaceNotFound", err) + // Look ahead and see if there is any new workflow being replicated on target cluster. If yes, then consider it is a progress. + // This is to avoid verifyReplicationTasks from failing due to LastNotFoundWorkflowExecution being slow. + for idx := details.NextIndex + 1; idx < len(request.Executions); idx++ { + // Cache results don't count for progress. + if _, ok := cachedResults[idx]; ok { + continue + } - default: - a.forceReplicationMetricsHandler.WithTags(metrics.NamespaceTag(request.Namespace), metrics.ServiceErrorTypeTag(err)). - Counter(metrics.VerifyReplicationTaskFailed.GetMetricName()).Record(1) + r, err := a.verifySingleReplicationTask(ctx, request, remoteClient, ns, cachedResults, idx) + if err != nil { + return false, err + } - return false, skippedList, errors.WithMessage(err, "remoteClient.DescribeMutableState call failed") + if r.isVerified() { + details.LastVerifiedIndex = idx + progress = true } heartbeat(*details) - progress = true } - return true, skippedList, nil + return false, nil } const ( @@ -684,6 +750,8 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify return response, err } + cachedResults := make(map[int]verifyResult) + // Verify if replication tasks exist on target cluster. There are several cases where execution was not found on target cluster. // 1. replication lag // 2. Zombie workflow execution @@ -700,20 +768,14 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify // Since replication has a lag, sleep first. time.Sleep(request.VerifyInterval) - verified, skippedList, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry, + verified, err := a.verifyReplicationTasks(ctx, request, &details, remoteClient, nsEntry, cachedResults, func(d replicationTasksHeartbeatDetails) { activity.RecordHeartbeat(ctx, d) }) - if err != nil { return response, err } - if len(skippedList) > 0 { - response.SkippedWorkflowExecutions = append(response.SkippedWorkflowExecutions, skippedList...) - response.SkippedWorkflowCount = len(response.SkippedWorkflowExecutions) - } - if verified == true { return response, nil } @@ -724,7 +786,7 @@ func (a *activities) VerifyReplicationTasks(ctx context.Context, request *verify return response, temporal.NewNonRetryableApplicationError( fmt.Sprintf("verifyReplicationTasks was not able to make progress for more than %v minutes (not retryable). Not found WorkflowExecution: %v, Checkpoint: %v", diff.Minutes(), - details.LastNotFoundWorkflowExecution, details.CheckPoint), + details.LastNotVerifiedWorkflowExecution, details.CheckPoint), "", nil) } } diff --git a/service/worker/migration/activities_test.go b/service/worker/migration/activities_test.go index 15bc8929c67..d13834c0513 100644 --- a/service/worker/migration/activities_test.go +++ b/service/worker/migration/activities_test.go @@ -205,16 +205,13 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_Success() { Execution: &execution2, }).Return(&completeState, nil).Times(2) - r, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request) + _, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request) s.NoError(err) - var response verifyReplicationTasksResponse - s.NoError(r.Get(&response)) - s.Empty(response.SkippedWorkflowExecutions) s.Greater(len(iceptor.replicationRecordedHeartbeats), 0) lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1] s.Equal(len(request.Executions), lastHeartBeat.NextIndex) - s.Equal(execution2, lastHeartBeat.LastNotFoundWorkflowExecution) + s.Equal(execution2, lastHeartBeat.LastNotVerifiedWorkflowExecution) } func (s *activitiesSuite) TestVerifyReplicationTasks_SkipWorkflowExecution() { @@ -254,7 +251,6 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_SkipWorkflowExecution() { for _, t := range testcases { env, iceptor := s.initEnv() - // Call DescribeMutableState for enough times to trigger trySkipWorkflowExecution s.mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{ Namespace: mockedNamespace, Execution: &execution1, @@ -265,23 +261,18 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_SkipWorkflowExecution() { Execution: &execution1, }).Return(t.resp, t.err).Times(1) - r, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request) + _, err := env.ExecuteActivity(s.a.VerifyReplicationTasks, &request) s.Greater(len(iceptor.replicationRecordedHeartbeats), 0) lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1] if t.expectedErr == nil { s.NoError(err) s.Equal(len(request.Executions), lastHeartBeat.NextIndex) - - var response verifyReplicationTasksResponse - s.NoError(r.Get(&response)) - s.Equal(request.Executions[0], response.SkippedWorkflowExecutions[0].WorkflowExecution) - s.Equal(t.expectedReason, response.SkippedWorkflowExecutions[0].Reason) } else { s.ErrorContains(err, "mock error") s.Equal(0, lastHeartBeat.NextIndex) } - s.Equal(emptyExecutions, lastHeartBeat.LastNotFoundWorkflowExecution) + s.Equal(emptyExecutions, lastHeartBeat.LastNotVerifiedWorkflowExecution) s.True(lastHeartBeat.CheckPoint.After(start)) } } @@ -319,7 +310,7 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_FailedNotFound() { s.Greater(len(iceptor.replicationRecordedHeartbeats), 0) lastHeartBeat := iceptor.replicationRecordedHeartbeats[len(iceptor.replicationRecordedHeartbeats)-1] s.Equal(0, lastHeartBeat.NextIndex) - s.Equal(execution1, lastHeartBeat.LastNotFoundWorkflowExecution) + s.Equal(execution1, lastHeartBeat.LastNotVerifiedWorkflowExecution) } func (s *activitiesSuite) TestVerifyReplicationTasks_AlreadyVerified() { @@ -343,6 +334,50 @@ func (s *activitiesSuite) TestVerifyReplicationTasks_AlreadyVerified() { s.Equal(len(iceptor.replicationRecordedHeartbeats), 1) } +func (s *activitiesSuite) Test_verifySingleReplicationTask() { + request := verifyReplicationTasksRequest{ + Namespace: mockedNamespace, + NamespaceID: mockedNamespaceID, + TargetClusterEndpoint: remoteRpcAddress, + Executions: []commonpb.WorkflowExecution{execution1, execution2}, + } + ctx := context.TODO() + + cachedResults := make(map[int]verifyResult) + + mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller) + mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{ + Namespace: mockedNamespace, + Execution: &execution1, + }).Return(&adminservice.DescribeMutableStateResponse{}, nil).Times(1) + result, err := s.a.verifySingleReplicationTask(ctx, &request, mockRemoteAdminClient, &testNamespace, cachedResults, 0) + s.NoError(err) + s.True(result.isVerified()) + s.Equal(result, cachedResults[0]) + + // Second call should hit cache therefore no mock is needed. + result, err = s.a.verifySingleReplicationTask(ctx, &request, mockRemoteAdminClient, &testNamespace, cachedResults, 0) + s.NoError(err) + s.True(result.isVerified()) + + // Test not verified workflow + mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{ + Namespace: mockedNamespace, + Execution: &execution2, + }).Return(&adminservice.DescribeMutableStateResponse{}, serviceerror.NewNotFound("")).Times(1) + + s.mockHistoryClient.EXPECT().DescribeMutableState(gomock.Any(), &historyservice.DescribeMutableStateRequest{ + NamespaceId: mockedNamespaceID, + Execution: &execution2, + }).Return(&completeState, nil).AnyTimes() + + result, err = s.a.verifySingleReplicationTask(ctx, &request, mockRemoteAdminClient, &testNamespace, cachedResults, 1) + s.NoError(err) + s.False(result.isVerified()) + _, ok := cachedResults[1] + s.False(ok) +} + type executionState int const ( @@ -403,7 +438,8 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() { nextIndex int expectedVerified bool expectedErr error - expectedIndex int + expectedNextIndex int + expectedVerifiedIndex int }{ { expectedVerified: true, @@ -414,21 +450,32 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() { nextIndex: 0, expectedVerified: true, expectedErr: nil, - expectedIndex: 4, + expectedNextIndex: 4, + expectedVerifiedIndex: 3, }, { remoteExecutionStates: []executionState{executionFound, executionFound, executionFound, executionFound}, nextIndex: 2, expectedVerified: true, expectedErr: nil, - expectedIndex: 4, + expectedNextIndex: 4, + expectedVerifiedIndex: 3, }, { remoteExecutionStates: []executionState{executionFound, executionFound, executionNotfound}, nextIndex: 0, expectedVerified: false, expectedErr: nil, - expectedIndex: 2, + expectedNextIndex: 2, + expectedVerifiedIndex: 1, + }, + { + remoteExecutionStates: []executionState{executionFound, executionFound, executionNotfound, executionFound, executionNotfound}, + nextIndex: 0, + expectedVerified: false, + expectedErr: nil, + expectedNextIndex: 2, + expectedVerifiedIndex: 3, }, } @@ -437,28 +484,84 @@ func (s *activitiesSuite) Test_verifyReplicationTasks() { Execution: &execution1, }).Return(&completeState, nil).AnyTimes() + checkPointTime := time.Now() for _, tc := range tests { var recorder mockHeartBeatRecorder mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller) request.Executions = createExecutions(mockRemoteAdminClient, tc.remoteExecutionStates, tc.nextIndex) details := replicationTasksHeartbeatDetails{ - NextIndex: tc.nextIndex, + NextIndex: tc.nextIndex, + CheckPoint: checkPointTime, } - verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, recorder.hearbeat) + cachedResults := make(map[int]verifyResult) + verified, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, cachedResults, recorder.hearbeat) if tc.expectedErr == nil { s.NoError(err) } s.Equal(tc.expectedVerified, verified) - s.Equal(tc.expectedIndex, details.NextIndex) + s.Equal(tc.expectedNextIndex, details.NextIndex) + s.Equal(tc.expectedVerifiedIndex, details.LastVerifiedIndex) s.GreaterOrEqual(len(tc.remoteExecutionStates), details.NextIndex) s.Equal(recorder.lastHeartBeat, details) if details.NextIndex < len(tc.remoteExecutionStates) && tc.remoteExecutionStates[details.NextIndex] == executionNotfound { - s.Equal(execution1, details.LastNotFoundWorkflowExecution) + s.Equal(execution1, details.LastNotVerifiedWorkflowExecution) + } + + if len(request.Executions) > 0 { + // Except for empty Executions, all should set new CheckPoint to indicate making progress. + s.True(checkPointTime.Before(details.CheckPoint)) } } } +func (s *activitiesSuite) Test_verifyReplicationTasksNoProgress() { + var recorder mockHeartBeatRecorder + mockRemoteAdminClient := adminservicemock.NewMockAdminServiceClient(s.controller) + + request := verifyReplicationTasksRequest{ + Namespace: mockedNamespace, + NamespaceID: mockedNamespaceID, + TargetClusterEndpoint: remoteRpcAddress, + Executions: createExecutions(mockRemoteAdminClient, []executionState{executionFound, executionFound, executionNotfound, executionFound}, 0), + } + + s.mockHistoryClient.EXPECT().DescribeMutableState(gomock.Any(), &historyservice.DescribeMutableStateRequest{ + NamespaceId: mockedNamespaceID, + Execution: &execution1, + }).Return(&completeState, nil).AnyTimes() + + checkPointTime := time.Now() + details := replicationTasksHeartbeatDetails{ + NextIndex: 0, + CheckPoint: checkPointTime, + } + + ctx := context.TODO() + cachedResults := make(map[int]verifyResult) + verified, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, cachedResults, recorder.hearbeat) + s.NoError(err) + s.False(verified) + // Verify has made progress. + s.True(checkPointTime.Before(details.CheckPoint)) + s.Equal(3, details.LastVerifiedIndex) + + prevCheckPoint := details.CheckPoint + + // Mock for one more NotFound call + mockRemoteAdminClient.EXPECT().DescribeMutableState(gomock.Any(), &adminservice.DescribeMutableStateRequest{ + Namespace: mockedNamespace, + Execution: &execution1, + }).Return(nil, serviceerror.NewNotFound("")).Times(1) + + // All results should be either NotFound or cached and no progress should be made. + verified, err = s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, &testNamespace, cachedResults, recorder.hearbeat) + s.NoError(err) + s.False(verified) + s.Equal(prevCheckPoint, details.CheckPoint) + s.Equal(3, details.LastVerifiedIndex) +} + func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() { bias := time.Minute request := verifyReplicationTasksRequest{ @@ -521,7 +624,8 @@ func (s *activitiesSuite) Test_verifyReplicationTasksSkipRetention() { details := replicationTasksHeartbeatDetails{} ctx := context.TODO() - verified, _, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns, recorder.hearbeat) + cachedResults := make(map[int]verifyResult) + verified, err := s.a.verifyReplicationTasks(ctx, &request, &details, mockRemoteAdminClient, ns, cachedResults, recorder.hearbeat) s.NoError(err) s.Equal(tc.verified, verified) s.Equal(recorder.lastHeartBeat, details) diff --git a/service/worker/migration/force_replication_workflow.go b/service/worker/migration/force_replication_workflow.go index 1aeab7aed8c..fbf9efaa31c 100644 --- a/service/worker/migration/force_replication_workflow.go +++ b/service/worker/migration/force_replication_workflow.go @@ -112,10 +112,7 @@ type ( Executions []commonpb.WorkflowExecution } - verifyReplicationTasksResponse struct { - SkippedWorkflowExecutions []SkippedWorkflowExecution - SkippedWorkflowCount int - } + verifyReplicationTasksResponse struct{} metadataRequest struct { Namespace string