Skip to content

Commit

Permalink
Extract workflow execution delete code to workflow.DeleteManager (#2388)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexshtin authored Jan 22, 2022
1 parent 65e4958 commit 43b4531
Show file tree
Hide file tree
Showing 24 changed files with 935 additions and 360 deletions.
3 changes: 3 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,8 @@ const (
HistoryQueryWorkflowScope
// HistoryProcessDeleteHistoryEventScope tracks ProcessDeleteHistoryEvent processing calls
HistoryProcessDeleteHistoryEventScope
// HistoryDeleteWorkflowExecutionScope tracks DeleteWorkflowExecutions API calls
HistoryDeleteWorkflowExecutionScope
// WorkflowCompletionStatsScope tracks workflow completion updates
WorkflowCompletionStatsScope
// ArchiverClientScope is scope used by all metrics emitted by archiver.Client
Expand Down Expand Up @@ -1586,6 +1588,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
HistoryResetWorkflowExecutionScope: {operation: "ResetWorkflowExecution"},
HistoryQueryWorkflowScope: {operation: "QueryWorkflow"},
HistoryProcessDeleteHistoryEventScope: {operation: "ProcessDeleteHistoryEvent"},
HistoryDeleteWorkflowExecutionScope: {operation: "DeleteWorkflowExecution"},
HistoryScheduleWorkflowTaskScope: {operation: "ScheduleWorkflowTask"},
HistoryRecordChildExecutionCompletedScope: {operation: "RecordChildExecutionCompleted"},
HistoryRequestCancelWorkflowExecutionScope: {operation: "RequestCancelWorkflowExecution"},
Expand Down
2 changes: 2 additions & 0 deletions service/history/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ var (
ErrActivityTaskNotCancelRequested = serviceerror.NewInvalidArgument("unable to mark activity as canceled without activity being request canceled first")
// ErrWorkflowCompleted is the error to indicate workflow execution already completed
ErrWorkflowCompleted = serviceerror.NewNotFound("workflow execution already completed")
// ErrWorkflowNotCompleted is the error to indicate workflow execution is not completed.
ErrWorkflowNotCompleted = serviceerror.NewNotFound("workflow execution is not completed")
// ErrWorkflowExecutionNotFound is the error to indicate workflow execution does not exist
ErrWorkflowExecutionNotFound = serviceerror.NewNotFound("workflow execution not found")
// ErrWorkflowParent is the error to parent execution is given and mismatch
Expand Down
61 changes: 35 additions & 26 deletions service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ type (
rawMatchingClient matchingservice.MatchingServiceClient
replicationDLQHandler replicationDLQHandler
searchAttributesValidator *searchattribute.Validator
searchAttributesMapper searchattribute.Mapper
workflowDeleteManager workflow.DeleteManager
}
)

Expand All @@ -145,33 +145,44 @@ func NewEngineWithShardContext(
logger := shard.GetLogger()
executionManager := shard.GetExecutionManager()
historyCache := newCacheFn(shard)

archivalClient := archiver.NewClient(
shard.GetMetricsClient(),
logger,
publicClient,
shard.GetConfig().NumArchiveSystemWorkflows,
shard.GetConfig().ArchiveRequestRPS,
archiverProvider,
)

workflowDeleteManager := workflow.NewDeleteManager(
shard,
historyCache,
config,
archivalClient,
)

historyEngImpl := &historyEngineImpl{
status: common.DaemonStatusInitialized,
currentClusterName: currentClusterName,
shard: shard,
clusterMetadata: shard.GetClusterMetadata(),
timeSource: shard.GetTimeSource(),
executionManager: executionManager,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
historyCache: historyCache,
logger: log.With(logger, tag.ComponentHistoryEngine),
throttledLogger: log.With(shard.GetThrottledLogger(), tag.ComponentHistoryEngine),
metricsClient: shard.GetMetricsClient(),
eventNotifier: eventNotifier,
config: config,
archivalClient: archiver.NewClient(
shard.GetMetricsClient(),
logger,
publicClient,
shard.GetConfig().NumArchiveSystemWorkflows,
shard.GetConfig().ArchiveRequestRPS,
archiverProvider,
),
status: common.DaemonStatusInitialized,
currentClusterName: currentClusterName,
shard: shard,
clusterMetadata: shard.GetClusterMetadata(),
timeSource: shard.GetTimeSource(),
executionManager: executionManager,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
historyCache: historyCache,
logger: log.With(logger, tag.ComponentHistoryEngine),
throttledLogger: log.With(shard.GetThrottledLogger(), tag.ComponentHistoryEngine),
metricsClient: shard.GetMetricsClient(),
eventNotifier: eventNotifier,
config: config,
archivalClient: archivalClient,
publicClient: publicClient,
matchingClient: matchingClient,
rawMatchingClient: rawMatchingClient,
replicationTaskProcessors: make(map[string]ReplicationTaskProcessor),
replicationTaskFetchers: replicationTaskFetchers,
workflowDeleteManager: workflowDeleteManager,
}

historyEngImpl.txProcessor = newTransferQueueProcessor(shard, historyEngImpl,
Expand Down Expand Up @@ -217,8 +228,6 @@ func NewEngineWithShardContext(
config.SearchAttributesTotalSizeLimit,
)

historyEngImpl.searchAttributesMapper = shard.GetSearchAttributesMapper()

historyEngImpl.workflowTaskHandler = newWorkflowTaskHandlerCallback(historyEngImpl)
historyEngImpl.replicationDLQHandler = newLazyReplicationDLQHandler(shard)

Expand Down Expand Up @@ -514,7 +523,7 @@ func (e *historyEngineImpl) StartWorkflowExecution(
return nil, err
}

err = searchattribute.SubstituteAliases(e.searchAttributesMapper, request.GetSearchAttributes(), namespace.String())
err = searchattribute.SubstituteAliases(e.shard.GetSearchAttributesMapper(), request.GetSearchAttributes(), namespace.String())
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2012,7 +2021,7 @@ func (e *historyEngineImpl) SignalWithStartWorkflowExecution(
return nil, err
}

err = searchattribute.SubstituteAliases(e.searchAttributesMapper, request.GetSearchAttributes(), namespace.String())
err = searchattribute.SubstituteAliases(e.shard.GetSearchAttributesMapper(), request.GetSearchAttributes(), namespace.String())
if err != nil {
return nil, err
}
Expand Down
29 changes: 14 additions & 15 deletions service/history/historyEngine2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,21 +144,20 @@ func (s *engine2Suite) SetupTest() {

historyCache := workflow.NewCache(s.mockShard)
h := &historyEngineImpl{
currentClusterName: s.mockShard.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
clusterMetadata: s.mockClusterMetadata,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
throttledLogger: s.logger,
metricsClient: metrics.NewNoopMetricsClient(),
tokenSerializer: common.NewProtoTaskTokenSerializer(),
config: s.config,
timeSource: s.mockShard.GetTimeSource(),
eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
searchAttributesMapper: s.mockShard.Resource.SearchAttributesMapper,
currentClusterName: s.mockShard.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
clusterMetadata: s.mockClusterMetadata,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
throttledLogger: s.logger,
metricsClient: metrics.NewNoopMetricsClient(),
tokenSerializer: common.NewProtoTaskTokenSerializer(),
config: s.config,
timeSource: s.mockShard.GetTimeSource(),
eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
searchAttributesValidator: searchattribute.NewValidator(
searchattribute.NewTestProvider(),
s.mockShard.Resource.SearchAttributesMapper,
Expand Down
4 changes: 2 additions & 2 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ func (s *ContextImpl) AppendHistoryEvents(
func (s *ContextImpl) DeleteWorkflowExecution(
key definition.WorkflowKey,
branchToken []byte,
version int64,
newTaskVersion int64,
) error {
if err := s.errorByState(); err != nil {
return err
Expand Down Expand Up @@ -855,7 +855,7 @@ func (s *ContextImpl) DeleteWorkflowExecution(
// TaskID is set by addTasksLocked
WorkflowKey: key,
VisibilityTimestamp: s.timeSource.Now(),
Version: version,
Version: newTaskVersion,
}},
}
err = s.addTasksLocked(addTasksRequest, namespaceEntry)
Expand Down
1 change: 1 addition & 0 deletions service/history/shard/context_testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func NewTestContext(
persistenceShardManager: resource.GetShardManager(),
clientBean: resource.GetClientBean(),
saProvider: resource.GetSearchAttributesProvider(),
saMapper: resource.GetSearchAttributesMapper(),
historyClient: resource.GetHistoryClient(),
archivalMetadata: resource.GetArchivalMetadata(),
}
Expand Down
1 change: 1 addition & 0 deletions service/history/shard/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (

commonpb "go.temporal.io/api/common/v1"
historypb "go.temporal.io/api/history/v1"

"go.temporal.io/server/api/historyservice/v1"
replicationspb "go.temporal.io/server/api/replication/v1"
"go.temporal.io/server/common"
Expand Down
6 changes: 4 additions & 2 deletions service/history/timerQueueActiveProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func newTimerQueueActiveProcessor(
}
processor.taskExecutor = newTimerQueueActiveTaskExecutor(
shard,
historyService,
historyService.workflowDeleteManager,
historyService.historyCache,
processor,
logger,
historyService.metricsClient,
Expand Down Expand Up @@ -189,7 +190,8 @@ func newTimerQueueFailoverProcessor(
}
processor.taskExecutor = newTimerQueueActiveTaskExecutor(
shard,
historyService,
historyService.workflowDeleteManager,
historyService.historyCache,
processor,
logger,
historyService.metricsClient,
Expand Down
6 changes: 4 additions & 2 deletions service/history/timerQueueActiveTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ type (

func newTimerQueueActiveTaskExecutor(
shard shard.Context,
historyService *historyEngineImpl,
workflowDeleteManager workflow.DeleteManager,
cache workflow.Cache,
queueProcessor *timerQueueActiveProcessorImpl,
logger log.Logger,
metricsClient metrics.Client,
Expand All @@ -72,7 +73,8 @@ func newTimerQueueActiveTaskExecutor(
return &timerQueueActiveTaskExecutor{
timerQueueTaskExecutorBase: newTimerQueueTaskExecutorBase(
shard,
historyService,
workflowDeleteManager,
cache,
logger,
metricsClient,
config,
Expand Down
28 changes: 16 additions & 12 deletions service/history/timerQueueActiveTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type (
mockClusterMetadata *cluster.MockMetadata

mockHistoryEngine *historyEngineImpl
mockDeleteManager *workflow.MockDeleteManager
mockExecutionMgr *persistence.MockExecutionManager

logger log.Logger
Expand Down Expand Up @@ -147,25 +148,28 @@ func (s *timerQueueActiveTaskExecutorSuite) SetupTest() {
s.logger = s.mockShard.GetLogger()

historyCache := workflow.NewCache(s.mockShard)
s.mockDeleteManager = workflow.NewMockDeleteManager(s.controller)
h := &historyEngineImpl{
currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
clusterMetadata: s.mockClusterMetadata,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
metricsClient: s.mockShard.GetMetricsClient(),
eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
clusterMetadata: s.mockClusterMetadata,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
metricsClient: s.mockShard.GetMetricsClient(),
eventNotifier: events.NewNotifier(clock.NewRealTimeSource(), metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
workflowDeleteManager: s.mockDeleteManager,
}
s.mockShard.SetEngineForTesting(h)
s.mockHistoryEngine = h

s.timerQueueActiveTaskExecutor = newTimerQueueActiveTaskExecutor(
s.mockShard,
h,
s.mockDeleteManager,
historyCache,
newTimerQueueActiveProcessor(
s.mockShard,
h,
Expand Down
3 changes: 2 additions & 1 deletion service/history/timerQueueStandbyProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ func newTimerQueueStandbyProcessor(
timerGate: timerGate,
taskExecutor: newTimerQueueStandbyTaskExecutor(
shard,
historyService,
historyService.workflowDeleteManager,
historyService.historyCache,
nDCHistoryResender,
logger,
historyService.metricsClient,
Expand Down
6 changes: 4 additions & 2 deletions service/history/timerQueueStandbyTaskExecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ type (

func newTimerQueueStandbyTaskExecutor(
shard shard.Context,
historyService *historyEngineImpl,
deleteManager workflow.DeleteManager,
cache workflow.Cache,
nDCHistoryResender xdc.NDCHistoryResender,
logger log.Logger,
metricsClient metrics.Client,
Expand All @@ -71,7 +72,8 @@ func newTimerQueueStandbyTaskExecutor(
return &timerQueueStandbyTaskExecutor{
timerQueueTaskExecutorBase: newTimerQueueTaskExecutorBase(
shard,
historyService,
deleteManager,
cache,
logger,
metricsClient,
config,
Expand Down
28 changes: 16 additions & 12 deletions service/history/timerQueueStandbyTaskExecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type (
mockClusterMetadata *cluster.MockMetadata
mockAdminClient *adminservicemock.MockAdminServiceClient
mockNDCHistoryResender *xdc.MockNDCHistoryResender
mockDeleteManager *workflow.MockDeleteManager

logger log.Logger
namespaceID namespace.ID
Expand Down Expand Up @@ -162,26 +163,29 @@ func (s *timerQueueStandbyTaskExecutorSuite) SetupTest() {
s.logger = s.mockShard.GetLogger()

historyCache := workflow.NewCache(s.mockShard)
s.mockDeleteManager = workflow.NewMockDeleteManager(s.controller)
h := &historyEngineImpl{
currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
clusterMetadata: s.mockClusterMetadata,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
metricsClient: s.mockShard.GetMetricsClient(),
eventNotifier: events.NewNotifier(s.timeSource, metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
currentClusterName: s.mockShard.Resource.GetClusterMetadata().GetCurrentClusterName(),
shard: s.mockShard,
clusterMetadata: s.mockClusterMetadata,
executionManager: s.mockExecutionMgr,
historyCache: historyCache,
logger: s.logger,
tokenSerializer: common.NewProtoTaskTokenSerializer(),
metricsClient: s.mockShard.GetMetricsClient(),
eventNotifier: events.NewNotifier(s.timeSource, metrics.NewNoopMetricsClient(), func(namespace.ID, string) int32 { return 1 }),
txProcessor: s.mockTxProcessor,
timerProcessor: s.mockTimerProcessor,
workflowDeleteManager: s.mockDeleteManager,
}
s.mockShard.SetEngineForTesting(h)
s.mockBean = client.NewMockBean(s.controller)
s.mockBean.EXPECT().GetRemoteAdminClient("standby").Return(s.mockAdminClient)

s.timerQueueStandbyTaskExecutor = newTimerQueueStandbyTaskExecutor(
s.mockShard,
h,
s.mockDeleteManager,
historyCache,
s.mockNDCHistoryResender,
s.logger,
s.mockShard.GetMetricsClient(),
Expand Down
Loading

0 comments on commit 43b4531

Please sign in to comment.