diff --git a/service/history/queue/timer_queue_failover_processor.go b/service/history/queue/timer_queue_failover_processor.go
index 78f52e26f45..6d34461a71f 100644
--- a/service/history/queue/timer_queue_failover_processor.go
+++ b/service/history/queue/timer_queue_failover_processor.go
@@ -28,7 +28,6 @@ import (
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
 	"github.com/uber/cadence/common/persistence"
-	"github.com/uber/cadence/service/history/engine"
 	"github.com/uber/cadence/service/history/shard"
 	"github.com/uber/cadence/service/history/task"
 )
@@ -36,7 +35,6 @@ import (
 func newTimerQueueFailoverProcessor(
 	standbyClusterName string,
 	shardContext shard.Context,
-	historyEngine engine.Engine,
 	taskProcessor task.Processor,
 	taskAllocator TaskAllocator,
 	taskExecutor task.Executor,
diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go
index cf4800295c0..8890b7916c7 100644
--- a/service/history/queue/timer_queue_processor.go
+++ b/service/history/queue/timer_queue_processor.go
@@ -60,13 +60,14 @@ type timerQueueProcessor struct {
 	shutdownChan chan struct{}
 	shutdownWG   sync.WaitGroup
 
-	ackLevel               time.Time
-	taskAllocator          TaskAllocator
-	activeTaskExecutor     task.Executor
-	activeQueueProcessor   *timerQueueProcessorBase
-	standbyQueueProcessors map[string]*timerQueueProcessorBase
-	standbyTaskExecutors   []task.Executor
-	standbyQueueTimerGates map[string]RemoteTimerGate
+	ackLevel                time.Time
+	taskAllocator           TaskAllocator
+	activeTaskExecutor      task.Executor
+	activeQueueProcessor    *timerQueueProcessorBase
+	standbyQueueProcessors  map[string]*timerQueueProcessorBase
+	standbyTaskExecutors    []task.Executor
+	standbyQueueTimerGates  map[string]RemoteTimerGate
+	failoverQueueProcessors []*timerQueueProcessorBase
 }
 
 // NewTimerQueueProcessor creates a new timer QueueProcessor
@@ -206,7 +207,7 @@ func (t *timerQueueProcessor) Stop() {
 		t.logger.Warn("transferQueueProcessor timed out on shut down", tag.LifeCycleStopTimedout)
 	}
 	t.activeQueueProcessor.Stop()
-	t.activeTaskExecutor.Stop()
+
 	for _, standbyQueueProcessor := range t.standbyQueueProcessors {
 		standbyQueueProcessor.Stop()
 	}
@@ -215,6 +216,15 @@
 	for _, standbyTaskExecutor := range t.standbyTaskExecutors {
 		standbyTaskExecutor.Stop()
 	}
+
+	if len(t.failoverQueueProcessors) > 0 {
+		t.logger.Info("Shutting down failover timer queues", tag.Counter(len(t.failoverQueueProcessors)))
+		for _, failoverQueueProcessor := range t.failoverQueueProcessors {
+			failoverQueueProcessor.Stop()
+		}
+	}
+
+	t.activeTaskExecutor.Stop()
 }
 
 func (t *timerQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
@@ -300,7 +310,6 @@ func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
 	updateClusterAckLevelFn, failoverQueueProcessor := newTimerQueueFailoverProcessor(
 		standbyClusterName,
 		t.shard,
-		t.historyEngine,
 		t.taskProcessor,
 		t.taskAllocator,
 		t.activeTaskExecutor,
@@ -316,6 +325,12 @@ func (t *timerQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
 	if err != nil {
 		t.logger.Error("Error update shard ack level", tag.Error(err))
 	}
+
+	// Failover queue processors are started on the fly when domains are failed over.
+	// Failover queue processors will be stopped when the timer queue instance is stopped (due to restart or shard movement).
+	// This means the failover queue processor might not finish its job.
+	// There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
+	t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
 	failoverQueueProcessor.Start()
 }
 
@@ -367,7 +382,7 @@ func (t *timerQueueProcessor) UnlockTaskProcessing() {
 func (t *timerQueueProcessor) drain() {
 	if !t.shard.GetConfig().QueueProcessorEnableGracefulSyncShutdown() {
 		if err := t.completeTimer(context.Background()); err != nil {
-			t.logger.Error("Failed to complete timer task during shutdown", tag.Error(err))
+			t.logger.Error("Failed to complete timer task during drain", tag.Error(err))
 		}
 		return
 	}
@@ -376,7 +391,7 @@
 	ctx, cancel := context.WithTimeout(context.Background(), gracefulShutdownTimeout)
 	defer cancel()
 	if err := t.completeTimer(ctx); err != nil {
-		t.logger.Error("Failed to complete timer task during shutdown", tag.Error(err))
+		t.logger.Error("Failed to complete timer task during drain", tag.Error(err))
 	}
 }
 
diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go
index 0b124ab3af1..b548649cfef 100644
--- a/service/history/queue/transfer_queue_processor.go
+++ b/service/history/queue/transfer_queue_processor.go
@@ -73,11 +73,12 @@ type transferQueueProcessor struct {
 	shutdownChan chan struct{}
 	shutdownWG   sync.WaitGroup
 
-	ackLevel               int64
-	taskAllocator          TaskAllocator
-	activeTaskExecutor     task.Executor
-	activeQueueProcessor   *transferQueueProcessorBase
-	standbyQueueProcessors map[string]*transferQueueProcessorBase
+	ackLevel                int64
+	taskAllocator           TaskAllocator
+	activeTaskExecutor      task.Executor
+	activeQueueProcessor    *transferQueueProcessorBase
+	standbyQueueProcessors  map[string]*transferQueueProcessorBase
+	failoverQueueProcessors []*transferQueueProcessorBase
 }
 
 // NewTransferQueueProcessor creates a new transfer QueueProcessor
@@ -110,7 +111,6 @@
 
 	activeQueueProcessor := newTransferQueueActiveProcessor(
 		shard,
-		historyEngine,
 		taskProcessor,
 		taskAllocator,
 		activeTaskExecutor,
@@ -141,7 +141,6 @@
 		standbyQueueProcessors[clusterName] = newTransferQueueStandbyProcessor(
 			clusterName,
 			shard,
-			historyEngine,
 			taskProcessor,
 			taskAllocator,
 			standbyTaskExecutor,
@@ -206,6 +205,13 @@
 	for _, standbyQueueProcessor := range t.standbyQueueProcessors {
 		standbyQueueProcessor.Stop()
 	}
+
+	if len(t.failoverQueueProcessors) > 0 {
+		t.logger.Info("Shutting down failover transfer queues", tag.Counter(len(t.failoverQueueProcessors)))
+		for _, failoverQueueProcessor := range t.failoverQueueProcessors {
+			failoverQueueProcessor.Stop()
+		}
+	}
 }
 
 func (t *transferQueueProcessor) NotifyNewTask(clusterName string, info *hcommon.NotifyTaskInfo) {
@@ -270,7 +276,6 @@ func (t *transferQueueProcessor) FailoverDomain(domainIDs map[string]struct{}) {
 
 	updateShardAckLevel, failoverQueueProcessor := newTransferQueueFailoverProcessor(
 		t.shard,
-		t.historyEngine,
 		t.taskProcessor,
 		t.taskAllocator,
 		t.activeTaskExecutor,
@@ -287,6 +292,12 @@
 	if err != nil {
 		t.logger.Error("Error update shard ack level", tag.Error(err))
 	}
+
+	// Failover queue processors are started on the fly when domains are failed over.
+	// Failover queue processors will be stopped when the transfer queue instance is stopped (due to restart or shard movement).
+	// This means the failover queue processor might not finish its job.
+	// There is no mechanism to re-start ongoing failover queue processors in the new shard owner.
+	t.failoverQueueProcessors = append(t.failoverQueueProcessors, failoverQueueProcessor)
 	failoverQueueProcessor.Start()
 }
 
@@ -456,7 +467,6 @@ func (t *transferQueueProcessor) completeTransfer() error {
 
 func newTransferQueueActiveProcessor(
 	shard shard.Context,
-	historyEngine engine.Engine,
 	taskProcessor task.Processor,
 	taskAllocator TaskAllocator,
 	taskExecutor task.Executor,
@@ -517,7 +527,6 @@
 func newTransferQueueStandbyProcessor(
 	clusterName string,
 	shard shard.Context,
-	historyEngine engine.Engine,
 	taskProcessor task.Processor,
 	taskAllocator TaskAllocator,
 	taskExecutor task.Executor,
@@ -594,7 +603,6 @@
 
 func newTransferQueueFailoverProcessor(
 	shardContext shard.Context,
-	historyEngine engine.Engine,
 	taskProcessor task.Processor,
 	taskAllocator TaskAllocator,
 	taskExecutor task.Executor,
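
Note: the following is an illustrative sketch, not part of the patch and not the Cadence implementation. It distills the lifecycle pattern the diff introduces for both queue types: failover processors created on the fly by FailoverDomain are remembered in a failoverQueueProcessors slice so that Stop() can shut them down, and the shared activeTaskExecutor is stopped only after every processor that may still use it. The names queueManager, fakeProcessor, and fakeExecutor below are hypothetical stand-ins.

// Minimal sketch, assuming simplified processor/executor types.
package main

import "fmt"

type lifecycle interface {
	Start()
	Stop()
}

type fakeProcessor struct{ name string }

func (p *fakeProcessor) Start() { fmt.Println("started", p.name) }
func (p *fakeProcessor) Stop()  { fmt.Println("stopped", p.name) }

type fakeExecutor struct{ name string }

func (e *fakeExecutor) Stop() { fmt.Println("stopped", e.name) }

type queueManager struct {
	activeTaskExecutor      *fakeExecutor
	activeQueueProcessor    lifecycle
	standbyQueueProcessors  map[string]lifecycle
	failoverQueueProcessors []lifecycle // grows each time FailoverDomain runs
}

// FailoverDomain mirrors the diff: the new failover processor is appended to
// the tracking slice before it is started, so anything that has been started
// is also reachable from Stop().
func (m *queueManager) FailoverDomain(standbyCluster string) {
	p := &fakeProcessor{name: "failover-from-" + standbyCluster}
	m.failoverQueueProcessors = append(m.failoverQueueProcessors, p)
	p.Start()
}

// Stop follows the ordering the diff establishes: stop every processor first
// (active, standby, failover), then stop the executor they share.
func (m *queueManager) Stop() {
	m.activeQueueProcessor.Stop()
	for _, p := range m.standbyQueueProcessors {
		p.Stop()
	}
	for _, p := range m.failoverQueueProcessors {
		p.Stop()
	}
	m.activeTaskExecutor.Stop()
}

func main() {
	m := &queueManager{
		activeTaskExecutor:   &fakeExecutor{name: "active-executor"},
		activeQueueProcessor: &fakeProcessor{name: "active"},
		standbyQueueProcessors: map[string]lifecycle{
			"clusterB": &fakeProcessor{name: "standby-clusterB"},
		},
	}
	m.FailoverDomain("clusterB")
	m.Stop()
}

As in the diff, appending before Start() means the tracking slice never misses a processor that was actually started, which is what allows Stop() to shut down failover queues that were created long after the queue processor itself was constructed.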