stop,log: tracing improvements #70370

Merged · 3 commits · Oct 8, 2021
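The hunks below migrate a number of background workers from `stopper.RunAsyncTask` to `stopper.RunAsyncTaskEx`, passing a `stop.TaskOpts` whose `SpanOpt` field selects the tracing behavior: `stop.SterileRootSpan` for workers that live for the server's lifetime (their spans should not hang off whichever operation happened to start them), and `stop.ChildSpan` for request-scoped work whose trace should appear under the caller's span. The sketch below is not part of this PR; it only illustrates the two options, assuming the `stop.TaskOpts` fields (`TaskName`, `SpanOpt`) exactly as they appear in the diff, with hypothetical task names.

```go
package example

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startExampleTasks is an illustrative sketch, not code from this PR.
func startExampleTasks(ctx context.Context, stopper *stop.Stopper) error {
    // Server-lifetime worker: a sterile root span keeps its long-lived span
    // detached from the trace of whatever operation started it.
    if err := stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
        TaskName: "example-background-worker", // hypothetical name
        SpanOpt:  stop.SterileRootSpan,
    }, func(ctx context.Context) {
        <-stopper.ShouldQuiesce() // placeholder worker body
    }); err != nil {
        return err
    }

    // Request-scoped helper: a child span makes the work show up in the
    // trace of the request that spawned it.
    return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
        TaskName: "example-request-helper", // hypothetical name
        SpanOpt:  stop.ChildSpan,
    }, func(ctx context.Context) {
        // request-scoped work would go here
    })
}
```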
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1415,7 +1415,7 @@ func (ds *DistSender) sendPartialBatchAsync(
     ctx,
     stop.TaskOpts{
       TaskName:   "kv.DistSender: sending partial batch",
-      ChildSpan:  true,
+      SpanOpt:    stop.ChildSpan,
       Sem:        ds.asyncSenderSem,
       WaitForSem: false,
     },
2 changes: 1 addition & 1 deletion pkg/kv/kvprober/kvprober.go
@@ -170,7 +170,7 @@ func (p *Prober) Metrics() Metrics {
 func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
   ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
   startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
-    return stopper.RunAsyncTask(ctx, opName, func(ctx context.Context) {
+    return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
       defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)

       rnd, _ /* seed */ := randutil.NewPseudoRand()
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/liveness/liveness.go
@@ -691,7 +691,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
   nl.mu.engines = opts.Engines
   nl.mu.Unlock()

-  _ = opts.Stopper.RunAsyncTask(ctx, "liveness-hb", func(context.Context) {
+  _ = opts.Stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) {
     ambient := nl.ambientCtx
     ambient.AddLogTag("liveness-hb", nil)
     ctx, cancel := opts.Stopper.WithCancelOnQuiesce(context.Background())
148 changes: 78 additions & 70 deletions pkg/kv/kvserver/queue.go
@@ -31,6 +31,7 @@ import (
   "github.com/cockroachdb/cockroach/pkg/util/stop"
   "github.com/cockroachdb/cockroach/pkg/util/syncutil"
   "github.com/cockroachdb/cockroach/pkg/util/timeutil"
+  "github.com/cockroachdb/cockroach/pkg/util/tracing"
   "github.com/cockroachdb/errors"
 )

@@ -788,85 +789,88 @@ func (bq *baseQueue) MaybeRemove(rangeID roachpb.RangeID) {
 // stopper signals exit.
 func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
   ctx := bq.AnnotateCtx(context.Background())
-  stop := func() {
+  done := func() {
     bq.mu.Lock()
     bq.mu.stopped = true
     bq.mu.Unlock()
   }
-  if err := stopper.RunAsyncTask(ctx, "queue-loop", func(ctx context.Context) {
-    defer stop()
+  if err := stopper.RunAsyncTaskEx(ctx,
+    stop.TaskOpts{TaskName: "queue-loop", SpanOpt: stop.SterileRootSpan},
+    func(ctx context.Context) {
+      defer done()

-    // nextTime is initially nil; we don't start any timers until the queue
-    // becomes non-empty.
-    var nextTime <-chan time.Time
+      // nextTime is initially nil; we don't start any timers until the queue
+      // becomes non-empty.
+      var nextTime <-chan time.Time

-    immediately := make(chan time.Time)
-    close(immediately)
+      immediately := make(chan time.Time)
+      close(immediately)

-    for {
-      select {
-      // Exit on stopper.
-      case <-stopper.ShouldQuiesce():
-        return
+      for {
+        select {
+        // Exit on stopper.
+        case <-stopper.ShouldQuiesce():
+          return

-      // Incoming signal sets the next time to process if there were previously
-      // no replicas in the queue.
-      case <-bq.incoming:
-        if nextTime == nil {
-          // When a replica is added, wake up immediately. This is mainly
-          // to facilitate testing without unnecessary sleeps.
-          nextTime = immediately
+        // Incoming signal sets the next time to process if there were previously
+        // no replicas in the queue.
+        case <-bq.incoming:
+          if nextTime == nil {
+            // When a replica is added, wake up immediately. This is mainly
+            // to facilitate testing without unnecessary sleeps.
+            nextTime = immediately

-          // In case we're in a test, still block on the impl.
-          bq.impl.timer(0)
-        }
-      // Process replicas as the timer expires.
-      case <-nextTime:
-        // Acquire from the process semaphore.
-        bq.processSem <- struct{}{}
+            // In case we're in a test, still block on the impl.
+            bq.impl.timer(0)
+          }
+        // Process replicas as the timer expires.
+        case <-nextTime:
+          // Acquire from the process semaphore.
+          bq.processSem <- struct{}{}

-        repl := bq.pop()
-        if repl != nil {
-          annotatedCtx := repl.AnnotateCtx(ctx)
-          if stopper.RunAsyncTask(
-            annotatedCtx, fmt.Sprintf("storage.%s: processing replica", bq.name),
-            func(ctx context.Context) {
-              // Release semaphore when finished processing.
-              defer func() { <-bq.processSem }()
+          repl := bq.pop()
+          if repl != nil {
+            annotatedCtx := repl.AnnotateCtx(ctx)
+            if stopper.RunAsyncTaskEx(annotatedCtx, stop.TaskOpts{
+              TaskName: bq.processOpName() + " [outer]",
+            },
+              func(ctx context.Context) {
+                // Release semaphore when finished processing.
+                defer func() { <-bq.processSem }()

-              start := timeutil.Now()
-              err := bq.processReplica(ctx, repl)
+                start := timeutil.Now()
+                err := bq.processReplica(ctx, repl)

-              duration := timeutil.Since(start)
-              bq.recordProcessDuration(ctx, duration)
+                duration := timeutil.Since(start)
+                bq.recordProcessDuration(ctx, duration)

-              bq.finishProcessingReplica(ctx, stopper, repl, err)
-            }) != nil {
-            // Release semaphore on task failure.
-            <-bq.processSem
-            return
-          }
-        } else {
-          // Release semaphore if no replicas were available.
-          <-bq.processSem
-        }
+                bq.finishProcessingReplica(ctx, stopper, repl, err)
+              }) != nil {
+              // Release semaphore on task failure.
+              <-bq.processSem
+              return
+            }
+          } else {
+            // Release semaphore if no replicas were available.
+            <-bq.processSem
+          }

-        if bq.Length() == 0 {
-          nextTime = nil
-        } else {
-          // lastDur will be 0 after the first processing attempt.
-          lastDur := bq.lastProcessDuration()
-          switch t := bq.impl.timer(lastDur); t {
-          case 0:
-            nextTime = immediately
-          default:
-            nextTime = time.After(t)
-          }
-        }
-      }
-    }
-  }); err != nil {
-    stop()
+          if bq.Length() == 0 {
+            nextTime = nil
+          } else {
+            // lastDur will be 0 after the first processing attempt.
+            lastDur := bq.lastProcessDuration()
+            switch t := bq.impl.timer(lastDur); t {
+            case 0:
+              nextTime = immediately
+            default:
+              nextTime = time.After(t)
+            }
+          }
+        }
+      }
+    }); err != nil {
+    done()
   }
 }

@@ -888,7 +892,8 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
 // called externally to the queue. bq.mu.Lock must not be held
 // while calling this method.
 //
-// ctx should already be annotated by repl.AnnotateCtx().
+// ctx should already be annotated by both bq.AnnotateCtx() and
+// repl.AnnotateCtx().
 func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
   // Load the system config if it's needed.
   var confReader spanconfig.StoreReader
Expand All @@ -913,7 +918,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
return nil
}

ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name)
ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
defer span.Finish()
return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()),
bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error {
@@ -1141,7 +1146,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
   }

   workerCtx := bq.AnnotateCtx(context.Background())
-  _ = stopper.RunAsyncTask(workerCtx, "purgatory", func(ctx context.Context) {
+  _ = stopper.RunAsyncTaskEx(workerCtx, stop.TaskOpts{TaskName: bq.name + ".purgatory", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
     ticker := time.NewTicker(purgatoryReportInterval)
     for {
       select {
@@ -1172,8 +1177,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
         }
         annotatedCtx := repl.AnnotateCtx(ctx)
         if stopper.RunTask(
-          annotatedCtx, fmt.Sprintf("storage.%s: purgatory processing replica", bq.name),
-          func(ctx context.Context) {
+          annotatedCtx, bq.processOpName(), func(ctx context.Context) {
             err := bq.processReplica(ctx, repl)
             bq.finishProcessingReplica(ctx, stopper, repl, err)
           }) != nil {
@@ -1302,10 +1306,14 @@ func (bq *baseQueue) DrainQueue(stopper *stop.Stopper) {
   // time it returns.
   defer bq.lockProcessing()()

-  ctx := bq.AnnotateCtx(context.TODO())
+  ctx := bq.AnnotateCtx(context.Background())
   for repl := bq.pop(); repl != nil; repl = bq.pop() {
     annotatedCtx := repl.AnnotateCtx(ctx)
     err := bq.processReplica(annotatedCtx, repl)
     bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
   }
 }
+
+func (bq *baseQueue) processOpName() string {
+  return fmt.Sprintf("queue.%s: processing replica", bq.name)
+}
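`processReplica` now opens its per-replica span with `tracing.EnsureChildSpan` and the shared `processOpName()` operation name instead of `AnnotateCtxWithSpan`. The sketch below is illustrative only: it assumes, based on the call site above, that `EnsureChildSpan(ctx, tracer, name)` returns a context carrying a new span (a child of the span already in `ctx`, when there is one) together with that span; the helper and the operation name are hypothetical.

```go
package example

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// runTraced sketches the span-per-operation pattern used by processReplica:
// ensure a span for the operation, run the work under it, and always finish
// the span, including on error paths.
func runTraced(ctx context.Context, tr *tracing.Tracer, work func(context.Context) error) error {
    ctx, span := tracing.EnsureChildSpan(ctx, tr, "queue.example: processing replica") // hypothetical op name
    defer span.Finish()
    return work(ctx)
}
```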
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_range_lease.go
@@ -356,7 +356,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
       // later block on this lease acquisition too, and we can't include the
       // acquisition's trace in all of them, but let's at least include it in
       // the request that triggered it.
-      ChildSpan: true,
+      SpanOpt: stop.ChildSpan,
     },
     func(ctx context.Context) {
       defer sp.Finish()
18 changes: 16 additions & 2 deletions pkg/kv/kvserver/scheduler.go
@@ -196,13 +196,27 @@ func (s *raftScheduler) Start(ctx context.Context, stopper *stop.Stopper) {
     s.mu.Unlock()
     s.mu.cond.Broadcast()
   }
-  if err := stopper.RunAsyncTask(ctx, "raftsched-wait-quiesce", waitQuiesce); err != nil {
+  if err := stopper.RunAsyncTaskEx(ctx,
+    stop.TaskOpts{
+      TaskName: "raftsched-wait-quiesce",
+      // This task doesn't reference a parent because it runs for the server's
+      // lifetime.
+      SpanOpt: stop.SterileRootSpan,
+    },
+    waitQuiesce); err != nil {
     waitQuiesce(ctx)
   }

   s.done.Add(s.numWorkers)
   for i := 0; i < s.numWorkers; i++ {
-    if err := stopper.RunAsyncTask(ctx, "raft-worker", s.worker); err != nil {
+    if err := stopper.RunAsyncTaskEx(ctx,
+      stop.TaskOpts{
+        TaskName: "raft-worker",
+        // This task doesn't reference a parent because it runs for the server's
+        // lifetime.
+        SpanOpt: stop.SterileRootSpan,
+      },
+      s.worker); err != nil {
       s.done.Done()
     }
   }
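Both call sites above keep the existing fallback: `RunAsyncTaskEx` returns an error when the stopper refuses the task (for example because it is already quiescing), and in that case the closure is run synchronously (`waitQuiesce(ctx)`) or the wait-group is decremented (`s.done.Done()`) so shutdown is not left hanging. A minimal sketch of that convention, with a hypothetical task name:

```go
package example

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startOrRunInline starts f as an async task; if the stopper rejects it,
// f runs synchronously so any cleanup it performs still happens.
func startOrRunInline(ctx context.Context, stopper *stop.Stopper, f func(context.Context)) {
    if err := stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
        TaskName: "example-wait-quiesce", // hypothetical name
        SpanOpt:  stop.SterileRootSpan,
    }, f); err != nil {
        f(ctx)
    }
}
```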
2 changes: 1 addition & 1 deletion pkg/server/diagnostics/reporter.go
@@ -93,7 +93,7 @@ type Reporter struct {
 // PeriodicallyReportDiagnostics starts a background worker that periodically
 // phones home to report usage and diagnostics.
 func (r *Reporter) PeriodicallyReportDiagnostics(ctx context.Context, stopper *stop.Stopper) {
-  _ = stopper.RunAsyncTask(ctx, "diagnostics", func(ctx context.Context) {
+  _ = stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "diagnostics", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
     defer logcrash.RecoverAndReportNonfatalPanic(ctx, &r.Settings.SV)
     nextReport := r.StartTime

5 changes: 4 additions & 1 deletion pkg/server/diagnostics/update_checker.go
@@ -72,7 +72,10 @@ type UpdateChecker struct {
 // PeriodicallyCheckForUpdates starts a background worker that periodically
 // phones home to check for updates.
 func (u *UpdateChecker) PeriodicallyCheckForUpdates(ctx context.Context, stopper *stop.Stopper) {
-  _ = stopper.RunAsyncTask(ctx, "update-checker", func(ctx context.Context) {
+  _ = stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
+    TaskName: "update-checker",
+    SpanOpt:  stop.SterileRootSpan,
+  }, func(ctx context.Context) {
     defer logcrash.RecoverAndReportNonfatalPanic(ctx, &u.Settings.SV)
     nextUpdateCheck := u.StartTime
