From 44267026fc494eb3e686377fdecf3395cadd1d03 Mon Sep 17 00:00:00 2001 From: Yichao Yang Date: Mon, 15 Aug 2022 23:08:10 -0700 Subject: [PATCH] Fix failover queue shutdown (#3232) --- service/history/historyEngineInterfaces.go | 4 --- .../history/historyEngineInterfaces_mock.go | 35 ------------------- ...ueueProcessor.go => queueProcessorBase.go} | 34 +++++++++--------- service/history/timerQueueProcessorBase.go | 10 +++--- 4 files changed, 24 insertions(+), 59 deletions(-) rename service/history/{queueProcessor.go => queueProcessorBase.go} (92%) diff --git a/service/history/historyEngineInterfaces.go b/service/history/historyEngineInterfaces.go index f5efdb0a027..ae27c89f4fe 100644 --- a/service/history/historyEngineInterfaces.go +++ b/service/history/historyEngineInterfaces.go @@ -70,10 +70,6 @@ type ( queueShutdown() error } - timerProcessor interface { - notifyNewTimers(timerTask []tasks.Task) - } - timerQueueAckMgr interface { getFinishedChan() <-chan struct{} readTimerTasks() ([]queues.Executable, *time.Time, bool, error) diff --git a/service/history/historyEngineInterfaces_mock.go b/service/history/historyEngineInterfaces_mock.go index 6602bf705a8..4c79869b135 100644 --- a/service/history/historyEngineInterfaces_mock.go +++ b/service/history/historyEngineInterfaces_mock.go @@ -349,41 +349,6 @@ func (mr *MockprocessorMockRecorder) updateAckLevel(taskID interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "updateAckLevel", reflect.TypeOf((*Mockprocessor)(nil).updateAckLevel), taskID) } -// MocktimerProcessor is a mock of timerProcessor interface. -type MocktimerProcessor struct { - ctrl *gomock.Controller - recorder *MocktimerProcessorMockRecorder -} - -// MocktimerProcessorMockRecorder is the mock recorder for MocktimerProcessor. -type MocktimerProcessorMockRecorder struct { - mock *MocktimerProcessor -} - -// NewMocktimerProcessor creates a new mock instance. -func NewMocktimerProcessor(ctrl *gomock.Controller) *MocktimerProcessor { - mock := &MocktimerProcessor{ctrl: ctrl} - mock.recorder = &MocktimerProcessorMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MocktimerProcessor) EXPECT() *MocktimerProcessorMockRecorder { - return m.recorder -} - -// notifyNewTimers mocks base method. -func (m *MocktimerProcessor) notifyNewTimers(timerTask []tasks.Task) { - m.ctrl.T.Helper() - m.ctrl.Call(m, "notifyNewTimers", timerTask) -} - -// notifyNewTimers indicates an expected call of notifyNewTimers. -func (mr *MocktimerProcessorMockRecorder) notifyNewTimers(timerTask interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "notifyNewTimers", reflect.TypeOf((*MocktimerProcessor)(nil).notifyNewTimers), timerTask) -} - // MocktimerQueueAckMgr is a mock of timerQueueAckMgr interface. type MocktimerQueueAckMgr struct { ctrl *gomock.Controller diff --git a/service/history/queueProcessor.go b/service/history/queueProcessorBase.go similarity index 92% rename from service/history/queueProcessor.go rename to service/history/queueProcessorBase.go index 649f12c36ae..3de81d506f3 100644 --- a/service/history/queueProcessor.go +++ b/service/history/queueProcessorBase.go @@ -61,7 +61,7 @@ type ( shard shard.Context timeSource clock.TimeSource options *QueueProcessorOptions - processor processor + queueProcessor common.Daemon logger log.Logger metricsScope metrics.Scope rateLimiter quotas.RateLimiter // Read rate limiter @@ -89,7 +89,7 @@ func newQueueProcessorBase( clusterName string, shard shard.Context, options *QueueProcessorOptions, - processor processor, + queueProcessor common.Daemon, queueAckMgr queueAckMgr, historyCache workflow.Cache, scheduler queues.Scheduler, @@ -100,19 +100,19 @@ func newQueueProcessorBase( ) *queueProcessorBase { p := &queueProcessorBase{ - clusterName: clusterName, - shard: shard, - timeSource: shard.GetTimeSource(), - options: options, - processor: processor, - rateLimiter: rateLimiter, - status: common.DaemonStatusInitialized, - notifyCh: make(chan struct{}, 1), - shutdownCh: make(chan struct{}), - logger: logger, + clusterName: clusterName, + shard: shard, + timeSource: shard.GetTimeSource(), + options: options, + queueProcessor: queueProcessor, + rateLimiter: rateLimiter, + status: common.DaemonStatusInitialized, + notifyCh: make(chan struct{}, 1), + shutdownCh: make(chan struct{}), + logger: logger, metricsScope: metricsScope, - ackMgr: queueAckMgr, - lastPollTime: time.Time{}, + ackMgr: queueAckMgr, + lastPollTime: time.Time{}, readTaskRetrier: backoff.NewRetrier( common.CreateReadTaskRetryPolicy(), backoff.SystemClock, @@ -186,7 +186,8 @@ processorPumpLoop: break processorPumpLoop case <-p.ackMgr.getFinishedChan(): // use a separate gorouting since the caller hold the shutdownWG - go p.Stop() + // stop the entire queue processor, not just processor base. + go p.queueProcessor.Stop() case <-p.notifyCh: p.processBatch() case <-pollTimer.C: @@ -204,7 +205,8 @@ processorPumpLoop: )) if err := p.ackMgr.updateQueueAckLevel(); shard.IsShardOwnershipLostError(err) { // shard is no longer owned by this instance, bail out - go p.Stop() + // stop the entire queue processor, not just processor base. + go p.queueProcessor.Stop() break processorPumpLoop } } diff --git a/service/history/timerQueueProcessorBase.go b/service/history/timerQueueProcessorBase.go index fa099999359..e90dc7ae285 100644 --- a/service/history/timerQueueProcessorBase.go +++ b/service/history/timerQueueProcessorBase.go @@ -67,7 +67,7 @@ type ( logger log.Logger metricsClient metrics.Client metricsScope metrics.Scope - timerProcessor timerProcessor + timerProcessor common.Daemon timerQueueAckMgr timerQueueAckMgr timerGate timer.Gate timeSource clock.TimeSource @@ -88,7 +88,7 @@ func newTimerQueueProcessorBase( scope int, shard shard.Context, workflowCache workflow.Cache, - timerProcessor timerProcessor, + timerProcessor common.Daemon, timerQueueAckMgr timerQueueAckMgr, timerGate timer.Gate, scheduler queues.Scheduler, @@ -245,7 +245,8 @@ func (t *timerQueueProcessorBase) internalProcessor() error { // timer queue ack manager indicate that all task scanned // are finished and no more tasks // use a separate goroutine since the caller hold the shutdownWG - go t.Stop() + // stop the entire timer queue processor, not just processor base. + go t.timerProcessor.Stop() return nil case <-t.timerGate.FireChan(): nextFireTime, err := t.readAndFanoutTimerTasks() @@ -276,7 +277,8 @@ func (t *timerQueueProcessorBase) internalProcessor() error { )) if err := t.timerQueueAckMgr.updateAckLevel(); shard.IsShardOwnershipLostError(err) { // shard is closed, shutdown timerQProcessor and bail out - go t.Stop() + // stop the entire timer queue processor, not just processor base. + go t.timerProcessor.Stop() return err } case <-t.newTimerCh: