diff --git a/common/cache/metricsScopeCache.go b/common/cache/metricsScopeCache.go
index a7e7a686c46..82b3bd6332f 100644
--- a/common/cache/metricsScopeCache.go
+++ b/common/cache/metricsScopeCache.go
@@ -28,6 +28,7 @@ import (
 	"time"
 
 	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/metrics"
 )
 
@@ -47,6 +48,7 @@ type (
 		cache         atomic.Value
 		closeCh       chan struct{}
 		flushDuration time.Duration
+		timeSource    clock.TimeSource
 	}
 )
 
@@ -59,6 +61,7 @@ func NewDomainMetricsScopeCache() DomainMetricsScopeCache {
 		},
 		closeCh:       make(chan struct{}),
 		flushDuration: flushBufferedMetricsScopeDuration,
+		timeSource:    clock.NewRealTimeSource(),
 	}
 
 	mc.cache.Store(make(metricsScopeMap))
@@ -66,9 +69,11 @@ func NewDomainMetricsScopeCache() DomainMetricsScopeCache {
 }
 
 func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.Duration) {
+	ticker := c.timeSource.NewTicker(flushDuration)
+	defer ticker.Stop()
 	for {
 		select {
-		case <-time.After(flushDuration):
+		case <-ticker.Chan():
 			c.buffer.Lock()
 			if len(c.buffer.bufferMap) > 0 {
 				scopeMap := make(metricsScopeMap)
diff --git a/service/history/engine/engineimpl/query_workflow.go b/service/history/engine/engineimpl/query_workflow.go
index 15fef7f42e8..623e138b955 100644
--- a/service/history/engine/engineimpl/query_workflow.go
+++ b/service/history/engine/engineimpl/query_workflow.go
@@ -87,7 +87,7 @@ func (e *historyEngineImpl) QueryWorkflow(
 	}
 	deadline := time.Now().Add(queryFirstDecisionTaskWaitTime)
 	for mutableStateResp.GetPreviousStartedEventID() <= 0 && time.Now().Before(deadline) {
-		<-time.After(queryFirstDecisionTaskCheckInterval)
+		time.Sleep(queryFirstDecisionTaskCheckInterval)
 		mutableStateResp, err = e.getMutableState(ctx, request.GetDomainUUID(), execution)
 		if err != nil {
 			return nil, err
diff --git a/service/history/queue/timer_queue_processor.go b/service/history/queue/timer_queue_processor.go
index 7ab0eaf3c61..329622bf608 100644
--- a/service/history/queue/timer_queue_processor.go
+++ b/service/history/queue/timer_queue_processor.go
@@ -401,6 +401,13 @@ func (t *timerQueueProcessor) completeTimerLoop() {
 	completeTimer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
 	defer completeTimer.Stop()
 
+	// Create a retryTimer once, and reset it as needed
+	retryTimer := time.NewTimer(0)
+	defer retryTimer.Stop()
+	// Stop it immediately because we don't want it to fire initially
+	if !retryTimer.Stop() {
+		<-retryTimer.C
+	}
 	for {
 		select {
 		case <-t.shutdownChan:
@@ -425,11 +432,15 @@
 					return
 				}
 
+			// Reset the retryTimer for the delay between attempts
+			// TODO: the first retry has 0 backoff, revisit it to see if it's expected
+			retryDuration := time.Duration(attempt*100) * time.Millisecond
+			retryTimer.Reset(retryDuration)
 			select {
 			case <-t.shutdownChan:
 				t.drain()
 				return
-			case <-time.After(time.Duration(attempt*100) * time.Millisecond):
+			case <-retryTimer.C:
 				// do nothing. retry loop will continue
 			}
 		}
diff --git a/service/history/queue/transfer_queue_processor.go b/service/history/queue/transfer_queue_processor.go
index 02af6cf2e2e..5ff660acc19 100644
--- a/service/history/queue/transfer_queue_processor.go
+++ b/service/history/queue/transfer_queue_processor.go
@@ -363,6 +363,14 @@ func (t *transferQueueProcessor) completeTransferLoop() {
 	completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
 	defer completeTimer.Stop()
 
+	// Create a retryTimer once, and reset it as needed
+	retryTimer := time.NewTimer(0)
+	defer retryTimer.Stop()
+	// Stop it immediately because we don't want it to fire initially
+	if !retryTimer.Stop() {
+		<-retryTimer.C
+	}
+
 	for {
 		select {
 		case <-t.shutdownChan:
@@ -387,12 +395,15 @@
 					t.Stop()
 					return
 				}
-
+			// Reset the retryTimer for the delay between attempts
+			// TODO: the first retry has 0 backoff, revisit it to see if it's expected
+			retryDuration := time.Duration(attempt*100) * time.Millisecond
+			retryTimer.Reset(retryDuration)
 			select {
 			case <-t.shutdownChan:
 				t.drain()
 				return
-			case <-time.After(time.Duration(attempt*100) * time.Millisecond):
+			case <-retryTimer.C:
 				// do nothing. retry loop will continue
 			}
 		}
diff --git a/service/worker/scanner/tasklist/scavenger.go b/service/worker/scanner/tasklist/scavenger.go
index 353e3d47060..1c26ff46fa8 100644
--- a/service/worker/scanner/tasklist/scavenger.go
+++ b/service/worker/scanner/tasklist/scavenger.go
@@ -28,6 +28,7 @@ import (
 
 	"github.com/uber/cadence/common"
 	"github.com/uber/cadence/common/cache"
+	"github.com/uber/cadence/common/clock"
 	"github.com/uber/cadence/common/dynamicconfig"
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
@@ -59,6 +60,7 @@ type (
 		maxTasksPerJobFn dynamicconfig.IntPropertyFn
 		cleanOrphans     dynamicconfig.BoolPropertyFn
 		pollInterval     time.Duration
+		timeSource       clock.TimeSource
 
 		// stopC is used to signal the scavenger to stop
 		stopC chan struct{}
@@ -178,6 +180,7 @@ func NewScavenger(
 		pollInterval:             pollInterval,
 		maxTasksPerJobFn:         maxTasksPerJobFn,
 		getOrphanTasksPageSizeFn: getOrphanTasksPageSize,
+		timeSource:               clock.NewRealTimeSource(),
 	}
 }
 
@@ -257,9 +260,11 @@ func (s *Scavenger) process(taskListInfo *p.TaskListInfo) executor.TaskStatus {
 
 func (s *Scavenger) awaitExecutor() {
 	outstanding := s.executor.TaskCount()
+	ticker := s.timeSource.NewTicker(s.pollInterval)
+	defer ticker.Stop()
 	for outstanding > 0 {
 		select {
-		case <-time.After(s.pollInterval):
+		case <-ticker.Chan():
 			outstanding = s.executor.TaskCount()
 			s.scope.UpdateGauge(metrics.TaskListOutstandingCount, float64(outstanding))
 		case <-s.stopC:
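
Note on the retryTimer pattern added to completeTimerLoop and completeTransferLoop above: time.NewTimer(0) fires almost immediately, so the timer must be stopped and its channel drained before the first Reset; after that, each Reset is safe because the preceding tick is always consumed inside the select. A minimal, self-contained sketch of the same stdlib pattern follows; the loop bounds and printed output are illustrative only, not part of this patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Create the timer once; NewTimer(0) would fire almost immediately.
	retryTimer := time.NewTimer(0)
	defer retryTimer.Stop()
	// Stop it and drain the channel so it cannot fire before the first Reset.
	if !retryTimer.Stop() {
		<-retryTimer.C
	}

	for attempt := 0; attempt < 3; attempt++ {
		// Safe to Reset: the timer is drained (above) on the first pass, and
		// its tick was consumed from retryTimer.C on every later pass.
		// As in the patch, attempt 0 gets a zero delay (see the TODO there).
		retryTimer.Reset(time.Duration(attempt*100) * time.Millisecond)
		<-retryTimer.C
		fmt.Println("retry attempt", attempt)
	}
}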
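
The clock.TimeSource plumbing in metricsScopeCache.go and scavenger.go is what makes those polling loops testable: production code injects clock.NewRealTimeSource(), while a test can substitute a mocked source and advance time explicitly instead of sleeping. Below is a rough sketch of such a test; it assumes common/clock exposes clock.NewMockedTimeSource() with an Advance method (this is not confirmed by the patch itself, so verify against the actual package before reusing).

package cache

import (
	"testing"
	"time"

	"github.com/uber/cadence/common/clock"
)

// Sketch only: NewMockedTimeSource and Advance are assumptions about
// common/clock, not confirmed by this patch.
func TestTickerFiresOnMockedClock(t *testing.T) {
	mockedTime := clock.NewMockedTimeSource()

	ticker := mockedTime.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	// Move virtual time past the interval; the ticker should deliver a
	// tick without any real sleeping.
	mockedTime.Advance(15 * time.Millisecond)

	select {
	case <-ticker.Chan():
		// tick observed deterministically
	case <-time.After(time.Second):
		t.Fatal("expected a tick after advancing the mocked time source")
	}
}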