cleanup: prevent leaks from time.After
This PR replaces uses of time.After with a safe helper function that creates
a time.Timer to use instead. The new function returns both the time.Timer and
a Stop function that the caller is responsible for calling, typically via defer.

The helper also accepts a duration <= 0, clamping it to the smallest positive
value so the timer fires almost immediately, similar to time.After(0).
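
As context before the per-file diffs, here is a minimal, hypothetical sketch of the pattern this change applies in a long-running select loop. The names newSafeTimer, pollLeaky, and pollSafe are illustrative only and are not part of the change; the local newSafeTimer merely approximates the exported helper.NewSafeTimer added below in helper/funcs.go, which is what the Nomad call sites use directly.

package main

import (
	"context"
	"fmt"
	"time"
)

// newSafeTimer approximates the helper added in this change: it clamps a
// non-positive duration to the smallest positive value and hands back the
// timer plus a stop function the caller is expected to defer.
func newSafeTimer(d time.Duration) (*time.Timer, func()) {
	if d <= 0 {
		d = 1
	}
	t := time.NewTimer(d)
	return t, func() { t.Stop() }
}

// pollLeaky shows the old pattern: every iteration time.After allocates a
// fresh timer, and a timer still pending when ctx is cancelled lingers
// until it fires.
func pollLeaky(ctx context.Context, period time.Duration, work func()) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(period):
			work()
		}
	}
}

// pollSafe shows the new pattern used throughout this diff: one timer,
// reset at the top of each iteration, stopped when the loop returns.
func pollSafe(ctx context.Context, period time.Duration, work func()) {
	timer, stop := newSafeTimer(period)
	defer stop()

	for {
		timer.Reset(period)

		select {
		case <-ctx.Done():
			return
		case <-timer.C:
			work()
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	pollSafe(ctx, 100*time.Millisecond, func() { fmt.Println("tick") })
}

Each call site in the diff below follows the shape of pollSafe: create the timer once, defer the stop function, and Reset at the top of every loop iteration instead of allocating a new timer with time.After.
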
shoenig committed Feb 2, 2022
1 parent f031c7d commit 40c5e69
Showing 15 changed files with 138 additions and 18 deletions.
16 changes: 13 additions & 3 deletions api/allocations_exec.go
@@ -13,6 +13,12 @@ import (
"github.com/gorilla/websocket"
)

const (
// heartbeatInterval is the amount of time to wait between sending heartbeats
// during an exec streaming operation
heartbeatInterval = 10 * time.Second
)

type execSession struct {
client *Client
alloc *Allocation
@@ -177,15 +183,19 @@ func (s *execSession) startTransmit(ctx context.Context, conn *websocket.Conn) <

// send a heartbeat every 10 seconds
go func() {
t := time.NewTimer(heartbeatInterval)
defer t.Stop()

for {
t.Reset(heartbeatInterval)

select {
case <-ctx.Done():
return
// heartbeat message
case <-time.After(10 * time.Second):
case <-t.C:
// heartbeat message
send(&execStreamingInputHeartbeat)
}

}
}()

7 changes: 6 additions & 1 deletion client/allocrunner/taskrunner/task_runner.go
@@ -527,6 +527,9 @@ func (tr *TaskRunner) Run() {
return
}

timer, stop := helper.NewSafeTimer(0) // timer duration calculated JIT
defer stop()

MAIN:
for !tr.shouldShutdown() {
select {
@@ -612,9 +615,11 @@ MAIN:
break MAIN
}

timer.Reset(restartDelay)

// Actually restart by sleeping and also watching for destroy events
select {
case <-time.After(restartDelay):
case <-timer.C:
case <-tr.killCtx.Done():
tr.logger.Trace("task killed between restarts", "delay", restartDelay)
break MAIN
9 changes: 8 additions & 1 deletion command/agent/consul/version_checker.go
@@ -7,6 +7,7 @@ import (

log "github.com/hashicorp/go-hclog"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper"
)

// checkConsulTLSSkipVerify logs if Consul does not support TLSSkipVerify on
@@ -20,6 +21,10 @@ func checkConsulTLSSkipVerify(ctx context.Context, logger log.Logger, client Age
defer close(done)

i := uint64(0)

timer, stop := helper.NewSafeTimer(limit)
defer stop()

for {
self, err := client.Self()
if err == nil {
@@ -39,10 +44,12 @@
i++
}

timer.Reset(backoff)

select {
case <-ctx.Done():
return
case <-time.After(backoff):
case <-timer.C:
}
}
}
8 changes: 7 additions & 1 deletion command/agent/monitor/monitor.go
@@ -6,6 +6,7 @@ import (
"time"

log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
)

// Monitor provides a mechanism to stream logs using go-hclog
@@ -107,12 +108,17 @@ func (d *monitor) Start() <-chan []byte {
// dropped messages and makes room on the logCh
// to add a dropped message count warning
go func() {
timer, stop := helper.NewSafeTimer(d.droppedDuration)
defer stop()

// loop and check for dropped messages
for {
timer.Reset(d.droppedDuration)

select {
case <-d.doneCh:
return
case <-time.After(d.droppedDuration):
case <-timer.C:
d.Lock()

// Check if there have been any dropped messages.
11 changes: 10 additions & 1 deletion drivers/docker/stats.go
@@ -10,6 +10,7 @@ import (
docker "github.com/fsouza/go-dockerclient"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/drivers/docker/util"
"github.com/hashicorp/nomad/helper"
nstructs "github.com/hashicorp/nomad/nomad/structs"
)

@@ -93,17 +94,25 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
// backoff and retry used if the docker stats API returns an error
var backoff time.Duration
var retry int

// create an interval timer
timer, stop := helper.NewSafeTimer(interval)
defer stop()

// loops until doneCh is closed
for {
timer.Reset(interval)

if backoff > 0 {
select {
case <-time.After(backoff):
case <-timer.C:
case <-ctx.Done():
return
case <-h.doneCh:
return
}
}

// make a channel for docker stats structs and start a collector to
// receive stats from docker and emit nomad stats
// statsCh will always be closed by docker client.
8 changes: 7 additions & 1 deletion drivers/shared/eventer/eventer.go
@@ -6,6 +6,7 @@ import (
"time"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/plugins/drivers"
)

@@ -62,14 +63,19 @@ func NewEventer(ctx context.Context, logger hclog.Logger) *Eventer {
// eventLoop is the main logic which pulls events from the channel and broadcasts
// them to all consumers
func (e *Eventer) eventLoop() {
timer, stop := helper.NewSafeTimer(ConsumerGCInterval)
defer stop()

for {
timer.Reset(ConsumerGCInterval)

select {
case <-e.ctx.Done():
e.logger.Trace("task event loop shutdown")
return
case event := <-e.events:
e.iterateConsumers(event)
case <-time.After(ConsumerGCInterval):
case <-timer.C:
e.gcConsumers()
}
}
27 changes: 27 additions & 0 deletions helper/funcs.go
@@ -572,3 +572,30 @@ func PathEscapesSandbox(sandboxDir, path string) bool {
}
return false
}

// TimerStopFunc is used to stop a time.Timer created with NewSafeTimer
type TimerStopFunc func()

// NewSafeTimer creates a time.Timer but does not panic if duration is <= 0.
//
// Using a time.Timer is recommended instead of time.After when it is necessary
// to avoid leaking goroutines (e.g. in a select inside a loop).
//
// Returns the time.Timer and also a TimerStopFunc, forcing the caller to deal
// with stopping the time.Timer to avoid leaking a goroutine.
func NewSafeTimer(duration time.Duration) (*time.Timer, TimerStopFunc) {
if duration <= 0 {
// Avoid panic by using the smallest positive value. This is close enough
// to the behavior of time.After(0), which this helper is intended to
// replace.
// https://go.dev/play/p/EIkm9MsPbHY
duration = 1
}

t := time.NewTimer(duration)
cancel := func() {
t.Stop()
}

return t, cancel
}
14 changes: 14 additions & 0 deletions helper/funcs_test.go
@@ -431,3 +431,17 @@ func TestPathEscapesSandbox(t *testing.T) {
})
}
}

func Test_NewSafeTimer(t *testing.T) {
t.Run("zero", func(t *testing.T) {
timer, stop := NewSafeTimer(0)
defer stop()
<-timer.C
})

t.Run("positive", func(t *testing.T) {
timer, stop := NewSafeTimer(1)
defer stop()
<-timer.C
})
}
7 changes: 6 additions & 1 deletion nomad/blocked_evals.go
@@ -706,9 +706,14 @@ func (b *BlockedEvals) Stats() *BlockedStats {

// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
timer.Reset(period)

select {
case <-time.After(period):
case <-timer.C:
stats := b.Stats()
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_quota_limit"}, float32(stats.TotalQuotaLimit))
metrics.SetGauge([]string{"nomad", "blocked_evals", "total_blocked"}, float32(stats.TotalBlocked))
6 changes: 6 additions & 0 deletions nomad/drainer/watch_jobs.go
@@ -140,10 +140,16 @@ func (w *drainingJobWatcher) deregisterJob(jobID, namespace string) {

// watch is the long lived watching routine that detects job drain changes.
func (w *drainingJobWatcher) watch() {
timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
defer stop()

waitIndex := uint64(1)

for {
timer.Reset(stateReadErrorDelay)
w.logger.Trace("getting job allocs at index", "index", waitIndex)
jobAllocs, index, err := w.getJobAllocs(w.getQueryCtx(), waitIndex)

if err != nil {
if err == context.Canceled {
// Determine if it is a cancel or a shutdown
9 changes: 7 additions & 2 deletions nomad/drainer/watch_nodes.go
@@ -2,10 +2,10 @@ package drainer

import (
"context"
"time"

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/helper"

"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -148,8 +148,13 @@ func NewNodeDrainWatcher(ctx context.Context, limiter *rate.Limiter, state *stat

// watch is the long lived watching routine that detects node changes.
func (w *nodeDrainWatcher) watch() {
timer, stop := helper.NewSafeTimer(stateReadErrorDelay)
defer stop()

nindex := uint64(1)

for {
timer.Reset(stateReadErrorDelay)
nodes, index, err := w.getNodes(nindex)
if err != nil {
if err == context.Canceled {
@@ -160,7 +165,7 @@ func (w *nodeDrainWatcher) watch() {
select {
case <-w.ctx.Done():
return
case <-time.After(stateReadErrorDelay):
case <-timer.C:
continue
}
}
11 changes: 8 additions & 3 deletions nomad/eval_broker.go
@@ -2,15 +2,15 @@ package nomad

import (
"container/heap"
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"

"context"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/lib/delayheap"
"github.com/hashicorp/nomad/nomad/structs"
@@ -835,9 +835,14 @@ func (b *EvalBroker) Stats() *BrokerStats {

// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
timer.Reset(period)

select {
case <-time.After(period):
case <-timer.C:
stats := b.Stats()
metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady))
metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked))
7 changes: 5 additions & 2 deletions nomad/plan_queue.go
@@ -7,6 +7,7 @@ import (
"time"

metrics "github.com/armon/go-metrics"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

@@ -196,12 +197,14 @@ func (q *PlanQueue) Stats() *QueueStats {

// EmitStats is used to export metrics about the broker while enabled
func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
select {
case <-time.After(period):
case <-timer.C:
stats := q.Stats()
metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth))

case <-stopCh:
return
}
8 changes: 7 additions & 1 deletion nomad/server.go
@@ -27,6 +27,7 @@ import (
multierror "github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
"github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/stats"
@@ -1852,9 +1853,14 @@ func (s *Server) Stats() map[string]map[string]string {

// EmitRaftStats is used to export metrics about raft indexes and state store snapshot index
func (s *Server) EmitRaftStats(period time.Duration, stopCh <-chan struct{}) {
timer, stop := helper.NewSafeTimer(period)
defer stop()

for {
timer.Reset(period)

select {
case <-time.After(period):
case <-timer.C:
lastIndex := s.raft.LastIndex()
metrics.SetGauge([]string{"raft", "lastIndex"}, float32(lastIndex))
appliedIndex := s.raft.AppliedIndex()