70370: stop,log: tracing improvements r=andreimatei a=andreimatei

Improvements to make the traces of long-running background processes look better. See individual commits.
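The recurring pattern in this diff: long-running background workers move from stopper.RunAsyncTask to stopper.RunAsyncTaskEx, which takes a stop.TaskOpts and lets each task pick a tracing-span policy via SpanOpt. A minimal sketch of the call shape, assuming pkg/util/stop as of this commit; the task name and worker body are placeholders, and the comment on sterile spans reflects my reading of the option names:

package example

import (
  "context"

  "github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startWorker launches a server-lifetime background task. SterileRootSpan
// (as I read it) gives the task a standalone root span that does not accept
// children and is not linked to the caller's trace, so a worker that outlives
// the request cannot hold that trace open. ChildSpan, used elsewhere in this
// commit, links the task's span to the caller's instead.
func startWorker(ctx context.Context, stopper *stop.Stopper) error {
  return stopper.RunAsyncTaskEx(ctx,
    stop.TaskOpts{
      TaskName: "example-worker", // placeholder name
      SpanOpt:  stop.SterileRootSpan,
    },
    func(ctx context.Context) {
      // Worker body goes here (placeholder).
    })
}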

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
craig[bot] and andreimatei committed Oct 8, 2021
2 parents aff3b79 + e0e83ca commit a2eedb4
Showing 17 changed files with 435 additions and 174 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1415,7 +1415,7 @@ func (ds *DistSender) sendPartialBatchAsync(
     ctx,
     stop.TaskOpts{
       TaskName:   "kv.DistSender: sending partial batch",
-      ChildSpan:  true,
+      SpanOpt:    stop.ChildSpan,
       Sem:        ds.asyncSenderSem,
       WaitForSem: false,
     },
2 changes: 1 addition & 1 deletion pkg/kv/kvprober/kvprober.go
@@ -170,7 +170,7 @@ func (p *Prober) Metrics() Metrics {
 func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
   ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
   startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
-    return stopper.RunAsyncTask(ctx, opName, func(ctx context.Context) {
+    return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
       defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)

       rnd, _ /* seed */ := randutil.NewPseudoRand()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/liveness/liveness.go
@@ -691,7 +691,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
   nl.mu.engines = opts.Engines
   nl.mu.Unlock()

-  _ = opts.Stopper.RunAsyncTask(ctx, "liveness-hb", func(context.Context) {
+  _ = opts.Stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) {
     ambient := nl.ambientCtx
     ambient.AddLogTag("liveness-hb", nil)
     ctx, cancel := opts.Stopper.WithCancelOnQuiesce(context.Background())
148 changes: 78 additions & 70 deletions pkg/kv/kvserver/queue.go
@@ -31,6 +31,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/stop"
   "github.com/cockroachdb/cockroach/pkg/util/syncutil"
   "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+  "github.com/cockroachdb/cockroach/pkg/util/tracing"
   "github.com/cockroachdb/errors"
 )

@@ -788,85 +789,88 @@ func (bq *baseQueue) MaybeRemove(rangeID roachpb.RangeID) {
 // stopper signals exit.
 func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
   ctx := bq.AnnotateCtx(context.Background())
-  stop := func() {
+  done := func() {
     bq.mu.Lock()
     bq.mu.stopped = true
     bq.mu.Unlock()
   }
-  if err := stopper.RunAsyncTask(ctx, "queue-loop", func(ctx context.Context) {
-    defer stop()
+  if err := stopper.RunAsyncTaskEx(ctx,
+    stop.TaskOpts{TaskName: "queue-loop", SpanOpt: stop.SterileRootSpan},
+    func(ctx context.Context) {
+      defer done()

-    // nextTime is initially nil; we don't start any timers until the queue
-    // becomes non-empty.
-    var nextTime <-chan time.Time
+      // nextTime is initially nil; we don't start any timers until the queue
+      // becomes non-empty.
+      var nextTime <-chan time.Time

-    immediately := make(chan time.Time)
-    close(immediately)
+      immediately := make(chan time.Time)
+      close(immediately)

-    for {
-      select {
-      // Exit on stopper.
-      case <-stopper.ShouldQuiesce():
-        return
+      for {
+        select {
+        // Exit on stopper.
+        case <-stopper.ShouldQuiesce():
+          return

-      // Incoming signal sets the next time to process if there were previously
-      // no replicas in the queue.
-      case <-bq.incoming:
-        if nextTime == nil {
-          // When a replica is added, wake up immediately. This is mainly
-          // to facilitate testing without unnecessary sleeps.
-          nextTime = immediately
+        // Incoming signal sets the next time to process if there were previously
+        // no replicas in the queue.
+        case <-bq.incoming:
+          if nextTime == nil {
+            // When a replica is added, wake up immediately. This is mainly
+            // to facilitate testing without unnecessary sleeps.
+            nextTime = immediately

-          // In case we're in a test, still block on the impl.
-          bq.impl.timer(0)
-        }
-      // Process replicas as the timer expires.
-      case <-nextTime:
-        // Acquire from the process semaphore.
-        bq.processSem <- struct{}{}
+            // In case we're in a test, still block on the impl.
+            bq.impl.timer(0)
+          }
+        // Process replicas as the timer expires.
+        case <-nextTime:
+          // Acquire from the process semaphore.
+          bq.processSem <- struct{}{}

-        repl := bq.pop()
-        if repl != nil {
-          annotatedCtx := repl.AnnotateCtx(ctx)
-          if stopper.RunAsyncTask(
-            annotatedCtx, fmt.Sprintf("storage.%s: processing replica", bq.name),
-            func(ctx context.Context) {
-              // Release semaphore when finished processing.
-              defer func() { <-bq.processSem }()
+          repl := bq.pop()
+          if repl != nil {
+            annotatedCtx := repl.AnnotateCtx(ctx)
+            if stopper.RunAsyncTaskEx(annotatedCtx, stop.TaskOpts{
+              TaskName: bq.processOpName() + " [outer]",
+            },
+              func(ctx context.Context) {
+                // Release semaphore when finished processing.
+                defer func() { <-bq.processSem }()

-              start := timeutil.Now()
-              err := bq.processReplica(ctx, repl)
+                start := timeutil.Now()
+                err := bq.processReplica(ctx, repl)

-              duration := timeutil.Since(start)
-              bq.recordProcessDuration(ctx, duration)
+                duration := timeutil.Since(start)
+                bq.recordProcessDuration(ctx, duration)

-              bq.finishProcessingReplica(ctx, stopper, repl, err)
-            }) != nil {
-            // Release semaphore on task failure.
-            <-bq.processSem
-            return
-          }
-        } else {
-          // Release semaphore if no replicas were available.
-          <-bq.processSem
-        }
+                bq.finishProcessingReplica(ctx, stopper, repl, err)
+              }) != nil {
+              // Release semaphore on task failure.
+              <-bq.processSem
+              return
+            }
+          } else {
+            // Release semaphore if no replicas were available.
+            <-bq.processSem
+          }

-        if bq.Length() == 0 {
-          nextTime = nil
-        } else {
-          // lastDur will be 0 after the first processing attempt.
-          lastDur := bq.lastProcessDuration()
-          switch t := bq.impl.timer(lastDur); t {
-          case 0:
-            nextTime = immediately
-          default:
-            nextTime = time.After(t)
+          if bq.Length() == 0 {
+            nextTime = nil
+          } else {
+            // lastDur will be 0 after the first processing attempt.
+            lastDur := bq.lastProcessDuration()
+            switch t := bq.impl.timer(lastDur); t {
+            case 0:
+              nextTime = immediately
+            default:
+              nextTime = time.After(t)
+            }
           }
         }
       }
-    }
-  }); err != nil {
-    stop()
+    }); err != nil {
+    done()
   }
 }

@@ -888,7 +892,8 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
 // called externally to the queue. bq.mu.Lock must not be held
 // while calling this method.
 //
-// ctx should already be annotated by repl.AnnotateCtx().
+// ctx should already be annotated by both bq.AnnotateCtx() and
+// repl.AnnotateCtx().
 func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
   // Load the system config if it's needed.
   var confReader spanconfig.StoreReader
Expand All @@ -913,7 +918,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
return nil
}

ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name)
ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
defer span.Finish()
return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()),
bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error {
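With processReplica now calling tracing.EnsureChildSpan instead of bq.AnnotateCtxWithSpan, per-replica work joins the caller's trace when ctx already carries a span (e.g., the "[outer]" task above or the purgatory RunTask below) and gets a fresh root span only otherwise. A sketch of that usage, under the assumption that EnsureChildSpan behaves as these call sites suggest; processWithSpan and its parameters are made up for illustration:

package example

import (
  "context"

  "github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// processWithSpan runs fn under a span named opName: a child of the span in
// ctx when one exists, otherwise a new root span (assumed EnsureChildSpan
// semantics).
func processWithSpan(
  ctx context.Context, tr *tracing.Tracer, opName string, fn func(context.Context) error,
) error {
  ctx, span := tracing.EnsureChildSpan(ctx, tr, opName)
  defer span.Finish()
  return fn(ctx)
}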
@@ -1141,7 +1146,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
   }

   workerCtx := bq.AnnotateCtx(context.Background())
-  _ = stopper.RunAsyncTask(workerCtx, "purgatory", func(ctx context.Context) {
+  _ = stopper.RunAsyncTaskEx(workerCtx, stop.TaskOpts{TaskName: bq.name + ".purgatory", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
     ticker := time.NewTicker(purgatoryReportInterval)
     for {
       select {
@@ -1172,8 +1177,7 @@
       }
       annotatedCtx := repl.AnnotateCtx(ctx)
       if stopper.RunTask(
-        annotatedCtx, fmt.Sprintf("storage.%s: purgatory processing replica", bq.name),
-        func(ctx context.Context) {
+        annotatedCtx, bq.processOpName(), func(ctx context.Context) {
           err := bq.processReplica(ctx, repl)
           bq.finishProcessingReplica(ctx, stopper, repl, err)
         }) != nil {
@@ -1302,10 +1306,14 @@ func (bq *baseQueue) DrainQueue(stopper *stop.Stopper) {
   // time it returns.
   defer bq.lockProcessing()()

-  ctx := bq.AnnotateCtx(context.TODO())
+  ctx := bq.AnnotateCtx(context.Background())
   for repl := bq.pop(); repl != nil; repl = bq.pop() {
     annotatedCtx := repl.AnnotateCtx(ctx)
     err := bq.processReplica(annotatedCtx, repl)
     bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
   }
 }
+
+func (bq *baseQueue) processOpName() string {
+  return fmt.Sprintf("queue.%s: processing replica", bq.name)
+}
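For a concrete sense of the names this helper produces, a hypothetical queue named "replicate" would yield the following (sketch inside the kvserver package; the queue name is made up):

func exampleOpNames() {
  bq := baseQueue{name: "replicate"} // hypothetical queue name
  fmt.Println(bq.processOpName())              // queue.replicate: processing replica
  fmt.Println(bq.processOpName() + " [outer]") // the wrapper task's name in processLoop
}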
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_range_lease.go
@@ -356,7 +356,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
       // later block on this lease acquisition too, and we can't include the
       // acquisition's trace in all of them, but let's at least include it in
       // the request that triggered it.
-      ChildSpan: true,
+      SpanOpt: stop.ChildSpan,
     },
     func(ctx context.Context) {
       defer sp.Finish()
18 changes: 16 additions & 2 deletions pkg/kv/kvserver/scheduler.go
@@ -196,13 +196,27 @@ func (s *raftScheduler) Start(ctx context.Context, stopper *stop.Stopper) {
     s.mu.Unlock()
     s.mu.cond.Broadcast()
   }
-  if err := stopper.RunAsyncTask(ctx, "raftsched-wait-quiesce", waitQuiesce); err != nil {
+  if err := stopper.RunAsyncTaskEx(ctx,
+    stop.TaskOpts{
+      TaskName: "raftsched-wait-quiesce",
+      // This task doesn't reference a parent because it runs for the server's
+      // lifetime.
+      SpanOpt: stop.SterileRootSpan,
+    },
+    waitQuiesce); err != nil {
     waitQuiesce(ctx)
   }

   s.done.Add(s.numWorkers)
   for i := 0; i < s.numWorkers; i++ {
-    if err := stopper.RunAsyncTask(ctx, "raft-worker", s.worker); err != nil {
+    if err := stopper.RunAsyncTaskEx(ctx,
+      stop.TaskOpts{
+        TaskName: "raft-worker",
+        // This task doesn't reference a parent because it runs for the server's
+        // lifetime.
+        SpanOpt: stop.SterileRootSpan,
+      },
+      s.worker); err != nil {
       s.done.Done()
     }
   }
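Note the error handling these call sites preserve: when the stopper is already draining, RunAsyncTaskEx refuses the task and returns an error, so waitQuiesce(ctx) runs synchronously and a worker that never started releases its WaitGroup slot with s.done.Done(). A sketch of the general pattern, with a placeholder task name and cleanup function:

// Sketch: fall back to running the work inline when the async task can't be
// started because the server is shutting down.
func startCleanupTask(ctx context.Context, stopper *stop.Stopper, cleanup func(context.Context)) {
  if err := stopper.RunAsyncTaskEx(ctx,
    stop.TaskOpts{
      TaskName: "cleanup-on-quiesce", // placeholder
      SpanOpt:  stop.SterileRootSpan,
    },
    cleanup); err != nil {
    cleanup(ctx) // stopper is draining; do the work synchronously instead
  }
}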
2 changes: 1 addition & 1 deletion pkg/server/diagnostics/reporter.go
@@ -93,7 +93,7 @@ type Reporter struct {
 // PeriodicallyReportDiagnostics starts a background worker that periodically
 // phones home to report usage and diagnostics.
 func (r *Reporter) PeriodicallyReportDiagnostics(ctx context.Context, stopper *stop.Stopper) {
-  _ = stopper.RunAsyncTask(ctx, "diagnostics", func(ctx context.Context) {
+  _ = stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "diagnostics", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
     defer logcrash.RecoverAndReportNonfatalPanic(ctx, &r.Settings.SV)
     nextReport := r.StartTime

5 changes: 4 additions & 1 deletion pkg/server/diagnostics/update_checker.go
@@ -72,7 +72,10 @@ type UpdateChecker struct {
 // PeriodicallyCheckForUpdates starts a background worker that periodically
 // phones home to check for updates.
 func (u *UpdateChecker) PeriodicallyCheckForUpdates(ctx context.Context, stopper *stop.Stopper) {
-  _ = stopper.RunAsyncTask(ctx, "update-checker", func(ctx context.Context) {
+  _ = stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
+    TaskName: "update-checker",
+    SpanOpt:  stop.SterileRootSpan,
+  }, func(ctx context.Context) {
     defer logcrash.RecoverAndReportNonfatalPanic(ctx, &u.Settings.SV)
     nextUpdateCheck := u.StartTime
