Skip to content

Commit

Permalink
Add unit test for replication task executor (cadence-workflow#6012)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored and timl3136 committed Jun 6, 2024
1 parent 8d18680 commit 695ffdd
Show file tree
Hide file tree
Showing 4 changed files with 341 additions and 50 deletions.
1 change: 1 addition & 0 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ ignore:
- "service/frontend/service.go"
- "service/history/constants/test_constants.go"
- "service/history/execution/mutable_state.go"
- "service/history/shard/contextTest.go"
- "service/history/workflow/errors.go"
- "service/history/service.go"
- "service/matching/service.go"
Expand Down
83 changes: 39 additions & 44 deletions service/history/replication/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,43 +147,45 @@ func (e *taskExecutorImpl) handleActivityTask(
}

err = syncActivityAction()
retryErr, ok := toRetryTaskV2Error(err)
if !ok {
return err
}
// Handle resend error
if retryErr, ok := e.convertRetryTaskV2Error(err); ok {
e.metricsClient.IncCounter(metrics.HistoryRereplicationByActivityReplicationScope, metrics.CadenceClientRequests)
stopwatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByActivityReplicationScope, metrics.CadenceClientLatency)
defer stopwatch.Stop()
e.metricsClient.IncCounter(metrics.HistoryRereplicationByActivityReplicationScope, metrics.CadenceClientRequests)
stopwatch := e.metricsClient.StartTimer(metrics.HistoryRereplicationByActivityReplicationScope, metrics.CadenceClientLatency)
defer stopwatch.Stop()

resendErr := e.historyResender.SendSingleWorkflowHistory(
retryErr.GetDomainID(),
retryErr.GetWorkflowID(),
retryErr.GetRunID(),
retryErr.StartEventID,
retryErr.StartEventVersion,
retryErr.EndEventID,
retryErr.EndEventVersion,
resendErr := e.historyResender.SendSingleWorkflowHistory(
retryErr.GetDomainID(),
retryErr.GetWorkflowID(),
retryErr.GetRunID(),
retryErr.StartEventID,
retryErr.StartEventVersion,
retryErr.EndEventID,
retryErr.EndEventVersion,
)
switch {
case resendErr == nil:
break
case resendErr == ndc.ErrSkipTask:
e.logger.Error(
"skip replication sync activity task",
tag.WorkflowDomainID(retryErr.GetDomainID()),
tag.WorkflowID(retryErr.GetWorkflowID()),
tag.WorkflowRunID(retryErr.GetRunID()),
)
switch {
case resendErr == nil:
break
case resendErr == ndc.ErrSkipTask:
e.logger.Error(
"skip replication sync activity task",
tag.WorkflowDomainID(retryErr.GetDomainID()),
tag.WorkflowID(retryErr.GetWorkflowID()),
tag.WorkflowRunID(retryErr.GetRunID()),
)
return nil
default:
e.logger.Error(
"error resend history for sync activity",
tag.WorkflowDomainID(retryErr.GetDomainID()),
tag.WorkflowID(retryErr.GetWorkflowID()),
tag.WorkflowRunID(retryErr.GetRunID()),
tag.Error(resendErr),
)
// should return the replication error, not the resending error
return err
}
return nil
default:
e.logger.Error(
"error resend history for sync activity",
tag.WorkflowDomainID(retryErr.GetDomainID()),
tag.WorkflowID(retryErr.GetWorkflowID()),
tag.WorkflowRunID(retryErr.GetRunID()),
tag.Error(resendErr),
)
// should return the replication error, not the resending error
return err
}
// should try again after back fill the history
return syncActivityAction()
Expand Down Expand Up @@ -229,7 +231,7 @@ func (e *taskExecutorImpl) handleHistoryReplicationTaskV2(
}

err = historyReplicationAction()
retryErr, ok := e.convertRetryTaskV2Error(err)
retryErr, ok := toRetryTaskV2Error(err)
if !ok {
return err
}
Expand Down Expand Up @@ -284,31 +286,24 @@ func (e *taskExecutorImpl) filterTask(
domainID string,
forceApply bool,
) (bool, error) {

if forceApply {
return true, nil
}

domainEntry, err := e.domainCache.GetDomainByID(domainID)
if err != nil {
return false, err
}

shouldProcessTask := false
FilterLoop:
for _, targetCluster := range domainEntry.GetReplicationConfig().Clusters {
if e.currentCluster == targetCluster.ClusterName {
shouldProcessTask = true
break FilterLoop
break
}
}
return shouldProcessTask, nil
}

func (e *taskExecutorImpl) convertRetryTaskV2Error(
err error,
) (*types.RetryTaskV2Error, bool) {

func toRetryTaskV2Error(err error) (*types.RetryTaskV2Error, bool) {
retError, ok := err.(*types.RetryTaskV2Error)
return retError, ok
}
Loading

0 comments on commit 695ffdd

Please sign in to comment.