diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 60b462d455c8..9f98c5f76a82 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -1415,7 +1415,7 @@ func (ds *DistSender) sendPartialBatchAsync(
 		ctx, stop.TaskOpts{
 			TaskName:   "kv.DistSender: sending partial batch",
-			ChildSpan:  true,
+			SpanOpt:    stop.ChildSpan,
 			Sem:        ds.asyncSenderSem,
 			WaitForSem: false,
 		},
diff --git a/pkg/kv/kvprober/kvprober.go b/pkg/kv/kvprober/kvprober.go
index 86fb4913ca26..0a578bd3013a 100644
--- a/pkg/kv/kvprober/kvprober.go
+++ b/pkg/kv/kvprober/kvprober.go
@@ -170,7 +170,7 @@ func (p *Prober) Metrics() Metrics {
 func (p *Prober) Start(ctx context.Context, stopper *stop.Stopper) error {
 	ctx = logtags.AddTag(ctx, "kvprober", nil /* value */)
 	startLoop := func(ctx context.Context, opName string, probe func(context.Context, *kv.DB, planner), pl planner, interval *settings.DurationSetting) error {
-		return stopper.RunAsyncTask(ctx, opName, func(ctx context.Context) {
+		return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: opName, SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
 			defer logcrash.RecoverAndReportNonfatalPanic(ctx, &p.settings.SV)
 
 			rnd, _ /* seed */ := randutil.NewPseudoRand()
diff --git a/pkg/kv/kvserver/liveness/liveness.go b/pkg/kv/kvserver/liveness/liveness.go
index e193bca99565..ddf4a3dfdf40 100644
--- a/pkg/kv/kvserver/liveness/liveness.go
+++ b/pkg/kv/kvserver/liveness/liveness.go
@@ -691,7 +691,7 @@ func (nl *NodeLiveness) Start(ctx context.Context, opts NodeLivenessStartOptions
 	nl.mu.engines = opts.Engines
 	nl.mu.Unlock()
 
-	_ = opts.Stopper.RunAsyncTask(ctx, "liveness-hb", func(context.Context) {
+	_ = opts.Stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "liveness-hb", SpanOpt: stop.SterileRootSpan}, func(context.Context) {
 		ambient := nl.ambientCtx
 		ambient.AddLogTag("liveness-hb", nil)
 		ctx, cancel := opts.Stopper.WithCancelOnQuiesce(context.Background())
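The conversions above all follow the same shape: a worker that lives for the
server's lifetime is moved from RunAsyncTask to RunAsyncTaskEx with a sterile
root span, so its span is detached from the caller's trace and refuses
children. A minimal sketch of that pattern, not part of the patch — the
package, task name, and loop body are invented for illustration:

package example // hypothetical package, for illustration only

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// startPoller mirrors the migration shown above: a server-lifetime worker
// runs under a sterile root span rather than forking off the caller's span.
func startPoller(ctx context.Context, stopper *stop.Stopper) error {
	return stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
		TaskName: "example-poller", // hypothetical task name
		// A long-running task should not be tied to the caller's trace; the
		// sterile root also refuses children (see the stopper.go changes below).
		SpanOpt: stop.SterileRootSpan,
	}, func(ctx context.Context) {
		for {
			select {
			case <-stopper.ShouldQuiesce():
				return
			case <-time.After(10 * time.Second):
				// ... periodic work under ctx ...
			}
		}
	})
}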
diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index b9561bd94bf2..f3f8af37f731 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -31,6 +31,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 )
 
@@ -788,85 +789,88 @@ func (bq *baseQueue) MaybeRemove(rangeID roachpb.RangeID) {
 // stopper signals exit.
 func (bq *baseQueue) processLoop(stopper *stop.Stopper) {
 	ctx := bq.AnnotateCtx(context.Background())
-	stop := func() {
+	done := func() {
 		bq.mu.Lock()
 		bq.mu.stopped = true
 		bq.mu.Unlock()
 	}
-	if err := stopper.RunAsyncTask(ctx, "queue-loop", func(ctx context.Context) {
-		defer stop()
+	if err := stopper.RunAsyncTaskEx(ctx,
+		stop.TaskOpts{TaskName: "queue-loop", SpanOpt: stop.SterileRootSpan},
+		func(ctx context.Context) {
+			defer done()
 
-		// nextTime is initially nil; we don't start any timers until the queue
-		// becomes non-empty.
-		var nextTime <-chan time.Time
+			// nextTime is initially nil; we don't start any timers until the queue
+			// becomes non-empty.
+			var nextTime <-chan time.Time
 
-		immediately := make(chan time.Time)
-		close(immediately)
+			immediately := make(chan time.Time)
+			close(immediately)
 
-		for {
-			select {
-			// Exit on stopper.
-			case <-stopper.ShouldQuiesce():
-				return
+			for {
+				select {
+				// Exit on stopper.
+				case <-stopper.ShouldQuiesce():
+					return
 
-			// Incoming signal sets the next time to process if there were previously
-			// no replicas in the queue.
-			case <-bq.incoming:
-				if nextTime == nil {
-					// When a replica is added, wake up immediately. This is mainly
-					// to facilitate testing without unnecessary sleeps.
-					nextTime = immediately
+				// Incoming signal sets the next time to process if there were previously
+				// no replicas in the queue.
+				case <-bq.incoming:
+					if nextTime == nil {
+						// When a replica is added, wake up immediately. This is mainly
+						// to facilitate testing without unnecessary sleeps.
+						nextTime = immediately
 
-					// In case we're in a test, still block on the impl.
-					bq.impl.timer(0)
-				}
-			// Process replicas as the timer expires.
-			case <-nextTime:
-				// Acquire from the process semaphore.
-				bq.processSem <- struct{}{}
-
-				repl := bq.pop()
-				if repl != nil {
-					annotatedCtx := repl.AnnotateCtx(ctx)
-					if stopper.RunAsyncTask(
-						annotatedCtx, fmt.Sprintf("storage.%s: processing replica", bq.name),
-						func(ctx context.Context) {
-							// Release semaphore when finished processing.
-							defer func() { <-bq.processSem }()
-
-							start := timeutil.Now()
-							err := bq.processReplica(ctx, repl)
-
-							duration := timeutil.Since(start)
-							bq.recordProcessDuration(ctx, duration)
-
-							bq.finishProcessingReplica(ctx, stopper, repl, err)
-						}) != nil {
-						// Release semaphore on task failure.
+						// In case we're in a test, still block on the impl.
+						bq.impl.timer(0)
+					}
+				// Process replicas as the timer expires.
+				case <-nextTime:
+					// Acquire from the process semaphore.
+					bq.processSem <- struct{}{}
+
+					repl := bq.pop()
+					if repl != nil {
+						annotatedCtx := repl.AnnotateCtx(ctx)
+						if stopper.RunAsyncTaskEx(annotatedCtx, stop.TaskOpts{
+							TaskName: bq.processOpName() + " [outer]",
+						},
+							func(ctx context.Context) {
+								// Release semaphore when finished processing.
+								defer func() { <-bq.processSem }()
+
+								start := timeutil.Now()
+								err := bq.processReplica(ctx, repl)
+
+								duration := timeutil.Since(start)
+								bq.recordProcessDuration(ctx, duration)
+
+								bq.finishProcessingReplica(ctx, stopper, repl, err)
+							}) != nil {
+							// Release semaphore on task failure.
+							<-bq.processSem
+							return
+						}
+					} else {
+						// Release semaphore if no replicas were available.
 						<-bq.processSem
-						return
 					}
-				} else {
-					// Release semaphore if no replicas were available.
-					<-bq.processSem
-				}
 
-				if bq.Length() == 0 {
-					nextTime = nil
-				} else {
-					// lastDur will be 0 after the first processing attempt.
-					lastDur := bq.lastProcessDuration()
-					switch t := bq.impl.timer(lastDur); t {
-					case 0:
-						nextTime = immediately
-					default:
-						nextTime = time.After(t)
+					if bq.Length() == 0 {
+						nextTime = nil
+					} else {
+						// lastDur will be 0 after the first processing attempt.
+						lastDur := bq.lastProcessDuration()
+						switch t := bq.impl.timer(lastDur); t {
+						case 0:
+							nextTime = immediately
+						default:
+							nextTime = time.After(t)
+						}
 					}
 				}
 			}
-	}
-	}); err != nil {
-		stop()
+		}); err != nil {
+		done()
 	}
 }
 
@@ -888,7 +892,8 @@ func (bq *baseQueue) recordProcessDuration(ctx context.Context, dur time.Duratio
 // called externally to the queue. bq.mu.Lock must not be held
 // while calling this method.
 //
-// ctx should already be annotated by repl.AnnotateCtx().
+// ctx should already be annotated by both bq.AnnotateCtx() and
+// repl.AnnotateCtx().
 func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) error {
 	// Load the system config if it's needed.
 	var confReader spanconfig.StoreReader
@@ -913,7 +918,7 @@ func (bq *baseQueue) processReplica(ctx context.Context, repl replicaInQueue) er
 		return nil
 	}
 
-	ctx, span := bq.AnnotateCtxWithSpan(ctx, bq.name)
+	ctx, span := tracing.EnsureChildSpan(ctx, bq.Tracer, bq.processOpName())
 	defer span.Finish()
 	return contextutil.RunWithTimeout(ctx, fmt.Sprintf("%s queue process replica %d", bq.name, repl.GetRangeID()),
 		bq.processTimeoutFunc(bq.store.ClusterSettings(), repl), func(ctx context.Context) error {
@@ -1141,7 +1146,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
 	}
 
 	workerCtx := bq.AnnotateCtx(context.Background())
-	_ = stopper.RunAsyncTask(workerCtx, "purgatory", func(ctx context.Context) {
+	_ = stopper.RunAsyncTaskEx(workerCtx, stop.TaskOpts{TaskName: bq.name + ".purgatory", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
 		ticker := time.NewTicker(purgatoryReportInterval)
 		for {
 			select {
@@ -1172,8 +1177,7 @@ func (bq *baseQueue) addToPurgatoryLocked(
 				}
 				annotatedCtx := repl.AnnotateCtx(ctx)
 				if stopper.RunTask(
-					annotatedCtx, fmt.Sprintf("storage.%s: purgatory processing replica", bq.name),
-					func(ctx context.Context) {
+					annotatedCtx, bq.processOpName(), func(ctx context.Context) {
 						err := bq.processReplica(ctx, repl)
 						bq.finishProcessingReplica(ctx, stopper, repl, err)
 					}) != nil {
@@ -1302,10 +1306,14 @@ func (bq *baseQueue) DrainQueue(stopper *stop.Stopper) {
 	// time it returns.
 	defer bq.lockProcessing()()
 
-	ctx := bq.AnnotateCtx(context.TODO())
+	ctx := bq.AnnotateCtx(context.Background())
 	for repl := bq.pop(); repl != nil; repl = bq.pop() {
 		annotatedCtx := repl.AnnotateCtx(ctx)
 		err := bq.processReplica(annotatedCtx, repl)
 		bq.finishProcessingReplica(annotatedCtx, stopper, repl, err)
 	}
 }
+
+func (bq *baseQueue) processOpName() string {
+	return fmt.Sprintf("queue.%s: processing replica", bq.name)
+}
diff --git a/pkg/kv/kvserver/replica_range_lease.go b/pkg/kv/kvserver/replica_range_lease.go
index cf36c53aaf91..aa869c4b34c5 100644
--- a/pkg/kv/kvserver/replica_range_lease.go
+++ b/pkg/kv/kvserver/replica_range_lease.go
@@ -356,7 +356,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
 			// later block on this lease acquisition too, and we can't include the
 			// acquisition's trace in all of them, but let's at least include it in
 			// the request that triggered it.
-			ChildSpan: true,
+			SpanOpt: stop.ChildSpan,
 		},
 		func(ctx context.Context) {
 			defer sp.Finish()
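The requestLeaseAsync change above is the complementary case to the
long-running workers: the caller blocks on the task, so stop.ChildSpan ties
the task's span to the parent's recording. A minimal sketch of that shape,
not part of the patch — the function and task name are invented:

package example // hypothetical, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
)

// runAndWait sketches the stop.ChildSpan pattern: because the caller waits
// for the task, tying the task's span (and its memory) to the parent is safe
// and makes the work show up in the caller's trace.
func runAndWait(ctx context.Context, stopper *stop.Stopper) error {
	done := make(chan struct{})
	if err := stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
		TaskName: "example-awaited-task", // hypothetical name
		SpanOpt:  stop.ChildSpan,         // parent waits for the task
	}, func(ctx context.Context) {
		defer close(done)
		// ... bounded work, recorded into the parent's trace ...
	}); err != nil {
		return err
	}
	<-done
	return nil
}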
diff --git a/pkg/kv/kvserver/scheduler.go b/pkg/kv/kvserver/scheduler.go
index b5748ce42dfb..761b18f0a9d6 100644
--- a/pkg/kv/kvserver/scheduler.go
+++ b/pkg/kv/kvserver/scheduler.go
@@ -196,13 +196,27 @@ func (s *raftScheduler) Start(ctx context.Context, stopper *stop.Stopper) {
 		s.mu.Unlock()
 		s.mu.cond.Broadcast()
 	}
-	if err := stopper.RunAsyncTask(ctx, "raftsched-wait-quiesce", waitQuiesce); err != nil {
+	if err := stopper.RunAsyncTaskEx(ctx,
+		stop.TaskOpts{
+			TaskName: "raftsched-wait-quiesce",
+			// This task doesn't reference a parent because it runs for the server's
+			// lifetime.
+			SpanOpt: stop.SterileRootSpan,
+		},
+		waitQuiesce); err != nil {
 		waitQuiesce(ctx)
 	}
 
 	s.done.Add(s.numWorkers)
 	for i := 0; i < s.numWorkers; i++ {
-		if err := stopper.RunAsyncTask(ctx, "raft-worker", s.worker); err != nil {
+		if err := stopper.RunAsyncTaskEx(ctx,
+			stop.TaskOpts{
+				TaskName: "raft-worker",
+				// This task doesn't reference a parent because it runs for the server's
+				// lifetime.
+				SpanOpt: stop.SterileRootSpan,
+			},
+			s.worker); err != nil {
 			s.done.Done()
 		}
 	}
diff --git a/pkg/server/diagnostics/reporter.go b/pkg/server/diagnostics/reporter.go
index 5dda40efbd63..7aaf4cc577d8 100644
--- a/pkg/server/diagnostics/reporter.go
+++ b/pkg/server/diagnostics/reporter.go
@@ -93,7 +93,7 @@ type Reporter struct {
 // PeriodicallyReportDiagnostics starts a background worker that periodically
 // phones home to report usage and diagnostics.
 func (r *Reporter) PeriodicallyReportDiagnostics(ctx context.Context, stopper *stop.Stopper) {
-	_ = stopper.RunAsyncTask(ctx, "diagnostics", func(ctx context.Context) {
+	_ = stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{TaskName: "diagnostics", SpanOpt: stop.SterileRootSpan}, func(ctx context.Context) {
 		defer logcrash.RecoverAndReportNonfatalPanic(ctx, &r.Settings.SV)
 		nextReport := r.StartTime
diff --git a/pkg/server/diagnostics/update_checker.go b/pkg/server/diagnostics/update_checker.go
index 4c481b664d24..3dac9fce38ee 100644
--- a/pkg/server/diagnostics/update_checker.go
+++ b/pkg/server/diagnostics/update_checker.go
@@ -72,7 +72,10 @@ type UpdateChecker struct {
 // PeriodicallyCheckForUpdates starts a background worker that periodically
 // phones home to check for updates.
 func (u *UpdateChecker) PeriodicallyCheckForUpdates(ctx context.Context, stopper *stop.Stopper) {
-	_ = stopper.RunAsyncTask(ctx, "update-checker", func(ctx context.Context) {
+	_ = stopper.RunAsyncTaskEx(ctx, stop.TaskOpts{
+		TaskName: "update-checker",
+		SpanOpt:  stop.SterileRootSpan,
+	}, func(ctx context.Context) {
 		defer logcrash.RecoverAndReportNonfatalPanic(ctx, &u.Settings.SV)
 		nextUpdateCheck := u.StartTime
diff --git a/pkg/server/server.go b/pkg/server/server.go
index 2c5b7bed45bc..ec69ce822563 100644
--- a/pkg/server/server.go
+++ b/pkg/server/server.go
@@ -268,7 +268,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {
 		clock = hlc.NewClock(hlc.UnixNano, time.Duration(cfg.MaxOffset))
 	}
 	registry := metric.NewRegistry()
-	// If the tracer has a Close function, call it after the server stops.
+	stopper.SetTracer(cfg.AmbientCtx.Tracer)
 	stopper.AddCloser(cfg.AmbientCtx.Tracer)
 
 	// Add a dynamic log tag value for the node ID.
@@ -2567,69 +2567,73 @@ func startSampleEnvironment(ctx context.Context, cfg sampleEnvironmentCfg) error
 		}
 	}
 
-	return cfg.stopper.RunAsyncTask(ctx, "mem-logger", func(ctx context.Context) {
-		var goMemStats atomic.Value // *status.GoMemStats
-		goMemStats.Store(&status.GoMemStats{})
-		var collectingMemStats int32 // atomic, 1 when stats call is ongoing
+	return cfg.stopper.RunAsyncTaskEx(ctx,
+		stop.TaskOpts{TaskName: "mem-logger", SpanOpt: stop.SterileRootSpan},
+		func(ctx context.Context) {
+			var goMemStats atomic.Value // *status.GoMemStats
+			goMemStats.Store(&status.GoMemStats{})
+			var collectingMemStats int32 // atomic, 1 when stats call is ongoing
 
-		timer := timeutil.NewTimer()
-		defer timer.Stop()
-		timer.Reset(cfg.minSampleInterval)
+			timer := timeutil.NewTimer()
+			defer timer.Stop()
+			timer.Reset(cfg.minSampleInterval)
 
-		for {
-			select {
-			case <-cfg.stopper.ShouldQuiesce():
-				return
-			case <-timer.C:
-				timer.Read = true
-				timer.Reset(cfg.minSampleInterval)
-
-				// We read the heap stats on another goroutine and give up after 1s.
-				// This is necessary because as of Go 1.12, runtime.ReadMemStats()
-				// "stops the world" and that requires first waiting for any current GC
-				// run to finish. With a large heap and under extreme conditions, a
-				// single GC run may take longer than the default sampling period of
-				// 10s. Under normal operations and with more recent versions of Go,
-				// this hasn't been observed to be a problem.
-				statsCollected := make(chan struct{})
-				if atomic.CompareAndSwapInt32(&collectingMemStats, 0, 1) {
-					if err := cfg.stopper.RunAsyncTask(ctx, "get-mem-stats", func(ctx context.Context) {
-						var ms status.GoMemStats
-						runtime.ReadMemStats(&ms.MemStats)
-						ms.Collected = timeutil.Now()
-						log.VEventf(ctx, 2, "memstats: %+v", ms)
-
-						goMemStats.Store(&ms)
-						atomic.StoreInt32(&collectingMemStats, 0)
-						close(statsCollected)
-					}); err != nil {
-						close(statsCollected)
+			for {
+				select {
+				case <-cfg.stopper.ShouldQuiesce():
+					return
+				case <-timer.C:
+					timer.Read = true
+					timer.Reset(cfg.minSampleInterval)
+
+					// We read the heap stats on another goroutine and give up after 1s.
+					// This is necessary because as of Go 1.12, runtime.ReadMemStats()
+					// "stops the world" and that requires first waiting for any current GC
+					// run to finish. With a large heap and under extreme conditions, a
+					// single GC run may take longer than the default sampling period of
+					// 10s. Under normal operations and with more recent versions of Go,
+					// this hasn't been observed to be a problem.
+					statsCollected := make(chan struct{})
+					if atomic.CompareAndSwapInt32(&collectingMemStats, 0, 1) {
+						if err := cfg.stopper.RunAsyncTaskEx(ctx,
+							stop.TaskOpts{TaskName: "get-mem-stats"},
+							func(ctx context.Context) {
+								var ms status.GoMemStats
+								runtime.ReadMemStats(&ms.MemStats)
+								ms.Collected = timeutil.Now()
+								log.VEventf(ctx, 2, "memstats: %+v", ms)
+
+								goMemStats.Store(&ms)
+								atomic.StoreInt32(&collectingMemStats, 0)
+								close(statsCollected)
+							}); err != nil {
+							close(statsCollected)
+						}
 					}
-				}
 
-				select {
-				case <-statsCollected:
-					// Good; we managed to read the Go memory stats quickly enough.
-				case <-time.After(time.Second):
-				}
+					select {
+					case <-statsCollected:
+						// Good; we managed to read the Go memory stats quickly enough.
+					case <-time.After(time.Second):
+					}
 
-				curStats := goMemStats.Load().(*status.GoMemStats)
-				cgoStats := status.GetCGoMemStats(ctx)
-				cfg.runtime.SampleEnvironment(ctx, curStats, cgoStats)
-				if goroutineDumper != nil {
-					goroutineDumper.MaybeDump(ctx, cfg.st, cfg.runtime.Goroutines.Value())
-				}
-				if heapProfiler != nil {
-					heapProfiler.MaybeTakeProfile(ctx, cfg.runtime.GoAllocBytes.Value())
-					nonGoAllocProfiler.MaybeTakeProfile(ctx, cfg.runtime.CgoTotalBytes.Value())
-					statsProfiler.MaybeTakeProfile(ctx, cfg.runtime.RSSBytes.Value(), curStats, cgoStats)
-				}
-				if queryProfiler != nil {
-					queryProfiler.MaybeDumpQueries(ctx, cfg.sessionRegistry, cfg.st)
+					curStats := goMemStats.Load().(*status.GoMemStats)
+					cgoStats := status.GetCGoMemStats(ctx)
+					cfg.runtime.SampleEnvironment(ctx, curStats, cgoStats)
+					if goroutineDumper != nil {
+						goroutineDumper.MaybeDump(ctx, cfg.st, cfg.runtime.Goroutines.Value())
+					}
+					if heapProfiler != nil {
+						heapProfiler.MaybeTakeProfile(ctx, cfg.runtime.GoAllocBytes.Value())
+						nonGoAllocProfiler.MaybeTakeProfile(ctx, cfg.runtime.CgoTotalBytes.Value())
+						statsProfiler.MaybeTakeProfile(ctx, cfg.runtime.RSSBytes.Value(), curStats, cgoStats)
+					}
+					if queryProfiler != nil {
+						queryProfiler.MaybeDumpQueries(ctx, cfg.sessionRegistry, cfg.st)
+					}
 				}
 			}
-		}
-	})
+		})
 }
 
 // Stop stops the server.
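The NewServer hunk above wires the server's Tracer into the Stopper after the
fact, because the Stopper already exists by the time the tracer is known. A
short sketch of the two ways to do this wiring, using only the APIs added in
this patch (the function name is invented):

package example // hypothetical wiring sketch

import (
	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// newServerStopper shows both ways a Stopper can learn about the tracer used
// for task spans: at construction via WithTracer (preferred), or later via
// SetTracer, as NewServer does above.
func newServerStopper(tr *tracing.Tracer) *stop.Stopper {
	// Preferred: supply the tracer up front.
	s := stop.NewStopper(stop.WithTracer(tr))
	// Alternative, when the Stopper predates the tracer:
	// s.SetTracer(tr)
	return s
}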
diff --git a/pkg/util/stop/stopper.go b/pkg/util/stop/stopper.go
index f19819d95d46..fa588fea6c8f 100644
--- a/pkg/util/stop/stopper.go
+++ b/pkg/util/stop/stopper.go
@@ -158,6 +158,7 @@ type Stopper struct {
 	quiescer chan struct{}     // Closed when quiescing
 	stopped  chan struct{}     // Closed when stopped completely
 	onPanic  func(interface{}) // called with recover() on panic on any goroutine
+	tracer   *tracing.Tracer   // tracer used to create spans for tasks
 
 	mu struct {
 		syncutil.RWMutex
@@ -187,6 +188,8 @@ type Option interface {
 
 type optionPanicHandler func(interface{})
 
+var _ Option = optionPanicHandler(nil)
+
 func (oph optionPanicHandler) apply(stopper *Stopper) {
 	stopper.onPanic = oph
 }
@@ -200,6 +203,23 @@ func OnPanic(handler func(interface{})) Option {
 	return optionPanicHandler(handler)
 }
 
+type withTracer struct {
+	tr *tracing.Tracer
+}
+
+var _ Option = withTracer{}
+
+func (o withTracer) apply(stopper *Stopper) {
+	stopper.tracer = o.tr
+}
+
+// WithTracer is an option for NewStopper() supplying the Tracer to use for
+// creating spans for tasks. Note that for tasks asking for a child span, the
+// parent's tracer is used instead of this one.
+func WithTracer(t *tracing.Tracer) Option {
+	return withTracer{t}
+}
+
 // NewStopper returns an instance of Stopper.
 func NewStopper(options ...Option) *Stopper {
 	s := &Stopper{
@@ -210,6 +230,9 @@ func NewStopper(options ...Option) *Stopper {
 	for _, opt := range options {
 		opt.apply(s)
 	}
+	if s.tracer == nil {
+		s.tracer = tracing.NewTracer()
+	}
 
 	register(s)
 	return s
@@ -334,37 +357,56 @@ func (s *Stopper) RunAsyncTask(
 
 	return s.RunAsyncTaskEx(ctx,
 		TaskOpts{
 			TaskName:   taskName,
-			ChildSpan:  false,
+			SpanOpt:    FollowsFromSpan,
 			Sem:        nil,
 			WaitForSem: false,
 		},
 		f)
 }
 
+// SpanOption specifies the type of tracing span that a task will run in.
+type SpanOption int
+
+const (
+	// FollowsFromSpan makes the task run in a span that's not included in the
+	// caller's recording (if any). For external tracers, the task's span will
+	// still reference the caller's span through a FollowsFrom relationship. If
+	// the caller doesn't have a span, then the task will execute in a root span.
+	//
+	// Use this when the caller will not wait for the task to finish, but a
+	// relationship between the caller and the task might still be useful to
+	// visualize in a trace collector.
+	FollowsFromSpan SpanOption = iota
+
+	// ChildSpan makes the task run in a span that's a child of the caller's span
+	// (if any). The child is included in the parent's recording. For external
+	// tracers, the child references the parent through a ChildOf relationship.
+	// If the caller doesn't have a span, then the task will execute in a root
+	// span.
+	//
+	// ChildSpan has consequences on memory usage: the memory lifetime of
+	// the task's span becomes tied to the lifetime of the parent. Generally
+	// ChildSpan should be used when the parent waits for the task to
+	// complete, and the parent is not a long-running process.
+	ChildSpan
+
+	// SterileRootSpan makes the task run in a root span that doesn't get any
+	// children. Anybody trying to create a child of the task's span will get a
+	// root span. This is suitable for long-running tasks: connecting children to
+	// these tasks would lead to infinitely-long traces, and connecting the
+	// long-running task to its parent is also problematic because of the
+	// different lifetimes.
+	SterileRootSpan
+)
+
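One consequence of these definitions worth spelling out: plain RunAsyncTask
now maps to SpanOpt: FollowsFromSpan, and since the three options all fall
back to a root span when the caller has none, every task gets a span (the new
TestStopperRunAsyncTaskCreatesRootSpans below verifies this). A small sketch,
with invented names:

package example // hypothetical, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/stop"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// defaultSpanBehavior: RunAsyncTask is now equivalent to RunAsyncTaskEx with
// SpanOpt: FollowsFromSpan, and the task's ctx always carries a span — either
// a fork of the caller's span or, absent one, a fresh root span.
func defaultSpanBehavior(ctx context.Context, s *stop.Stopper) error {
	return s.RunAsyncTask(ctx, "example-task", func(ctx context.Context) {
		// Non-nil even when the caller had no span.
		_ = tracing.SpanFromContext(ctx)
	})
}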
 // TaskOpts groups the task execution options for RunAsyncTaskEx.
 type TaskOpts struct {
 	// TaskName is a human-readable name for the operation. Used as the name of
 	// the tracing span.
 	TaskName string
 
-	// ChildSpan, if set, creates the tracing span for the task via
-	// tracing.ChildSpan() instead of tracing.ForkSpan. This makes the task's span
-	// be part of the parent span's recording (it is created with the
-	// WithParentAndAutoCollection option instead of
-	// WithParentAndManualCollection). It also leads to a ChildOf relationship
-	// instead of a FollowsFrom relationship to be used for the task's span, which
-	// typically implies a non-binding expectation that the parent span will
-	// outlive the task's span, i.e. that the parent will wait for the task to
-	// complete.
-	//
-	// Regardless of this setting, if the caller doesn't have a span, the task
-	// doesn't get a span either.
-	//
-	// Setting ChildSpan can have consequences on memory usage. The lifetime of
-	// the task's span becomes tied to the lifetime of the parent. Generally
-	// ChildSpan should be used when the parent waits for the task to complete,
-	// and the parent is not a long-running process.
-	ChildSpan bool
+	// SpanOpt controls the kind of span that the task will run in.
+	SpanOpt SpanOption
 
 	// If set, Sem is used as a semaphore limiting the concurrency (each task has
 	// weight 1).
@@ -422,11 +464,16 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte
 	}
 
 	// If the caller has a span, the task gets a child span.
-	var span *tracing.Span
-	if opt.ChildSpan {
-		ctx, span = tracing.ChildSpan(ctx, opt.TaskName)
-	} else {
-		ctx, span = tracing.ForkSpan(ctx, opt.TaskName)
+	var sp *tracing.Span
+	switch opt.SpanOpt {
+	case FollowsFromSpan:
+		ctx, sp = tracing.EnsureForkSpan(ctx, s.tracer, opt.TaskName)
+	case ChildSpan:
+		ctx, sp = tracing.EnsureChildSpan(ctx, s.tracer, opt.TaskName)
+	case SterileRootSpan:
+		ctx, sp = s.tracer.StartSpanCtx(ctx, opt.TaskName, tracing.WithSterile())
+	default:
+		panic(fmt.Sprintf("unsupported SpanOption: %v", opt.SpanOpt))
 	}
 
 	// Call f on another goroutine.
@@ -434,7 +481,9 @@ func (s *Stopper) RunAsyncTaskEx(ctx context.Context, opt TaskOpts, f func(conte
 	go func() {
 		defer s.Recover(ctx)
 		defer s.runPostlude()
-		defer span.Finish()
+		if sp != nil {
+			defer sp.Finish()
+		}
 		if alloc != nil {
 			defer alloc.Release()
 		}
@@ -560,3 +609,14 @@ func (s *Stopper) Quiesce(ctx context.Context) {
 		time.Sleep(5 * time.Millisecond)
 	}
 }
+
+// SetTracer sets the tracer to be used for task spans. This cannot be called
+// concurrently with starting tasks.
+//
+// Note that for tasks asking for a child span, the parent's tracer is used
+// instead of this one.
+//
+// When possible, prefer supplying the tracer to the constructor through
+// WithTracer.
+func (s *Stopper) SetTracer(tr *tracing.Tracer) {
+	s.tracer = tr
+}
diff --git a/pkg/util/stop/stopper_test.go b/pkg/util/stop/stopper_test.go
index 541620e1a39d..10d56384d02a 100644
--- a/pkg/util/stop/stopper_test.go
+++ b/pkg/util/stop/stopper_test.go
@@ -687,30 +687,62 @@ func (cf closerFunc) Close() { cf() }
 // the ChildSpan option.
 func TestStopperRunAsyncTaskTracing(t *testing.T) {
 	defer leaktest.AfterTest(t)()
-	s := stop.NewStopper()
+	tr := tracing.NewTracerWithOpt(context.Background(), tracing.WithTestingKnobs(
+		tracing.TracerTestingKnobs{
+			// We want the tracer to generate real spans so that we can test that the
+			// SterileRootSpan option produces a root span.
+			ForceRealSpans: true,
+		}))
+	s := stop.NewStopper(stop.WithTracer(tr))
 	ctx, getRecording, finish := tracing.ContextWithRecordingSpan(
-		context.Background(), tracing.NewTracer(), "parent")
+		context.Background(), tr, "parent")
+	root := tracing.SpanFromContext(ctx)
+	require.NotNil(t, root)
+	traceID := root.TraceID()
 
 	// Start two child tasks. Only the one with ChildSpan:true is expected to be
 	// present in the parent's recording.
 	require.NoError(t, s.RunAsyncTaskEx(ctx,
 		stop.TaskOpts{
-			TaskName:  "async child",
-			ChildSpan: false,
+			TaskName: "async child different recording",
+			SpanOpt:  stop.FollowsFromSpan,
 		},
 		func(ctx context.Context) {
 			log.Event(ctx, "async 1")
 		},
 	))
 
 	require.NoError(t, s.RunAsyncTaskEx(ctx,
 		stop.TaskOpts{
-			TaskName:  "async child same trace",
-			ChildSpan: true,
+			TaskName: "async child same trace",
+			SpanOpt:  stop.ChildSpan,
 		},
 		func(ctx context.Context) {
 			log.Event(ctx, "async 2")
 		},
 	))
 
+	{
+		errC := make(chan error)
+		require.NoError(t, s.RunAsyncTaskEx(ctx,
+			stop.TaskOpts{
+				TaskName: "sterile parent",
+				SpanOpt:  stop.SterileRootSpan,
+			},
+			func(ctx context.Context) {
+				log.Event(ctx, "async 3")
+				sp := tracing.SpanFromContext(ctx)
+				if sp == nil {
+					errC <- errors.Errorf("missing span")
+					return
+				}
+				sp = tr.StartSpan("child", tracing.WithParentAndAutoCollection(sp))
+				if sp.TraceID() == traceID {
+					errC <- errors.Errorf("expected different trace")
+				}
+				close(errC)
+			},
+		))
+		require.NoError(t, <-errC)
+	}
+
 	s.Stop(ctx)
 	finish()
 	require.NoError(t, tracing.CheckRecordedSpans(getRecording(), `
@@ -718,3 +750,20 @@ func TestStopperRunAsyncTaskTracing(t *testing.T) {
 		span: async child same trace
 			event: async 2`))
 }
+
+// Test that RunAsyncTask creates root spans when the caller doesn't have a
+// span.
+func TestStopperRunAsyncTaskCreatesRootSpans(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	tr := tracing.NewTracer()
+	ctx := context.Background()
+	s := stop.NewStopper(stop.WithTracer(tr))
+	defer s.Stop(ctx)
+	c := make(chan *tracing.Span)
+	require.NoError(t, s.RunAsyncTask(ctx, "test",
+		func(ctx context.Context) {
+			c <- tracing.SpanFromContext(ctx)
+		},
+	))
+	require.NotNil(t, <-c)
+}
diff --git a/pkg/util/tracing/crdbspan.go b/pkg/util/tracing/crdbspan.go
index 368b6dbc7dfe..209a1505b4ac 100644
--- a/pkg/util/tracing/crdbspan.go
+++ b/pkg/util/tracing/crdbspan.go
@@ -287,7 +287,7 @@ func (s *crdbSpan) record(msg redact.RedactableString) {
 
 	var now time.Time
 	if clock := s.testing.Clock; clock != nil {
-		now = s.testing.Clock.Now()
+		now = clock.Now()
 	} else {
 		now = time.Now()
 	}
diff --git a/pkg/util/tracing/span.go b/pkg/util/tracing/span.go
index bf575212dde4..5a11a28cab50 100644
--- a/pkg/util/tracing/span.go
+++ b/pkg/util/tracing/span.go
@@ -223,6 +223,13 @@ func (sp *Span) TraceID() uint64 {
 	return sp.i.TraceID()
 }
 
+// IsSterile returns true if this span does not want to have children spans. In
+// that case, trying to create a child span will result in the would-be child
+// being a root span.
+func (sp *Span) IsSterile() bool {
+	return sp.i.sterile
+}
+
 // SpanMeta is information about a Span that is not local to this
 // process. Typically, SpanMeta is populated from information
 // about a Span on the other end of an RPC, and is used to derive
@@ -252,6 +259,18 @@ type SpanMeta struct {
 
 	// The Span's associated baggage.
 	Baggage map[string]string
+
+	// sterile is set if this span does not want to have children spans. In that
+	// case, trying to create a child span will result in the would-be child being
+	// a root span. This is useful for spans corresponding to long-running
+	// operations that don't want to be associated with derived operations.
+	//
+	// Note that this field is unlike all the others in that it doesn't make it
+	// across the wire through a carrier. As can be seen in
+	// Tracer.InjectMetaInto(carrier), if sterile is set, then we don't propagate
+	// any info about the span in order to not have a child be created on the
+	// other side. Similarly, ExtractMetaFrom does not deserialize this field.
+	sterile bool
 }
 
 // Empty returns whether or not the SpanMeta is a zero value.
diff --git a/pkg/util/tracing/span_inner.go b/pkg/util/tracing/span_inner.go
index a1730751b9cb..91bd6e7ad1c0 100644
--- a/pkg/util/tracing/span_inner.go
+++ b/pkg/util/tracing/span_inner.go
@@ -35,6 +35,12 @@ type spanInner struct {
 	// otelSpan is the "shadow span" created for reporting to the OpenTelemetry
 	// tracer (if an otel tracer was configured).
 	otelSpan oteltrace.Span
+
+	// sterile is set if this span does not want to have children spans. In that
+	// case, trying to create a child span will result in the would-be child being
+	// a root span. This is useful for spans corresponding to long-running
+	// operations that don't want to be associated with derived operations.
+	sterile bool
 }
 
 func (s *spanInner) TraceID() uint64 {
@@ -48,6 +54,10 @@ func (s *spanInner) isNoop() bool {
 	return s.crdb == nil && s.netTr == nil && s.otelSpan == nil
 }
 
+func (s *spanInner) isSterile() bool {
+	return s.sterile
+}
+
 func (s *spanInner) IsVerbose() bool {
 	return s.crdb.recordingType() == RecordingVerbose
 }
@@ -136,6 +146,7 @@ func (s *spanInner) Meta() SpanMeta {
 	var spanID uint64
 	var recordingType RecordingType
 	var baggage map[string]string
+	var sterile bool
 
 	if s.crdb != nil {
 		traceID, spanID = s.crdb.traceID, s.crdb.spanID
@@ -150,6 +161,7 @@ func (s *spanInner) Meta() SpanMeta {
 			baggage[k] = v
 		}
 		recordingType = s.crdb.mu.recording.recordingType.load()
+		sterile = s.isSterile()
 	}
 
 	var otelCtx oteltrace.SpanContext
@@ -161,7 +173,8 @@ func (s *spanInner) Meta() SpanMeta {
 		spanID == 0 &&
 		!otelCtx.TraceID().IsValid() &&
 		recordingType == 0 &&
-		baggage == nil {
+		baggage == nil &&
+		!sterile {
 		return SpanMeta{}
 	}
 	return SpanMeta{
@@ -170,6 +183,7 @@ func (s *spanInner) Meta() SpanMeta {
 		otelCtx:       otelCtx,
 		recordingType: recordingType,
 		Baggage:       baggage,
+		sterile:       sterile,
 	}
 }
diff --git a/pkg/util/tracing/span_options.go b/pkg/util/tracing/span_options.go
index 081281e56bee..bbc3f9b900a1 100644
--- a/pkg/util/tracing/span_options.go
+++ b/pkg/util/tracing/span_options.go
@@ -49,7 +49,8 @@ type spanOptions struct {
 	LogTags       *logtags.Buffer        // see WithLogTags
 	Tags          map[string]interface{} // see WithTags
 	ForceRealSpan bool                   // see WithForceRealSpan
-	SpanKind      oteltrace.SpanKind
+	SpanKind      oteltrace.SpanKind     // see WithSpanKind
+	Sterile       bool                   // see WithSterile
 }
 
 func (opts *spanOptions) parentTraceID() uint64 {
@@ -120,9 +121,16 @@ type parentAndAutoCollectionOption Span
 // WithParentAndAutoCollection instructs StartSpan to create a child Span
 // from a parent Span.
 //
-// WithParentAndAutoCollection can be called with a nil `sp`, in which case
-// it'll be a no-op. It can also be called with a "no-op span", in which case
-// the option will also be a no-op (i.e. the upcoming span will be a root).
+// WithParentAndAutoCollection will be a no-op (i.e. the span resulting from
+// applying this option will be a root span, just as if this option hadn't been
+// specified) in the following cases:
+// - if `sp` is nil
+// - if `sp` is a no-op span
+// - if `sp` is a sterile span (i.e. a span explicitly marked as not wanting
+// children). Note that the singleton Tracer.noop span is marked as sterile,
+// which makes this condition mostly encompass the previous one; however, in
+// theory there could be no-op spans other than the singleton one.
+//
 //
 // The child inherits the parent's log tags. The data collected in the
 // child trace will be retrieved automatically when the parent's data is
@@ -144,7 +152,7 @@ type parentAndAutoCollectionOption Span
 // WithParentAndManualCollection should be used, which incurs an
 // obligation to manually propagate the trace data to the parent Span.
 func WithParentAndAutoCollection(sp *Span) SpanOption {
-	if sp == nil || sp.IsNoop() {
+	if sp == nil || sp.IsNoop() || sp.IsSterile() {
 		return (*parentAndAutoCollectionOption)(nil)
 	}
 	return (*parentAndAutoCollectionOption)(sp)
@@ -180,6 +188,9 @@ type parentAndManualCollectionOption SpanMeta
 // wait for the child to Finish(). If this expectation does not hold,
 // WithFollowsFrom should be added to the StartSpan invocation.
 func WithParentAndManualCollection(parent SpanMeta) SpanOption {
+	if parent.sterile {
+		return parentAndManualCollectionOption{}
+	}
 	return (parentAndManualCollectionOption)(parent)
 }
@@ -267,3 +278,16 @@ var WithServerSpanKind = WithSpanKind(oteltrace.SpanKindServer)
 // WithClientSpanKind is a shorthand for server spans, frequently saving
 // allocations.
 var WithClientSpanKind = WithSpanKind(oteltrace.SpanKindClient)
+
+type withSterileOption struct{}
+
+// WithSterile configures the span to not permit any child spans. The would-be
+// children of a sterile span end up being root spans.
+func WithSterile() SpanOption {
+	return withSterileOption{}
+}
+
+func (w withSterileOption) apply(opts spanOptions) spanOptions {
+	opts.Sterile = true
+	return opts
+}
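Putting the two parent options together with WithSterile, a brief sketch of
the contract defined above (function and operation names are invented):

package example // hypothetical, for illustration only

import (
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// sterileSpanDemo: would-be children of a sterile span come out as root
// spans, whether the parent is passed by reference or by SpanMeta.
func sterileSpanDemo(tr *tracing.Tracer) {
	parent := tr.StartSpan("long-running-loop", tracing.WithSterile())
	defer parent.Finish()

	// Both of these produce root spans in fresh traces, because both parent
	// options filter out the sterile parent before StartSpan sees it.
	byRef := tr.StartSpan("op", tracing.WithParentAndAutoCollection(parent))
	defer byRef.Finish()
	byMeta := tr.StartSpan("op", tracing.WithParentAndManualCollection(parent.Meta()))
	defer byMeta.Finish()
}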
diff --git a/pkg/util/tracing/tracer.go b/pkg/util/tracing/tracer.go
index aa5a341031de..3f7aa6a4d290 100644
--- a/pkg/util/tracing/tracer.go
+++ b/pkg/util/tracing/tracer.go
@@ -211,7 +211,7 @@ func NewTracer() *Tracer {
 	t.activeSpans.m = make(map[uint64]*Span)
 	// The noop span is marked as finished so that even in the case of a bug,
 	// it won't soak up data.
-	t.noopSpan = &Span{numFinishCalled: 1, i: spanInner{tracer: t}}
+	t.noopSpan = &Span{numFinishCalled: 1, i: spanInner{tracer: t, sterile: true}}
 	return t
 }
 
@@ -511,6 +511,11 @@ func (t *Tracer) startSpanGeneric(
 			// WithParentAndAutoCollection option.
 			panic("invalid no-op parent")
 		}
+		if opts.Parent.IsSterile() {
+			// A sterile parent should have been optimized away by
+			// WithParentAndAutoCollection.
+			panic("invalid sterile parent")
+		}
 	}
 
 	// Are we tracing everything, or have a parent, or want a real span? Then
@@ -611,6 +616,7 @@ func (t *Tracer) startSpanGeneric(
 			crdb:     &helper.crdbSpan,
 			otelSpan: otelSpan,
 			netTr:    netTr,
+			sterile:  opts.Sterile,
 		}
 
 	// Copy over the parent span's root span reference, and if there isn't one
@@ -724,6 +730,12 @@ func (t *Tracer) InjectMetaInto(sm SpanMeta, carrier Carrier) error {
 		// empty map as a noop context.
 		return nil
 	}
+	// If the span has been marked as not wanting children, we don't propagate any
+	// information about it through the carrier (the point of propagating span
+	// info is to create a child from it).
+	if sm.sterile {
+		return nil
+	}
 
 	carrier.Set(fieldNameTraceID, strconv.FormatUint(sm.traceID, 16))
 	carrier.Set(fieldNameSpanID, strconv.FormatUint(sm.spanID, 16))
@@ -823,6 +835,10 @@ func (t *Tracer) ExtractMetaFrom(carrier Carrier) (SpanMeta, error) {
 		otelCtx:       otelCtx,
 		recordingType: recordingType,
 		Baggage:       baggage,
+		// The sterile field doesn't make it across the wire. The simple fact that
+		// there was any tracing info in the carrier means that the parent span was
+		// not sterile.
+		sterile: false,
 	}, nil
 }
 
@@ -904,6 +920,26 @@ func ForkSpan(ctx context.Context, opName string) (context.Context, *Span) {
 	return sp.Tracer().StartSpanCtx(ctx, opName, WithFollowsFrom(), collectionOpt)
 }
 
+// EnsureForkSpan is like ForkSpan except that, if there is no span in ctx, it
+// creates a root span.
+func EnsureForkSpan(ctx context.Context, tr *Tracer, opName string) (context.Context, *Span) {
+	sp := SpanFromContext(ctx)
+	var opts []SpanOption
+	// If there's a span in ctx, we use it as a parent.
+	if sp != nil {
+		tr = sp.Tracer()
+		if !tr.ShouldRecordAsyncSpans() {
+			opts = append(opts, WithParentAndManualCollection(sp.Meta()))
+		} else {
+			// Using auto collection here ensures that recordings from async spans
+			// also show up at the parent.
+			opts = append(opts, WithParentAndAutoCollection(sp))
+		}
+		opts = append(opts, WithFollowsFrom())
+	}
+	return tr.StartSpanCtx(ctx, opName, opts...)
+}
+
 // ChildSpan creates a child span of the current one, if any. Recordings from
 // child spans are automatically propagated to the parent span, and the tags are
 // inherited from the context's log tags automatically. Also see `ForkSpan`,
diff --git a/pkg/util/tracing/tracer_test.go b/pkg/util/tracing/tracer_test.go
index 5f9ccfe51d6e..f5a377e96601 100644
--- a/pkg/util/tracing/tracer_test.go
+++ b/pkg/util/tracing/tracer_test.go
@@ -11,6 +11,7 @@
 package tracing
 
 import (
+	"context"
 	"sort"
 	"sync"
 	"testing"
@@ -222,6 +223,35 @@ func TestStartChildSpan(t *testing.T) {
 	}
 }
 
+func TestSterileSpan(t *testing.T) {
+	tr := NewTracerWithOpt(context.Background(), WithTestingKnobs(TracerTestingKnobs{ForceRealSpans: true}))
+
+	// Check that children of sterile spans are roots.
+	sp1 := tr.StartSpan("parent", WithSterile())
+	// Make the span verbose so that we can use its recording below to assert that
+	// there were no children.
+	sp1.SetVerbose(true)
+
+	sp2 := tr.StartSpan("child", WithParentAndAutoCollection(sp1))
+	require.Zero(t, sp2.i.crdb.parentSpanID)
+
+	require.True(t, sp1.Meta().sterile)
+	require.False(t, sp2.Meta().sterile)
+	sp3 := tr.StartSpan("child", WithParentAndManualCollection(sp1.Meta()))
+	require.Zero(t, sp3.i.crdb.parentSpanID)
+
+	sp2.Finish()
+	sp3.Finish()
+	require.NoError(t, CheckRecordedSpans(sp1.GetRecording(), `
+		span: parent
+			tags: _unfinished=1 _verbose=1
+	`))
+
+	// Check that the meta of a sterile span doesn't get injected into carriers.
+	carrier := metadataCarrier{metadata.MD{}}
+	require.NoError(t, tr.InjectMetaInto(sp1.Meta(), carrier))
+	require.Len(t, carrier.MD, 0)
+}
+
 func TestTracerInjectExtract(t *testing.T) {
 	tr := NewTracer()
 	tr2 := NewTracer()
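Finally, a short sketch of the two behaviors of the EnsureForkSpan helper
added above, which the Stopper relies on for FollowsFromSpan tasks (names are
invented for illustration):

package example // hypothetical, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// ensureForkDemo: with no span in ctx, EnsureForkSpan falls back to the
// supplied tracer and returns a root span; with a span in ctx, it forks from
// that span (FollowsFrom), and the ctx span's own tracer takes precedence
// over the tracer argument.
func ensureForkDemo(tr *tracing.Tracer) {
	// No span in ctx: a root span is created with tr.
	ctx, root := tracing.EnsureForkSpan(context.Background(), tr, "background-op")
	defer root.Finish()

	// Span in ctx: the fork's parent is root, created with root's tracer.
	_, fork := tracing.EnsureForkSpan(ctx, tr, "forked-op")
	defer fork.Finish()
}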