Skip to content

Commit

Permalink
Replace time.After with timers (cadence-workflow#6303)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored and dkrotx committed Sep 26, 2024
1 parent 24f61ca commit 91d9818
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 6 deletions.
7 changes: 6 additions & 1 deletion common/cache/metricsScopeCache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/metrics"
)

Expand All @@ -47,6 +48,7 @@ type (
cache atomic.Value
closeCh chan struct{}
flushDuration time.Duration
timeSource clock.TimeSource
}
)

Expand All @@ -59,16 +61,19 @@ func NewDomainMetricsScopeCache() DomainMetricsScopeCache {
},
closeCh: make(chan struct{}),
flushDuration: flushBufferedMetricsScopeDuration,
timeSource: clock.NewRealTimeSource(),
}

mc.cache.Store(make(metricsScopeMap))
return mc
}

func (c *domainMetricsScopeCache) flushBufferedMetricsScope(flushDuration time.Duration) {
ticker := c.timeSource.NewTicker(flushDuration)
defer ticker.Stop()
for {
select {
case <-time.After(flushDuration):
case <-ticker.Chan():
c.buffer.Lock()
if len(c.buffer.bufferMap) > 0 {
scopeMap := make(metricsScopeMap)
Expand Down
2 changes: 1 addition & 1 deletion service/history/engine/engineimpl/query_workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (e *historyEngineImpl) QueryWorkflow(
}
deadline := time.Now().Add(queryFirstDecisionTaskWaitTime)
for mutableStateResp.GetPreviousStartedEventID() <= 0 && time.Now().Before(deadline) {
<-time.After(queryFirstDecisionTaskCheckInterval)
time.Sleep(queryFirstDecisionTaskCheckInterval)
mutableStateResp, err = e.getMutableState(ctx, request.GetDomainUUID(), execution)
if err != nil {
return nil, err
Expand Down
13 changes: 12 additions & 1 deletion service/history/queue/timer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,13 @@ func (t *timerQueueProcessor) completeTimerLoop() {
completeTimer := time.NewTimer(t.config.TimerProcessorCompleteTimerInterval())
defer completeTimer.Stop()

// Create a retryTimer once, and reset it as needed
retryTimer := time.NewTimer(0)
defer retryTimer.Stop()
// Stop it immediately because we don't want it to fire initially
if !retryTimer.Stop() {
<-retryTimer.C
}
for {
select {
case <-t.shutdownChan:
Expand All @@ -425,11 +432,15 @@ func (t *timerQueueProcessor) completeTimerLoop() {
return
}

// Reset the retryTimer for the delay between attempts
// TODO: the first retry has 0 backoff, revisit it to see if it's expected
retryDuration := time.Duration(attempt*100) * time.Millisecond
retryTimer.Reset(retryDuration)
select {
case <-t.shutdownChan:
t.drain()
return
case <-time.After(time.Duration(attempt*100) * time.Millisecond):
case <-retryTimer.C:
// do nothing. retry loop will continue
}
}
Expand Down
15 changes: 13 additions & 2 deletions service/history/queue/transfer_queue_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,14 @@ func (t *transferQueueProcessor) completeTransferLoop() {
completeTimer := time.NewTimer(t.config.TransferProcessorCompleteTransferInterval())
defer completeTimer.Stop()

// Create a retryTimer once, and reset it as needed
retryTimer := time.NewTimer(0)
defer retryTimer.Stop()
// Stop it immediately because we don't want it to fire initially
if !retryTimer.Stop() {
<-retryTimer.C
}

for {
select {
case <-t.shutdownChan:
Expand All @@ -387,12 +395,15 @@ func (t *transferQueueProcessor) completeTransferLoop() {
t.Stop()
return
}

// Reset the retryTimer for the delay between attempts
// TODO: the first retry has 0 backoff, revisit it to see if it's expected
retryDuration := time.Duration(attempt*100) * time.Millisecond
retryTimer.Reset(retryDuration)
select {
case <-t.shutdownChan:
t.drain()
return
case <-time.After(time.Duration(attempt*100) * time.Millisecond):
case <-retryTimer.C:
// do nothing. retry loop will continue
}
}
Expand Down
7 changes: 6 additions & 1 deletion service/worker/scanner/tasklist/scavenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -59,6 +60,7 @@ type (
maxTasksPerJobFn dynamicconfig.IntPropertyFn
cleanOrphans dynamicconfig.BoolPropertyFn
pollInterval time.Duration
timeSource clock.TimeSource

// stopC is used to signal the scavenger to stop
stopC chan struct{}
Expand Down Expand Up @@ -178,6 +180,7 @@ func NewScavenger(
pollInterval: pollInterval,
maxTasksPerJobFn: maxTasksPerJobFn,
getOrphanTasksPageSizeFn: getOrphanTasksPageSize,
timeSource: clock.NewRealTimeSource(),
}
}

Expand Down Expand Up @@ -257,9 +260,11 @@ func (s *Scavenger) process(taskListInfo *p.TaskListInfo) executor.TaskStatus {

func (s *Scavenger) awaitExecutor() {
outstanding := s.executor.TaskCount()
ticker := s.timeSource.NewTicker(s.pollInterval)
defer ticker.Stop()
for outstanding > 0 {
select {
case <-time.After(s.pollInterval):
case <-ticker.Chan():
outstanding = s.executor.TaskCount()
s.scope.UpdateGauge(metrics.TaskListOutstandingCount, float64(outstanding))
case <-s.stopC:
Expand Down

0 comments on commit 91d9818

Please sign in to comment.