diff --git a/go.mod b/go.mod index 47f40199..54addadc 100644 --- a/go.mod +++ b/go.mod @@ -30,9 +30,17 @@ require ( github.com/zclconf/go-cty v1.13.2 github.com/zclconf/go-cty-debug v0.0.0-20191215020915-b22d67c1ba0b go.bobheadxi.dev/gobenchdata v1.3.1 + go.opentelemetry.io/otel/trace v1.16.0 golang.org/x/tools v0.11.0 ) +require ( + github.com/felixge/httpsnoop v1.0.3 // indirect + github.com/go-logr/logr v1.2.4 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + go.opentelemetry.io/otel/metric v1.16.0 // indirect +) + require ( github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver/v3 v3.2.1 // indirect @@ -86,6 +94,9 @@ require ( github.com/stretchr/objx v0.5.0 // indirect github.com/subosito/gotenv v1.4.2 // indirect go.bobheadxi.dev/streamline v1.2.1 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.42.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 + go.opentelemetry.io/otel v1.16.0 golang.org/x/crypto v0.11.0 // indirect golang.org/x/mod v0.12.0 // indirect golang.org/x/net v0.12.0 // indirect diff --git a/go.sum b/go.sum index 528cea3f..cef7c4ef 100644 --- a/go.sum +++ b/go.sum @@ -102,6 +102,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs= github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw= +github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= +github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= @@ -112,6 +114,11 @@ github.com/go-git/go-git/v5 v5.6.1 h1:q4ZRqQl4pR/ZJHc1L5CFjGA1a10u76aV1iC+nh+bHs github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= +github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-test/deep v1.0.3 h1:ZrJSEWsXzPOxaZnFteGEfooLba+ju3FYIbOrS+rQd68= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= @@ -362,6 +369,16 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.42.0 h1:0vzgiFDsCh/jxRCR1xcRrtMoeCu2itXz/PsXst5P8rI= +go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.42.0/go.mod h1:y0vOY2OKFMOTvwxKfurStPayUUKGHlNeVqNneHmFXr0= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0 h1:pginetY7+onl4qN1vl0xW/V/v6OBZ0vVdH+esuJgvmM= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.42.0/go.mod h1:XiYsayHc36K3EByOO6nbAXnAWbrUxdjUROCEeeROOH8= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= diff --git a/internal/cmd/inspect_module_command.go b/internal/cmd/inspect_module_command.go index 066b2ee7..918cc533 100644 --- a/internal/cmd/inspect_module_command.go +++ b/internal/cmd/inspect_module_command.go @@ -126,7 +126,7 @@ func (c *InspectModuleCommand) inspect(rootPath string) error { c.logger, os.Interrupt, syscall.SIGTERM) defer cancel() - err = ss.WalkerPaths.EnqueueDir(dir) + err = ss.WalkerPaths.EnqueueDir(ctx, dir) if err != nil { return err } diff --git a/internal/cmd/serve_command.go b/internal/cmd/serve_command.go index 2f601b01..e42c58df 100644 --- a/internal/cmd/serve_command.go +++ b/internal/cmd/serve_command.go @@ -21,6 +21,10 @@ import ( "github.com/hashicorp/terraform-ls/internal/logging" "github.com/hashicorp/terraform-ls/internal/pathtpl" "github.com/mitchellh/cli" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.20.0" + "go.opentelemetry.io/otel/trace" ) type ServeCommand struct { @@ -108,6 +112,26 @@ func (c *ServeCommand) Run(args []string) int { ctx = algolia.WithCredentials(ctx, c.AlgoliaAppID, c.AlgoliaAPIKey) } + var err error + shutdownFunc := func(context.Context) error { return nil } + + // TODO: Currently unused until we decide how/where to export data + tp := trace.NewNoopTracerProvider() + otel.SetTracerProvider(tp) + + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to init telemetry: %s", err)) + return 1 + } + defer func() { + ctx := context.Background() + err := shutdownFunc(ctx) + if err != nil { + logger.Printf("failed to shutdown telemetry: %s", err) + return + } + }() + srv := langserver.NewLangServer(ctx, handlers.NewSession) srv.SetLogger(logger) @@ -120,7 +144,7 @@ func (c *ServeCommand) Run(args []string) int { return 0 } - err := srv.StartAndWait(os.Stdin, os.Stdout) + err = srv.StartAndWait(os.Stdin, os.Stdout) if err != nil { c.Ui.Error(fmt.Sprintf("Failed to start server: %s", err)) return 1 @@ -129,6 +153,17 @@ func (c *ServeCommand) Run(args []string) int { return 0 } +func (c *ServeCommand) otelResourceAttributes() []attribute.KeyValue { + return []attribute.KeyValue{ + semconv.ServiceName("terraform-ls"), + semconv.ServiceVersion(c.Version), + attribute.Int("process.pid", os.Getpid()), + attribute.Int("runtime.NumCPU", runtime.NumCPU()), + attribute.Int("port", c.port), + attribute.Int("reqConcurrency", c.reqConcurrency), + } +} + type stopFunc func() error func writeCpuProfileInto(rawPath string) (stopFunc, error) { diff --git a/internal/indexer/document_change.go b/internal/indexer/document_change.go index e5adaef2..741c92fa 100644 --- a/internal/indexer/document_change.go +++ b/internal/indexer/document_change.go @@ -13,10 +13,10 @@ import ( op "github.com/hashicorp/terraform-ls/internal/terraform/module/operation" ) -func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, error) { +func (idx *Indexer) DocumentChanged(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { ids := make(job.IDs, 0) - parseId, err := idx.jobStore.EnqueueJob(job.Job{ + parseId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleConfiguration(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -29,13 +29,13 @@ func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, erro } ids = append(ids, parseId) - modIds, err := idx.decodeModule(modHandle, job.IDs{parseId}, true) + modIds, err := idx.decodeModule(ctx, modHandle, job.IDs{parseId}, true) if err != nil { return ids, err } ids = append(ids, modIds...) - parseVarsId, err := idx.jobStore.EnqueueJob(job.Job{ + parseVarsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseVariables(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -48,7 +48,7 @@ func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, erro } ids = append(ids, parseVarsId) - varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ + varsRefsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -65,10 +65,10 @@ func (idx *Indexer) DocumentChanged(modHandle document.DirHandle) (job.IDs, erro return ids, nil } -func (idx *Indexer) decodeModule(modHandle document.DirHandle, dependsOn job.IDs, ignoreState bool) (job.IDs, error) { +func (idx *Indexer) decodeModule(ctx context.Context, modHandle document.DirHandle, dependsOn job.IDs, ignoreState bool) (job.IDs, error) { ids := make(job.IDs, 0) - metaId, err := idx.jobStore.EnqueueJob(job.Job{ + metaId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.LoadModuleMetadata(ctx, idx.modStore, modHandle.Path()) @@ -82,7 +82,7 @@ func (idx *Indexer) decodeModule(modHandle document.DirHandle, dependsOn job.IDs } ids = append(ids, metaId) - eSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ + eSchemaId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.PreloadEmbeddedSchema(ctx, idx.logger, schemas.FS, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -96,7 +96,7 @@ func (idx *Indexer) decodeModule(modHandle document.DirHandle, dependsOn job.IDs } ids = append(ids, eSchemaId) - refTargetsId, err := idx.jobStore.EnqueueJob(job.Job{ + refTargetsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -110,7 +110,7 @@ func (idx *Indexer) decodeModule(modHandle document.DirHandle, dependsOn job.IDs } ids = append(ids, refTargetsId) - refOriginsId, err := idx.jobStore.EnqueueJob(job.Job{ + refOriginsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -124,7 +124,7 @@ func (idx *Indexer) decodeModule(modHandle document.DirHandle, dependsOn job.IDs } ids = append(ids, refOriginsId) - registryId, err := idx.jobStore.EnqueueJob(job.Job{ + registryId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.GetModuleDataFromRegistry(ctx, idx.registryClient, diff --git a/internal/indexer/document_open.go b/internal/indexer/document_open.go index ec6a9550..159c2dde 100644 --- a/internal/indexer/document_open.go +++ b/internal/indexer/document_open.go @@ -14,7 +14,7 @@ import ( op "github.com/hashicorp/terraform-ls/internal/terraform/module/operation" ) -func (idx *Indexer) DocumentOpened(modHandle document.DirHandle) (job.IDs, error) { +func (idx *Indexer) DocumentOpened(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { mod, err := idx.modStore.ModuleByPath(modHandle.Path()) if err != nil { return nil, err @@ -24,7 +24,7 @@ func (idx *Indexer) DocumentOpened(modHandle document.DirHandle) (job.IDs, error var errs *multierror.Error if mod.TerraformVersionState == op.OpStateUnknown { - _, err := idx.jobStore.EnqueueJob(job.Job{ + _, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { ctx = exec.WithExecutorFactory(ctx, idx.tfExecFactory) @@ -40,7 +40,7 @@ func (idx *Indexer) DocumentOpened(modHandle document.DirHandle) (job.IDs, error // to avoid delays when documents of new modules are open. } - parseId, err := idx.jobStore.EnqueueJob(job.Job{ + parseId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleConfiguration(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -53,13 +53,13 @@ func (idx *Indexer) DocumentOpened(modHandle document.DirHandle) (job.IDs, error } ids = append(ids, parseId) - modIds, err := idx.decodeModule(modHandle, job.IDs{parseId}, true) + modIds, err := idx.decodeModule(ctx, modHandle, job.IDs{parseId}, true) if err != nil { return ids, err } ids = append(ids, modIds...) - parseVarsId, err := idx.jobStore.EnqueueJob(job.Job{ + parseVarsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseVariables(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -72,7 +72,7 @@ func (idx *Indexer) DocumentOpened(modHandle document.DirHandle) (job.IDs, error } ids = append(ids, parseVarsId) - varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ + varsRefsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) diff --git a/internal/indexer/module_calls.go b/internal/indexer/module_calls.go index 5c15987f..e835e0c6 100644 --- a/internal/indexer/module_calls.go +++ b/internal/indexer/module_calls.go @@ -15,7 +15,7 @@ import ( op "github.com/hashicorp/terraform-ls/internal/terraform/module/operation" ) -func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ignoreState bool) (job.IDs, error) { +func (idx *Indexer) decodeInstalledModuleCalls(ctx context.Context, modHandle document.DirHandle, ignoreState bool) (job.IDs, error) { jobIds := make(job.IDs, 0) moduleCalls, err := idx.modStore.ModuleCalls(modHandle.Path()) @@ -44,7 +44,7 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign refCollectionDeps := make(job.IDs, 0) - parseId, err := idx.jobStore.EnqueueJob(job.Job{ + parseId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { return module.ParseModuleConfiguration(ctx, idx.fs, idx.modStore, mcPath) @@ -61,7 +61,7 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign var metaId job.ID if parseId != "" { - metaId, err = idx.jobStore.EnqueueJob(job.Job{ + metaId, err = idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: mcHandle, Type: op.OpTypeLoadModuleMetadata.String(), Func: func(ctx context.Context) error { @@ -77,7 +77,7 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign refCollectionDeps = append(refCollectionDeps, metaId) } - eSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ + eSchemaId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { return module.PreloadEmbeddedSchema(ctx, idx.logger, schemas.FS, idx.modStore, idx.schemaStore, mcPath) @@ -95,7 +95,7 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign } if parseId != "" { - ids, err := idx.collectReferences(mcHandle, refCollectionDeps, ignoreState) + ids, err := idx.collectReferences(ctx, mcHandle, refCollectionDeps, ignoreState) if err != nil { multierror.Append(errs, err) } else { @@ -103,7 +103,7 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign } } - varsParseId, err := idx.jobStore.EnqueueJob(job.Job{ + varsParseId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { return module.ParseVariables(ctx, idx.fs, idx.modStore, mcPath) @@ -118,7 +118,7 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign } if varsParseId != "" { - varsRefId, err := idx.jobStore.EnqueueJob(job.Job{ + varsRefId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: mcHandle, Func: func(ctx context.Context) error { return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, mcPath) @@ -138,12 +138,12 @@ func (idx *Indexer) decodeInstalledModuleCalls(modHandle document.DirHandle, ign return jobIds, errs.ErrorOrNil() } -func (idx *Indexer) collectReferences(modHandle document.DirHandle, dependsOn job.IDs, ignoreState bool) (job.IDs, error) { +func (idx *Indexer) collectReferences(ctx context.Context, modHandle document.DirHandle, dependsOn job.IDs, ignoreState bool) (job.IDs, error) { ids := make(job.IDs, 0) var errs *multierror.Error - id, err := idx.jobStore.EnqueueJob(job.Job{ + id, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeReferenceTargets(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -158,7 +158,7 @@ func (idx *Indexer) collectReferences(modHandle document.DirHandle, dependsOn jo ids = append(ids, id) } - id, err = idx.jobStore.EnqueueJob(job.Job{ + id, err = idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeReferenceOrigins(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) diff --git a/internal/indexer/walker.go b/internal/indexer/walker.go index bbf42ae5..9c7b077b 100644 --- a/internal/indexer/walker.go +++ b/internal/indexer/walker.go @@ -23,7 +23,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand refCollectionDeps := make(job.IDs, 0) providerVersionDeps := make(job.IDs, 0) - parseId, err := idx.jobStore.EnqueueJob(job.Job{ + parseId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleConfiguration(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -40,7 +40,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand var metaId job.ID if parseId != "" { - metaId, err = idx.jobStore.EnqueueJob(job.Job{ + metaId, err = idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Type: op.OpTypeLoadModuleMetadata.String(), Func: func(ctx context.Context) error { @@ -57,7 +57,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand } } - parseVarsId, err := idx.jobStore.EnqueueJob(job.Job{ + parseVarsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseVariables(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -71,7 +71,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand } if parseVarsId != "" { - varsRefsId, err := idx.jobStore.EnqueueJob(job.Job{ + varsRefsId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.DecodeVarsReferences(ctx, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -94,14 +94,14 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand if dataDir.ModuleManifestPath != "" { // References are collected *after* manifest parsing // so that we reflect any references to submodules. - modManifestId, err = idx.jobStore.EnqueueJob(job.Job{ + modManifestId, err = idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleManifest(ctx, idx.fs, idx.modStore, modHandle.Path()) }, Type: op.OpTypeParseModuleManifest.String(), Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - return idx.decodeInstalledModuleCalls(modHandle, false) + return idx.decodeInstalledModuleCalls(ctx, modHandle, false) }, }) if err != nil { @@ -116,7 +116,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand if dataDir.PluginLockFilePath != "" { dependsOn := make(job.IDs, 0) - pSchemaVerId, err := idx.jobStore.EnqueueJob(job.Job{ + pSchemaVerId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseProviderVersions(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -132,7 +132,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand refCollectionDeps = append(refCollectionDeps, pSchemaVerId) } - pSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ + pSchemaId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { ctx = exec.WithExecutorFactory(ctx, idx.tfExecFactory) @@ -149,7 +149,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand } } - eSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ + eSchemaId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.PreloadEmbeddedSchema(ctx, idx.logger, schemas.FS, idx.modStore, idx.schemaStore, modHandle.Path()) @@ -168,7 +168,7 @@ func (idx *Indexer) WalkedModule(ctx context.Context, modHandle document.DirHand ids = append(ids, eSchemaId) if parseId != "" { - rIds, err := idx.collectReferences(modHandle, refCollectionDeps, false) + rIds, err := idx.collectReferences(ctx, modHandle, refCollectionDeps, false) if err != nil { errs = multierror.Append(errs, err) } else { diff --git a/internal/indexer/watcher.go b/internal/indexer/watcher.go index f9f86c55..6211d39c 100644 --- a/internal/indexer/watcher.go +++ b/internal/indexer/watcher.go @@ -17,7 +17,7 @@ import ( func (idx *Indexer) ModuleManifestChanged(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) { ids := make(job.IDs, 0) - modManifestId, err := idx.jobStore.EnqueueJob(job.Job{ + modManifestId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseModuleManifest(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -25,7 +25,7 @@ func (idx *Indexer) ModuleManifestChanged(ctx context.Context, modHandle documen Type: op.OpTypeParseModuleManifest.String(), IgnoreState: true, Defer: func(ctx context.Context, jobErr error) (job.IDs, error) { - return idx.decodeInstalledModuleCalls(modHandle, true) + return idx.decodeInstalledModuleCalls(ctx, modHandle, true) }, }) if err != nil { @@ -41,7 +41,7 @@ func (idx *Indexer) PluginLockChanged(ctx context.Context, modHandle document.Di dependsOn := make(job.IDs, 0) var errs *multierror.Error - pSchemaVerId, err := idx.jobStore.EnqueueJob(job.Job{ + pSchemaVerId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { return module.ParseProviderVersions(ctx, idx.fs, idx.modStore, modHandle.Path()) @@ -56,7 +56,7 @@ func (idx *Indexer) PluginLockChanged(ctx context.Context, modHandle document.Di dependsOn = append(dependsOn, pSchemaVerId) } - pSchemaId, err := idx.jobStore.EnqueueJob(job.Job{ + pSchemaId, err := idx.jobStore.EnqueueJob(ctx, job.Job{ Dir: modHandle, Func: func(ctx context.Context) error { ctx = exec.WithExecutorFactory(ctx, idx.tfExecFactory) diff --git a/internal/job/job_id.go b/internal/job/job_id.go index fca8c64a..5d4e9bb3 100644 --- a/internal/job/job_id.go +++ b/internal/job/job_id.go @@ -20,3 +20,13 @@ func (ids IDs) Copy() IDs { return newIds } + +func (ids IDs) StringSlice() []string { + stringIds := make([]string, len(ids)) + + for i, id := range ids { + stringIds[i] = id.String() + } + + return stringIds +} diff --git a/internal/job/job_store.go b/internal/job/job_store.go index 0fc02e38..e6a36656 100644 --- a/internal/job/job_store.go +++ b/internal/job/job_store.go @@ -8,6 +8,6 @@ import ( ) type JobStore interface { - EnqueueJob(newJob Job) (ID, error) + EnqueueJob(ctx context.Context, newJob Job) (ID, error) WaitForJobs(ctx context.Context, ids ...ID) error } diff --git a/internal/langserver/handlers/did_change.go b/internal/langserver/handlers/did_change.go index d3c628b2..45e7c4e5 100644 --- a/internal/langserver/handlers/did_change.go +++ b/internal/langserver/handlers/did_change.go @@ -54,7 +54,7 @@ func (svc *service) TextDocumentDidChange(ctx context.Context, params lsp.DidCha return err } - jobIds, err := svc.indexer.DocumentChanged(dh.Dir) + jobIds, err := svc.indexer.DocumentChanged(ctx, dh.Dir) if err != nil { return err } diff --git a/internal/langserver/handlers/did_change_watched_files.go b/internal/langserver/handlers/did_change_watched_files.go index 6758f060..cc9dc8a7 100644 --- a/internal/langserver/handlers/did_change_watched_files.go +++ b/internal/langserver/handlers/did_change_watched_files.go @@ -136,7 +136,7 @@ func (svc *service) DidChangeWatchedFiles(ctx context.Context, params lsp.DidCha // if the parent directory exists, we just need to // reparse the module after a file was deleted from it dirHandle := document.DirHandleFromPath(parentDir) - jobIds, err := svc.indexer.DocumentChanged(dirHandle) + jobIds, err := svc.indexer.DocumentChanged(ctx, dirHandle) if err != nil { svc.logger.Printf("error parsing module (%q deleted): %s", rawURI, err) continue @@ -161,7 +161,7 @@ func (svc *service) DidChangeWatchedFiles(ctx context.Context, params lsp.DidCha continue } - jobIds, err := svc.indexer.DocumentChanged(ph.DirHandle) + jobIds, err := svc.indexer.DocumentChanged(ctx, ph.DirHandle) if err != nil { svc.logger.Printf("error parsing module (%q changed): %s", rawURI, err) continue @@ -181,7 +181,7 @@ func (svc *service) DidChangeWatchedFiles(ctx context.Context, params lsp.DidCha } if ph.IsDir { - err = svc.stateStore.WalkerPaths.EnqueueDir(ph.DirHandle) + err = svc.stateStore.WalkerPaths.EnqueueDir(ctx, ph.DirHandle) if err != nil { jrpc2.ServerFromContext(ctx).Notify(ctx, "window/showMessage", &lsp.ShowMessageParams{ Type: lsp.Warning, @@ -191,7 +191,7 @@ func (svc *service) DidChangeWatchedFiles(ctx context.Context, params lsp.DidCha continue } } else { - jobIds, err := svc.indexer.DocumentChanged(ph.DirHandle) + jobIds, err := svc.indexer.DocumentChanged(ctx, ph.DirHandle) if err != nil { svc.logger.Printf("error parsing module (%q created): %s", rawURI, err) continue @@ -214,7 +214,7 @@ func (svc *service) indexModuleIfNotExists(ctx context.Context, modHandle docume _, err := svc.modStore.ModuleByPath(modHandle.Path()) if err != nil { if state.IsModuleNotFound(err) { - err = svc.stateStore.WalkerPaths.EnqueueDir(modHandle) + err = svc.stateStore.WalkerPaths.EnqueueDir(ctx, modHandle) if err != nil { return fmt.Errorf("failed to walk module %q: %w", modHandle, err) } diff --git a/internal/langserver/handlers/did_change_workspace_folders.go b/internal/langserver/handlers/did_change_workspace_folders.go index 4d8902af..7a4c4eb3 100644 --- a/internal/langserver/handlers/did_change_workspace_folders.go +++ b/internal/langserver/handlers/did_change_workspace_folders.go @@ -44,7 +44,7 @@ func (svc *service) DidChangeWorkspaceFolders(ctx context.Context, params lsp.Di func (svc *service) indexNewModule(ctx context.Context, modURI string) { modHandle := document.DirHandleFromURI(modURI) - err := svc.stateStore.WalkerPaths.EnqueueDir(modHandle) + err := svc.stateStore.WalkerPaths.EnqueueDir(ctx, modHandle) if err != nil { jrpc2.ServerFromContext(ctx).Notify(ctx, "window/showMessage", &lsp.ShowMessageParams{ Type: lsp.Warning, diff --git a/internal/langserver/handlers/did_open.go b/internal/langserver/handlers/did_open.go index 4997a89c..eab80a76 100644 --- a/internal/langserver/handlers/did_open.go +++ b/internal/langserver/handlers/did_open.go @@ -59,13 +59,13 @@ func (svc *service) TextDocumentDidOpen(ctx context.Context, params lsp.DidOpenT // (originally parsed) content on the disk // TODO: Do this only if we can verify the file differs? modHandle := document.DirHandleFromPath(mod.Path) - jobIds, err := svc.indexer.DocumentOpened(modHandle) + jobIds, err := svc.indexer.DocumentOpened(ctx, modHandle) if err != nil { return err } if svc.singleFileMode { - err = svc.stateStore.WalkerPaths.EnqueueDir(modHandle) + err = svc.stateStore.WalkerPaths.EnqueueDir(ctx, modHandle) if err != nil { return err } diff --git a/internal/langserver/handlers/initialize.go b/internal/langserver/handlers/initialize.go index 8a65be78..669059ed 100644 --- a/internal/langserver/handlers/initialize.go +++ b/internal/langserver/handlers/initialize.go @@ -308,7 +308,7 @@ func (svc *service) setupWalker(ctx context.Context, params lsp.InitializeParams ignoredPaths = append(ignoredPaths, modPath) } - err = svc.stateStore.WalkerPaths.EnqueueDir(root) + err = svc.stateStore.WalkerPaths.EnqueueDir(ctx, root) if err != nil { return err } @@ -326,7 +326,7 @@ func (svc *service) setupWalker(ctx context.Context, params lsp.InitializeParams modPath := document.DirHandleFromURI(folder.URI) - err := svc.stateStore.WalkerPaths.EnqueueDir(modPath) + err := svc.stateStore.WalkerPaths.EnqueueDir(ctx, modPath) if err != nil { jrpc2.ServerFromContext(ctx).Notify(ctx, "window/showMessage", &lsp.ShowMessageParams{ Type: lsp.Warning, diff --git a/internal/langserver/handlers/service.go b/internal/langserver/handlers/service.go index 4709f465..2d11df84 100644 --- a/internal/langserver/handlers/service.go +++ b/internal/langserver/handlers/service.go @@ -34,6 +34,11 @@ import ( "github.com/hashicorp/terraform-ls/internal/terraform/discovery" "github.com/hashicorp/terraform-ls/internal/terraform/exec" "github.com/hashicorp/terraform-ls/internal/walker" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" ) type service struct { @@ -584,13 +589,52 @@ func convertMap(m map[string]rpch.Func) rpch.Map { } const requestCancelled jrpc2.Code = -32800 +const tracerName = "github.com/hashicorp/terraform-ls/internal/langserver/handlers" // handle calls a jrpc2.Func compatible function func handle(ctx context.Context, req *jrpc2.Request, fn interface{}) (interface{}, error) { + attrs := []attribute.KeyValue{ + { + Key: semconv.RPCMethodKey, + Value: attribute.StringValue(req.Method()), + }, + { + Key: semconv.RPCJsonrpcRequestIDKey, + Value: attribute.StringValue(req.ID()), + }, + } + + // We could capture all parameters here but for now we just + // opportunistically track the most important ones only. + type p struct { + URI string `json:"uri,omitempty"` + RootURI string `json:"rootUri,omitempty"` + } + params := p{} + err := req.UnmarshalParams(¶ms) + if err == nil { + attrs = append(attrs, attribute.KeyValue{ + Key: attribute.Key("URI"), + Value: attribute.StringValue(string(params.URI)), + }) + } + + tracer := otel.Tracer(tracerName) + ctx, span := tracer.Start(ctx, "rpc:"+req.Method(), + trace.WithAttributes(attrs...)) + defer span.End() + result, err := rpch.New(fn)(ctx, req) if ctx.Err() != nil && errors.Is(ctx.Err(), context.Canceled) { err = fmt.Errorf("%w: %s", requestCancelled.Err(), err) } + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "request failed") + } else { + span.SetStatus(codes.Ok, "ok") + } + return result, err } diff --git a/internal/langserver/langserver.go b/internal/langserver/langserver.go index 79c26f53..bc98f0cb 100644 --- a/internal/langserver/langserver.go +++ b/internal/langserver/langserver.go @@ -17,6 +17,7 @@ import ( "github.com/creachadair/jrpc2/channel" "github.com/creachadair/jrpc2/server" "github.com/hashicorp/terraform-ls/internal/langserver/session" + "go.opentelemetry.io/otel/trace" ) type langServer struct { @@ -37,6 +38,10 @@ func NewLangServer(srvCtx context.Context, sf session.SessionFactory) *langServe opts := &jrpc2.ServerOptions{ AllowPush: true, Concurrency: concurrency, + NewContext: func() context.Context { + spanCtx := trace.SpanContextFromContext(srvCtx) + return trace.ContextWithSpanContext(context.Background(), spanCtx) + }, } return &langServer{ diff --git a/internal/registry/module.go b/internal/registry/module.go index 059a7858..3bfa8a54 100644 --- a/internal/registry/module.go +++ b/internal/registry/module.go @@ -9,11 +9,15 @@ import ( "fmt" "io/ioutil" "net/http" + "net/http/httptrace" "sort" - "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-version" tfaddr "github.com/hashicorp/terraform-registry-address" + "go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" ) type ModuleResponse struct { @@ -61,6 +65,8 @@ func (rce ClientError) Error() string { } func (c Client) GetModuleData(ctx context.Context, addr tfaddr.Module, cons version.Constraints) (*ModuleResponse, error) { + ctx, span := otel.Tracer(tracerName).Start(ctx, "registry:GetModuleData") + defer span.End() var response ModuleResponse v, err := c.GetMatchingModuleVersion(ctx, addr, cons) @@ -68,8 +74,7 @@ func (c Client) GetModuleData(ctx context.Context, addr tfaddr.Module, cons vers return nil, err } - client := cleanhttp.DefaultClient() - client.Timeout = defaultTimeout + ctx = httptrace.WithClientTrace(ctx, otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans())) url := fmt.Sprintf("%s/v1/modules/%s/%s/%s/%s", c.BaseURL, addr.Package.Namespace, @@ -82,7 +87,7 @@ func (c Client) GetModuleData(ctx context.Context, addr tfaddr.Module, cons vers return nil, err } - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return nil, err } @@ -106,6 +111,8 @@ func (c Client) GetModuleData(ctx context.Context, addr tfaddr.Module, cons vers } func (c Client) GetMatchingModuleVersion(ctx context.Context, addr tfaddr.Module, con version.Constraints) (*version.Version, error) { + ctx, span := otel.Tracer(tracerName).Start(ctx, "registry:GetMatchingModuleVersion") + defer span.End() foundVersions, err := c.GetModuleVersions(ctx, addr) if err != nil { return nil, err @@ -121,20 +128,22 @@ func (c Client) GetMatchingModuleVersion(ctx context.Context, addr tfaddr.Module } func (c Client) GetModuleVersions(ctx context.Context, addr tfaddr.Module) (version.Collection, error) { + ctx, span := otel.Tracer(tracerName).Start(ctx, "registry:GetModuleVersions") + defer span.End() + url := fmt.Sprintf("%s/v1/modules/%s/%s/%s/versions", c.BaseURL, addr.Package.Namespace, addr.Package.Name, addr.Package.TargetSystem) - client := cleanhttp.DefaultClient() - client.Timeout = defaultTimeout + ctx = httptrace.WithClientTrace(ctx, otelhttptrace.NewClientTrace(ctx, otelhttptrace.WithoutSubSpans())) req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { return nil, err } - resp, err := client.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return nil, err } @@ -149,11 +158,13 @@ func (c Client) GetModuleVersions(ctx context.Context, addr tfaddr.Module) (vers return nil, ClientError{StatusCode: resp.StatusCode, Body: string(bodyBytes)} } + _, decodeSpan := otel.Tracer(tracerName).Start(ctx, "registry:GetModuleVersions:decodeJson") var response ModuleVersionsResponse err = json.NewDecoder(resp.Body).Decode(&response) if err != nil { return nil, err } + decodeSpan.End() var foundVersions version.Collection for _, module := range response.Modules { @@ -164,6 +175,11 @@ func (c Client) GetModuleVersions(ctx context.Context, addr tfaddr.Module) (vers } } } + span.AddEvent("registry:foundModuleVersions", + trace.WithAttributes(attribute.KeyValue{ + Key: attribute.Key("moduleVersionCount"), + Value: attribute.IntValue(len(foundVersions)), + })) sort.Sort(sort.Reverse(foundVersions)) diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 9818ee8c..cdd38f10 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -4,24 +4,35 @@ package registry import ( + "net/http" "time" + + "github.com/hashicorp/go-cleanhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" ) const ( defaultBaseURL = "https://registry.terraform.io" defaultTimeout = 5 * time.Second + tracerName = "github.com/hashicorp/terraform-ls/internal/registry" ) type Client struct { BaseURL string Timeout time.Duration ProviderPageSize int + httpClient *http.Client } func NewClient() Client { + client := cleanhttp.DefaultClient() + client.Timeout = defaultTimeout + client.Transport = otelhttp.NewTransport(client.Transport) + return Client{ BaseURL: defaultBaseURL, Timeout: defaultTimeout, ProviderPageSize: 100, + httpClient: client, } } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go index 1f12753b..99bcd66a 100644 --- a/internal/scheduler/scheduler.go +++ b/internal/scheduler/scheduler.go @@ -10,8 +10,14 @@ import ( "log" "github.com/hashicorp/terraform-ls/internal/job" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) +const tracerName = "github.com/hashicorp/terraform-ls/internal/scheduler" + type Scheduler struct { logger *log.Logger jobStorage JobStorage @@ -22,7 +28,7 @@ type Scheduler struct { type JobStorage interface { job.JobStore - AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) + AwaitNextJob(ctx context.Context, priority job.JobPriority) (context.Context, job.ID, job.Job, error) FinishJob(id job.ID, jobErr error, deferredJobIds ...job.ID) error } @@ -59,7 +65,7 @@ func (s *Scheduler) Stop() { func (s *Scheduler) eval(ctx context.Context) { for { - id, nextJob, err := s.jobStorage.AwaitNextJob(ctx, s.priority) + ctx, id, nextJob, err := s.jobStorage.AwaitNextJob(ctx, s.priority) if err != nil { if errors.Is(err, context.Canceled) { return @@ -69,9 +75,39 @@ func (s *Scheduler) eval(ctx context.Context) { } ctx = job.WithIgnoreState(ctx, nextJob.IgnoreState) + jobSpan := trace.SpanFromContext(ctx) + + ctx, span := otel.Tracer(tracerName).Start(ctx, "job-eval:"+nextJob.Type, + trace.WithAttributes(attribute.KeyValue{ + Key: attribute.Key("JobID"), + Value: attribute.StringValue(id.String()), + }, attribute.KeyValue{ + Key: attribute.Key("JobType"), + Value: attribute.StringValue(nextJob.Type), + }, attribute.KeyValue{ + Key: attribute.Key("Priority"), + Value: attribute.IntValue(int(nextJob.Priority)), + }, attribute.KeyValue{ + Key: attribute.Key("URI"), + Value: attribute.StringValue(nextJob.Dir.URI), + })) jobErr := nextJob.Func(ctx) + if jobErr != nil { + if errors.Is(jobErr, job.StateNotChangedErr{Dir: nextJob.Dir}) { + span.SetStatus(codes.Ok, "state not changed") + } else { + span.RecordError(jobErr) + span.SetStatus(codes.Error, "job failed") + } + } else { + span.SetStatus(codes.Ok, "job finished") + } + span.End() + jobSpan.SetStatus(codes.Ok, "ok") + jobSpan.End() + deferredJobIds := make(job.IDs, 0) if nextJob.Defer != nil { deferredJobIds, err = nextJob.Defer(ctx, jobErr) diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go index db8c53f9..8af19981 100644 --- a/internal/scheduler/scheduler_test.go +++ b/internal/scheduler/scheduler_test.go @@ -39,7 +39,7 @@ func TestScheduler_withIgnoreExistingState(t *testing.T) { }) var stateIgnored int64 = 0 - firstJobId, err := ss.JobStore.EnqueueJob(job.Job{ + firstJobId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { if job.IgnoreState(ctx) { atomic.AddInt64(&stateIgnored, 1) @@ -55,7 +55,7 @@ func TestScheduler_withIgnoreExistingState(t *testing.T) { } var stateNotIgnored int64 = 0 - secondJobId, err := ss.JobStore.EnqueueJob(job.Job{ + secondJobId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { if !job.IgnoreState(ctx) { atomic.AddInt64(&stateNotIgnored, 1) @@ -108,7 +108,7 @@ func TestScheduler_closedOnly(t *testing.T) { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", i)) - newId, err := ss.JobStore.EnqueueJob(job.Job{ + newId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { atomic.AddInt64(&jobsExecuted, 1) return nil @@ -153,7 +153,8 @@ func TestScheduler_closedAndOpen(t *testing.T) { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-x-%d", i)) - newId, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + newId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { atomic.AddInt64(&closedJobsExecuted, 1) return nil @@ -178,7 +179,8 @@ func TestScheduler_closedAndOpen(t *testing.T) { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-y-%d", i)) - newId, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + newId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { atomic.AddInt64(&openJobsExecuted, 1) return nil @@ -274,7 +276,7 @@ func BenchmarkScheduler_EnqueueAndWaitForJob_closedOnly(b *testing.B) { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", i)) - newId, err := ss.JobStore.EnqueueJob(job.Job{ + newId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { return nil }, @@ -319,7 +321,7 @@ func TestScheduler_defer(t *testing.T) { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", i)) - newId, err := ss.JobStore.EnqueueJob(job.Job{ + newId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { atomic.AddInt64(&jobsExecuted, 1) return nil @@ -330,7 +332,7 @@ func TestScheduler_defer(t *testing.T) { ids := make(job.IDs, 0) je := ss.JobStore - id1, err := je.EnqueueJob(job.Job{ + id1, err := je.EnqueueJob(ctx, job.Job{ Dir: document.DirHandleFromPath(dirPath), Type: "test-1", Func: func(c context.Context) error { @@ -344,7 +346,7 @@ func TestScheduler_defer(t *testing.T) { } ids = append(ids, id1) - id2, err := je.EnqueueJob(job.Job{ + id2, err := je.EnqueueJob(ctx, job.Job{ Dir: document.DirHandleFromPath(dirPath), Type: "test-2", Func: func(c context.Context) error { @@ -405,7 +407,7 @@ func TestScheduler_dependsOn(t *testing.T) { dirPath := filepath.Join(tmpDir, "test-folder") - id0, err := ss.JobStore.EnqueueJob(job.Job{ + id0, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { time.Sleep(20 * time.Millisecond) executedJobs = append(executedJobs, "test-0") @@ -419,7 +421,7 @@ func TestScheduler_dependsOn(t *testing.T) { } ids = append(ids, id0) - id1, err := ss.JobStore.EnqueueJob(job.Job{ + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Dir: document.DirHandleFromPath(dirPath), Type: "test-1", Func: func(c context.Context) error { @@ -434,7 +436,7 @@ func TestScheduler_dependsOn(t *testing.T) { } ids = append(ids, id1) - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Dir: document.DirHandleFromPath(dirPath), Type: "test-2", Func: func(c context.Context) error { diff --git a/internal/state/jobs.go b/internal/state/jobs.go index 655d6245..3e3a5c1b 100644 --- a/internal/state/jobs.go +++ b/internal/state/jobs.go @@ -10,10 +10,15 @@ import ( "log" "sync" "sync/atomic" + "time" "github.com/hashicorp/go-memdb" "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/job" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) type JobStore struct { @@ -38,9 +43,20 @@ type ScheduledJob struct { // DeferredJobIDs contains IDs of any deferred jobs // set when job finishes (State = StateDone) DeferredJobIDs job.IDs + + // EnqueueTime tracks time when the job was originally put into the queue + EnqueueTime time.Time + // TraceSpan represents a tracing span for the entire job lifecycle + // (from queuing to finishing execution). + TraceSpan trace.Span } func (sj *ScheduledJob) Copy() *ScheduledJob { + // This may be an awkward way to copy the Span + // but the upstream doesn't seem to offer any better way. + newCtx := trace.ContextWithSpan(context.Background(), sj.TraceSpan) + traceSpan := trace.SpanFromContext(newCtx) + return &ScheduledJob{ ID: sj.ID, Job: sj.Job.Copy(), @@ -48,6 +64,8 @@ func (sj *ScheduledJob) Copy() *ScheduledJob { State: sj.State, JobErr: sj.JobErr, DeferredJobIDs: sj.DeferredJobIDs.Copy(), + EnqueueTime: sj.EnqueueTime, + TraceSpan: traceSpan, } } @@ -60,7 +78,7 @@ const ( StateDone ) -func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) { +func (js *JobStore) EnqueueJob(ctx context.Context, newJob job.Job) (job.ID, error) { txn := js.db.Txn(true) defer txn.Abort() @@ -77,12 +95,36 @@ func (js *JobStore) EnqueueJob(newJob job.Job) (job.ID, error) { } } newJob.DependsOn = dependsOn + dirOpen := isDirOpen(txn, newJob.Dir) + + _, jobSpan := otel.Tracer(tracerName).Start(ctx, "job", + trace.WithAttributes(attribute.KeyValue{ + Key: attribute.Key("JobID"), + Value: attribute.StringValue(newJobID.String()), + }, attribute.KeyValue{ + Key: attribute.Key("JobType"), + Value: attribute.StringValue(newJob.Type), + }, attribute.KeyValue{ + Key: attribute.Key("IsDirOpen"), + Value: attribute.BoolValue(dirOpen), + }, attribute.KeyValue{ + Key: attribute.Key("Priority"), + Value: attribute.IntValue(int(newJob.Priority)), + }, attribute.KeyValue{ + Key: attribute.Key("URI"), + Value: attribute.StringValue(newJob.Dir.URI), + }, attribute.KeyValue{ + Key: attribute.Key("DependsOn"), + Value: attribute.StringSliceValue(dependsOn.StringSlice()), + })) sJob := &ScheduledJob{ - ID: newJobID, - Job: newJob, - IsDirOpen: isDirOpen(txn, newJob.Dir), - State: StateQueued, + ID: newJobID, + Job: newJob, + IsDirOpen: dirOpen, + State: StateQueued, + EnqueueTime: time.Now(), + TraceSpan: jobSpan, } err := txn.Insert(js.tableName, sJob) @@ -139,9 +181,12 @@ func (js *JobStore) DequeueJobsForDir(dir document.DirHandle) error { if err != nil { return err } + sJob.TraceSpan.SetStatus(codes.Ok, "job dequeued") + sJob.TraceSpan.End() } txn.Commit() + return nil } @@ -210,7 +255,7 @@ func (js *JobStore) jobExists(j job.Job, state State) (job.ID, bool, error) { return "", false, nil } -func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) { +func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority) (context.Context, job.ID, job.Job, error) { // Locking is needed if same query is executed in multiple threads, // i.e. this method is called at the same time from different threads, at // which point txn.FirstWatch would return the same job to more than @@ -230,20 +275,20 @@ func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority) return js.awaitNextJob(ctx, priority) } -func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) { +func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (context.Context, job.ID, job.Job, error) { var sJob *ScheduledJob for { txn := js.db.Txn(false) wCh, obj, err := txn.FirstWatch(js.tableName, "priority_dependecies_state", priority, 0, StateQueued) if err != nil { - return "", job.Job{}, err + return ctx, "", job.Job{}, err } if obj == nil { select { case <-wCh: case <-ctx.Done(): - return "", job.Job{}, ctx.Err() + return ctx, "", job.Job{}, ctx.Err() } js.logger.Printf("retrying on obj is nil") @@ -263,14 +308,38 @@ func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) js.logger.Printf("retrying next job: %s", err) continue } - return "", job.Job{}, err + return ctx, "", job.Job{}, err } break } js.logger.Printf("JOBS: Dispatching next job %q (scheduler prio: %d, job prio: %d, isDirOpen: %t): %q for %q", sJob.ID, priority, sJob.Priority, sJob.IsDirOpen, sJob.Type, sJob.Dir) - return sJob.ID, sJob.Job, nil + + ctx = trace.ContextWithSpan(ctx, sJob.TraceSpan) + + _, span := otel.Tracer(tracerName).Start(ctx, "job-wait", + trace.WithTimestamp(sJob.EnqueueTime), + trace.WithAttributes(attribute.KeyValue{ + Key: attribute.Key("JobID"), + Value: attribute.StringValue(sJob.ID.String()), + }, attribute.KeyValue{ + Key: attribute.Key("JobType"), + Value: attribute.StringValue(sJob.Type), + }, attribute.KeyValue{ + Key: attribute.Key("IsDirOpen"), + Value: attribute.BoolValue(sJob.IsDirOpen), + }, attribute.KeyValue{ + Key: attribute.Key("Priority"), + Value: attribute.IntValue(int(sJob.Priority)), + }, attribute.KeyValue{ + Key: attribute.Key("URI"), + Value: attribute.StringValue(sJob.Dir.URI), + }), + ) + span.End() + + return ctx, sJob.ID, sJob.Job, nil } func isDirOpen(txn *memdb.Txn, dirHandle document.DirHandle) bool { diff --git a/internal/state/jobs_test.go b/internal/state/jobs_test.go index a91059a2..2eb536f8 100644 --- a/internal/state/jobs_test.go +++ b/internal/state/jobs_test.go @@ -26,7 +26,8 @@ func TestJobStore_EnqueueJob(t *testing.T) { t.Fatal(err) } - id1, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -36,7 +37,7 @@ func TestJobStore_EnqueueJob(t *testing.T) { if err != nil { t.Fatal(err) } - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -79,7 +80,8 @@ func TestJobStore_EnqueueJob_openDir(t *testing.T) { t.Fatal(err) } - id, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + id, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -91,10 +93,9 @@ func TestJobStore_EnqueueJob_openDir(t *testing.T) { } // verify that job for open dir comes is treated as high priority - ctx := context.Background() ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + ctx, nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { t.Fatal(err) } @@ -119,12 +120,13 @@ func BenchmarkJobStore_EnqueueJob_basic(b *testing.B) { } tmpDir := b.TempDir() + ctx := context.Background() for i := 0; i < b.N; i++ { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", i)) - _, err := ss.JobStore.EnqueueJob(job.Job{ + _, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { return nil }, @@ -156,12 +158,13 @@ func TestJobStore_EnqueueJob_verify(t *testing.T) { tmpDir := t.TempDir() jobCount := 50 + ctx := context.Background() for i := 0; i < jobCount; i++ { i := i dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", i)) - _, err := ss.JobStore.EnqueueJob(job.Job{ + _, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(c context.Context) error { return nil }, @@ -213,8 +216,9 @@ func TestJobStore_DequeueJobsForDir(t *testing.T) { t.Fatal(err) } + ctx := context.Background() firstDir := document.DirHandleFromPath("/test-1") - _, err = ss.JobStore.EnqueueJob(job.Job{ + _, err = ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -224,7 +228,7 @@ func TestJobStore_DequeueJobsForDir(t *testing.T) { if err != nil { t.Fatal(err) } - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -256,8 +260,9 @@ func TestJobStore_AwaitNextJob_closedOnly(t *testing.T) { t.Fatal(err) } + ctx := context.Background() firstDir := document.DirHandleFromPath("/test-1") - id1, err := ss.JobStore.EnqueueJob(job.Job{ + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -269,7 +274,7 @@ func TestJobStore_AwaitNextJob_closedOnly(t *testing.T) { } secondDir := document.DirHandleFromPath("/test-2") - _, err = ss.JobStore.EnqueueJob(job.Job{ + _, err = ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -285,8 +290,7 @@ func TestJobStore_AwaitNextJob_closedOnly(t *testing.T) { t.Fatal(err) } - ctx := context.Background() - nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.LowPriority) + ctx, nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.LowPriority) if err != nil { t.Fatal(err) } @@ -305,7 +309,7 @@ func TestJobStore_AwaitNextJob_closedOnly(t *testing.T) { ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.LowPriority) + ctx, nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.LowPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) @@ -319,8 +323,9 @@ func TestJobStore_AwaitNextJob_openOnly(t *testing.T) { t.Fatal(err) } + ctx := context.Background() firstDir := document.DirHandleFromPath("/test-1") - _, err = ss.JobStore.EnqueueJob(job.Job{ + _, err = ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -332,7 +337,7 @@ func TestJobStore_AwaitNextJob_openOnly(t *testing.T) { } secondDir := document.DirHandleFromPath("/test-2") - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -348,8 +353,7 @@ func TestJobStore_AwaitNextJob_openOnly(t *testing.T) { t.Fatal(err) } - ctx := context.Background() - nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + ctx, nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { t.Fatal(err) } @@ -368,7 +372,7 @@ func TestJobStore_AwaitNextJob_openOnly(t *testing.T) { ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + ctx, nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) @@ -382,8 +386,9 @@ func TestJobStore_AwaitNextJob_highPriority(t *testing.T) { t.Fatal(err) } + ctx := context.Background() firstDir := document.DirHandleFromPath("/test-1") - id1, err := ss.JobStore.EnqueueJob(job.Job{ + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -396,7 +401,7 @@ func TestJobStore_AwaitNextJob_highPriority(t *testing.T) { } secondDir := document.DirHandleFromPath("/test-2") - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -412,8 +417,7 @@ func TestJobStore_AwaitNextJob_highPriority(t *testing.T) { t.Fatal(err) } - ctx := context.Background() - nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + ctx, nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { t.Fatal(err) } @@ -430,7 +434,7 @@ func TestJobStore_AwaitNextJob_highPriority(t *testing.T) { t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) } - nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + ctx, nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { t.Fatal(err) } @@ -449,7 +453,7 @@ func TestJobStore_AwaitNextJob_highPriority(t *testing.T) { ctx, cancelFunc := context.WithTimeout(ctx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + ctx, nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) @@ -463,8 +467,9 @@ func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { t.Fatal(err) } + ctx := context.Background() firstDir := document.DirHandleFromPath("/test-1") - id1, err := ss.JobStore.EnqueueJob(job.Job{ + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -476,7 +481,7 @@ func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { } secondDir := document.DirHandleFromPath("/test-2") - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -497,7 +502,7 @@ func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { ctx, cancelFunc := context.WithTimeout(baseCtx, 250*time.Millisecond) t.Cleanup(cancelFunc) - _, _, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + _, _, _, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) @@ -506,7 +511,7 @@ func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { t.Fatal("expected error") } - nextId, j, err := ss.JobStore.AwaitNextJob(baseCtx, job.LowPriority) + _, nextId, j, err := ss.JobStore.AwaitNextJob(baseCtx, job.LowPriority) if err != nil { t.Fatal(err) } @@ -523,7 +528,7 @@ func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { t.Fatalf("expected next job dir %q, given: %q", "test-type", j.Type) } - nextId, j, err = ss.JobStore.AwaitNextJob(baseCtx, job.LowPriority) + _, nextId, j, err = ss.JobStore.AwaitNextJob(baseCtx, job.LowPriority) if err != nil { t.Fatal(err) } @@ -542,7 +547,7 @@ func TestJobStore_AwaitNextJob_lowPriority(t *testing.T) { ctx, cancelFunc = context.WithTimeout(baseCtx, 250*time.Millisecond) t.Cleanup(cancelFunc) - nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) + _, nextId, j, err = ss.JobStore.AwaitNextJob(ctx, job.HighPriority) if err != nil { if !errors.Is(err, context.DeadlineExceeded) { t.Fatalf("%#v", err) @@ -558,7 +563,8 @@ func TestJobStore_WaitForJobs(t *testing.T) { t.Fatal(err) } - id1, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -576,7 +582,6 @@ func TestJobStore_WaitForJobs(t *testing.T) { } }(ss.JobStore) - ctx := context.Background() err = ss.JobStore.WaitForJobs(ctx, id1) if err != nil { t.Fatal(err) @@ -599,7 +604,8 @@ func TestJobStore_FinishJob_basic(t *testing.T) { t.Fatal(err) } - id1, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -609,7 +615,7 @@ func TestJobStore_FinishJob_basic(t *testing.T) { if err != nil { t.Fatal(err) } - id2, err := ss.JobStore.EnqueueJob(job.Job{ + id2, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -645,7 +651,7 @@ func TestJobStore_FinishJob_defer(t *testing.T) { ids := make(job.IDs, 0) jobStore := ss.JobStore - id, err := jobStore.EnqueueJob(job.Job{ + id, err := jobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -659,7 +665,8 @@ func TestJobStore_FinishJob_defer(t *testing.T) { return ids, err } - id1, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + id1, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -671,7 +678,6 @@ func TestJobStore_FinishJob_defer(t *testing.T) { t.Fatal(err) } - ctx := context.Background() // execute deferred func, which is what scheduler would do deferredIds, err := defer1Func(ctx, nil) if err != nil { @@ -712,7 +718,8 @@ func TestJobStore_FinishJob_dependsOn(t *testing.T) { t.Fatal(err) } - parentId, err := ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + parentId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -723,7 +730,7 @@ func TestJobStore_FinishJob_dependsOn(t *testing.T) { t.Fatal(err) } - childId, err := ss.JobStore.EnqueueJob(job.Job{ + childId, err := ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -760,7 +767,7 @@ func TestJobStore_FinishJob_dependsOn(t *testing.T) { ctx, cancelFunc := context.WithTimeout(context.Background(), 1*time.Second) t.Cleanup(cancelFunc) - nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.LowPriority) + _, nextId, j, err := ss.JobStore.AwaitNextJob(ctx, job.LowPriority) if err != nil { t.Fatal(err) } diff --git a/internal/state/module_changes_test.go b/internal/state/module_changes_test.go index 3f71a3d0..76eeec32 100644 --- a/internal/state/module_changes_test.go +++ b/internal/state/module_changes_test.go @@ -95,7 +95,8 @@ func TestModuleChanges_AwaitNextChangeBatch_maxTimespan(t *testing.T) { modPath := t.TempDir() modHandle := document.DirHandleFromPath(modPath) - _, err = ss.JobStore.EnqueueJob(job.Job{ + ctx := context.Background() + _, err = ss.JobStore.EnqueueJob(ctx, job.Job{ Func: func(ctx context.Context) error { return nil }, @@ -113,7 +114,7 @@ func TestModuleChanges_AwaitNextChangeBatch_maxTimespan(t *testing.T) { // confirm the method gets cancelled with pending job // and less than maximum timespan to wait - ctx, cancelFunc := context.WithTimeout(context.Background(), 100*time.Millisecond) + ctx, cancelFunc := context.WithTimeout(ctx, 100*time.Millisecond) defer cancelFunc() _, err = ss.Modules.AwaitNextChangeBatch(ctx) diff --git a/internal/state/state.go b/internal/state/state.go index 4f6a5a8d..ac45f5e3 100644 --- a/internal/state/state.go +++ b/internal/state/state.go @@ -27,6 +27,8 @@ const ( providerIdsTableName = "provider_ids" walkerPathsTableName = "walker_paths" registryModuleTableName = "registry_module" + + tracerName = "github.com/hashicorp/terraform-ls/internal/state" ) var dbSchema = &memdb.DBSchema{ diff --git a/internal/state/walker_paths.go b/internal/state/walker_paths.go index 46f0c76c..5dd4f782 100644 --- a/internal/state/walker_paths.go +++ b/internal/state/walker_paths.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/go-memdb" "github.com/hashicorp/terraform-ls/internal/document" + "go.opentelemetry.io/otel/trace" ) type WalkerPathStore struct { @@ -24,9 +25,13 @@ type WalkerPathStore struct { } type WalkerPath struct { - Dir document.DirHandle - IsDirOpen bool - State PathState + Dir document.DirHandle + IsDirOpen bool + State PathState + EnqueueContext trace.SpanContext +} + +type PathContext struct { } //go:generate go run golang.org/x/tools/cmd/stringer -type=PathState -output=path_state_string.go @@ -38,9 +43,15 @@ const ( ) func (wp *WalkerPath) Copy() *WalkerPath { + // This may be an awkward way to copy the SpanContext + // but the upstream doesn't seem to offer any better way. + newCtx := trace.ContextWithSpanContext(context.Background(), wp.EnqueueContext) + spanContext := trace.SpanContextFromContext(newCtx) + return &WalkerPath{ - Dir: wp.Dir, - IsDirOpen: wp.IsDirOpen, + Dir: wp.Dir, + IsDirOpen: wp.IsDirOpen, + EnqueueContext: spanContext, } } @@ -49,12 +60,12 @@ type PathAwaiter struct { openDir bool } -func (pa *PathAwaiter) AwaitNextDir(ctx context.Context) (document.DirHandle, error) { +func (pa *PathAwaiter) AwaitNextDir(ctx context.Context) (context.Context, document.DirHandle, error) { wp, err := pa.wps.AwaitNextDir(ctx, pa.openDir) if err != nil { - return document.DirHandle{}, err + return ctx, document.DirHandle{}, err } - return wp.Dir, nil + return trace.ContextWithSpanContext(ctx, wp.EnqueueContext), wp.Dir, nil } func (pa *PathAwaiter) RemoveDir(dir document.DirHandle) error { @@ -68,7 +79,7 @@ func NewPathAwaiter(wps *WalkerPathStore, openDir bool) *PathAwaiter { } } -func (wps *WalkerPathStore) EnqueueDir(dir document.DirHandle) error { +func (wps *WalkerPathStore) EnqueueDir(ctx context.Context, dir document.DirHandle) error { txn := wps.db.Txn(true) defer txn.Abort() @@ -82,9 +93,10 @@ func (wps *WalkerPathStore) EnqueueDir(dir document.DirHandle) error { } err = txn.Insert(wps.tableName, &WalkerPath{ - Dir: dir, - IsDirOpen: false, - State: PathStateQueued, + Dir: dir, + IsDirOpen: false, + State: PathStateQueued, + EnqueueContext: trace.SpanContextFromContext(ctx), }) if err != nil { return err diff --git a/internal/state/walker_paths_test.go b/internal/state/walker_paths_test.go index 29a7ae48..b69934ab 100644 --- a/internal/state/walker_paths_test.go +++ b/internal/state/walker_paths_test.go @@ -21,12 +21,12 @@ func TestWalkerPathStore_EnqueueDir(t *testing.T) { tmpDir := t.TempDir() dirHandle := document.DirHandleFromPath(tmpDir) - err = ss.WalkerPaths.EnqueueDir(dirHandle) + ctx := context.Background() + err = ss.WalkerPaths.EnqueueDir(ctx, dirHandle) if err != nil { t.Fatal(err) } - ctx := context.Background() wp, err := ss.WalkerPaths.AwaitNextDir(ctx, false) if err != nil { t.Fatal(err) @@ -46,14 +46,15 @@ func TestWalkerPathStore_DequeueDir_queued(t *testing.T) { } tmpDir := t.TempDir() + ctx := context.Background() alphaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "alpha")) - err = ss.WalkerPaths.EnqueueDir(alphaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, alphaHandle) if err != nil { t.Fatal(err) } betaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "beta")) - err = ss.WalkerPaths.EnqueueDir(betaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, betaHandle) if err != nil { t.Fatal(err) } @@ -63,7 +64,7 @@ func TestWalkerPathStore_DequeueDir_queued(t *testing.T) { t.Fatal(err) } - ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancelFunc := context.WithTimeout(ctx, 10*time.Millisecond) defer cancelFunc() wp, err := ss.WalkerPaths.AwaitNextDir(ctx, false) @@ -95,14 +96,15 @@ func TestWalkerPathStore_DequeueDir_notQueued(t *testing.T) { } tmpDir := t.TempDir() + ctx := context.Background() dirHandle := document.DirHandleFromPath(tmpDir) - err = ss.WalkerPaths.EnqueueDir(dirHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, dirHandle) if err != nil { t.Fatal(err) } - ctx, cancelFunc := context.WithTimeout(context.Background(), 10*time.Millisecond) + ctx, cancelFunc := context.WithTimeout(ctx, 10*time.Millisecond) defer cancelFunc() _, err = ss.WalkerPaths.AwaitNextDir(ctx, false) @@ -123,14 +125,15 @@ func TestWalkerPathStore_RemoveDir(t *testing.T) { } tmpDir := t.TempDir() + ctx := context.Background() alphaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "alpha")) - err = ss.WalkerPaths.EnqueueDir(alphaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, alphaHandle) if err != nil { t.Fatal(err) } betaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "beta")) - err = ss.WalkerPaths.EnqueueDir(betaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, betaHandle) if err != nil { t.Fatal(err) } @@ -140,8 +143,6 @@ func TestWalkerPathStore_RemoveDir(t *testing.T) { t.Fatal(err) } - ctx := context.Background() - wp, err := ss.WalkerPaths.AwaitNextDir(ctx, false) if err != nil { t.Fatal(err) @@ -179,9 +180,10 @@ func TestWalkerPathStore_AwaitNextDir_openOnly(t *testing.T) { } tmpDir := t.TempDir() + ctx := context.Background() alphaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "alpha")) - err = ss.WalkerPaths.EnqueueDir(alphaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, alphaHandle) if err != nil { t.Fatal(err) } @@ -192,12 +194,11 @@ func TestWalkerPathStore_AwaitNextDir_openOnly(t *testing.T) { } betaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "beta")) - err = ss.WalkerPaths.EnqueueDir(betaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, betaHandle) if err != nil { t.Fatal(err) } - ctx := context.Background() wp, err := ss.WalkerPaths.AwaitNextDir(ctx, true) if err != nil { t.Fatal(err) @@ -230,14 +231,15 @@ func TestWalkerPathStore_WaitForDirs(t *testing.T) { } tmpDir := t.TempDir() + ctx := context.Background() alphaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "alpha")) - err = ss.WalkerPaths.EnqueueDir(alphaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, alphaHandle) if err != nil { t.Fatal(err) } betaHandle := document.DirHandleFromPath(filepath.Join(tmpDir, "beta")) - err = ss.WalkerPaths.EnqueueDir(betaHandle) + err = ss.WalkerPaths.EnqueueDir(ctx, betaHandle) if err != nil { t.Fatal(err) } diff --git a/internal/terraform/exec/exec.go b/internal/terraform/exec/exec.go index cff7497f..1ce1151a 100644 --- a/internal/terraform/exec/exec.go +++ b/internal/terraform/exec/exec.go @@ -15,10 +15,16 @@ import ( "github.com/hashicorp/terraform-exec/tfexec" tfjson "github.com/hashicorp/terraform-json" "github.com/hashicorp/terraform-ls/internal/logging" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) var defaultExecTimeout = 30 * time.Second +const tracerName = "github.com/hashicorp/terraform-ls/internal/terraform/exec" + type ctxKey string type Executor struct { @@ -71,6 +77,15 @@ func (e *Executor) contextfulError(ctx context.Context, method string, err error return e.enrichCtxErr(method, err) } +func (e *Executor) setSpanStatus(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, "execution returned error") + return + } + span.SetStatus(codes.Ok, "execution successful") +} + func (e *Executor) enrichCtxErr(method string, err error) error { if err == nil { return nil @@ -99,8 +114,13 @@ func (e *Executor) Init(ctx context.Context, opts ...tfexec.InitOption) error { if err != nil { return err } + ctx, span := otel.Tracer(tracerName).Start(ctx, "terraform-exec:Init") + defer span.End() + + err = e.tf.Init(ctx, opts...) + e.setSpanStatus(span, err) - return e.contextfulError(ctx, "Init", e.tf.Init(ctx, opts...)) + return e.contextfulError(ctx, "Init", err) } func (e *Executor) Get(ctx context.Context, opts ...tfexec.GetCmdOption) error { @@ -110,8 +130,13 @@ func (e *Executor) Get(ctx context.Context, opts ...tfexec.GetCmdOption) error { if err != nil { return err } + ctx, span := otel.Tracer(tracerName).Start(ctx, "terraform-exec:Get") + defer span.End() + + err = e.tf.Get(ctx, opts...) + e.setSpanStatus(span, err) - return e.contextfulError(ctx, "Get", e.tf.Get(ctx, opts...)) + return e.contextfulError(ctx, "Get", err) } func (e *Executor) Format(ctx context.Context, input []byte) ([]byte, error) { @@ -122,10 +147,18 @@ func (e *Executor) Format(ctx context.Context, input []byte) ([]byte, error) { return nil, err } + ctx, span := otel.Tracer(tracerName).Start(ctx, "terraform-exec:Format", + trace.WithAttributes(attribute.KeyValue{ + Key: attribute.Key("stdinLength"), + Value: attribute.IntValue(len(input)), + })) + defer span.End() + br := bytes.NewReader(input) buf := bytes.NewBuffer([]byte{}) err = e.tf.Format(ctx, br, buf) + e.setSpanStatus(span, err) return buf.Bytes(), e.contextfulError(ctx, "Format", err) } @@ -138,7 +171,11 @@ func (e *Executor) Validate(ctx context.Context) ([]tfjson.Diagnostic, error) { return []tfjson.Diagnostic{}, err } + ctx, span := otel.Tracer(tracerName).Start(ctx, "terraform-exec:Validate") + defer span.End() + validation, err := e.tf.Validate(ctx) + e.setSpanStatus(span, err) if err != nil { return []tfjson.Diagnostic{}, e.contextfulError(ctx, "Validate", err) } @@ -154,7 +191,12 @@ func (e *Executor) Version(ctx context.Context) (*version.Version, map[string]*v return nil, nil, err } + ctx, span := otel.Tracer(tracerName).Start(ctx, "terraform-exec:Version") + defer span.End() + ver, pv, err := e.tf.Version(ctx, true) + e.setSpanStatus(span, err) + return ver, pv, e.contextfulError(ctx, "Version", err) } @@ -166,6 +208,11 @@ func (e *Executor) ProviderSchemas(ctx context.Context) (*tfjson.ProviderSchemas return nil, err } + ctx, span := otel.Tracer(tracerName).Start(ctx, "terraform-exec:ProviderSchemas") + defer span.End() + ps, err := e.tf.ProvidersSchema(ctx) + e.setSpanStatus(span, err) + return ps, e.contextfulError(ctx, "ProviderSchemas", err) } diff --git a/internal/walker/walker.go b/internal/walker/walker.go index 2e3e0bc6..dc3cfa3c 100644 --- a/internal/walker/walker.go +++ b/internal/walker/walker.go @@ -15,6 +15,10 @@ import ( "github.com/hashicorp/terraform-ls/internal/document" "github.com/hashicorp/terraform-ls/internal/job" "github.com/hashicorp/terraform-ls/internal/terraform/ast" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" ) var ( @@ -54,7 +58,7 @@ type Walker struct { type WalkFunc func(ctx context.Context, modHandle document.DirHandle) (job.IDs, error) type PathStore interface { - AwaitNextDir(ctx context.Context) (document.DirHandle, error) + AwaitNextDir(ctx context.Context) (context.Context, document.DirHandle, error) RemoveDir(dir document.DirHandle) error } @@ -62,6 +66,8 @@ type ModuleStore interface { AddIfNotExists(dir string) error } +const tracerName = "github.com/hashicorp/terraform-ls/internal/walker" + func NewWalker(fs fs.ReadDirFS, pathStore PathStore, modStore ModuleStore, walkFunc WalkFunc) *Walker { return &Walker{ fs: fs, @@ -105,7 +111,7 @@ func (w *Walker) StartWalking(ctx context.Context) error { go func() { for { - nextDir, err := w.pathStore.AwaitNextDir(ctx) + pathCtx, nextDir, err := w.pathStore.AwaitNextDir(ctx) if err != nil { if errors.Is(err, context.Canceled) { return @@ -115,19 +121,38 @@ func (w *Walker) StartWalking(ctx context.Context) error { return } + spanCtx := trace.SpanContextFromContext(pathCtx) + + ctx = trace.ContextWithSpanContext(ctx, spanCtx) + + tracer := otel.Tracer(tracerName) + ctx, span := tracer.Start(ctx, "walk-path", trace.WithAttributes(attribute.KeyValue{ + Key: attribute.Key("URI"), + Value: attribute.StringValue(nextDir.URI), + })) + err = w.walk(ctx, nextDir) if err != nil { w.logger.Printf("walker: walking through %q failed: %s", nextDir, err) w.collectError(err) + span.RecordError(err) + span.SetStatus(codes.Error, "walking failed") + span.End() continue } + span.SetStatus(codes.Ok, "walking finished") + span.End() err = w.pathStore.RemoveDir(nextDir) if err != nil { w.logger.Printf("walker: removing dir %q from queue failed: %s", nextDir, err) w.collectError(err) + span.RecordError(err) + span.SetStatus(codes.Error, "walking failed") + span.End() continue } + w.logger.Printf("walker: walking through %q finished", nextDir) select { diff --git a/internal/walker/walker_test.go b/internal/walker/walker_test.go index 2e88c88d..f6d9b245 100644 --- a/internal/walker/walker_test.go +++ b/internal/walker/walker_test.go @@ -49,12 +49,12 @@ func TestWalker_basic(t *testing.T) { } dir := document.DirHandleFromPath(root) - err = ss.WalkerPaths.EnqueueDir(dir) + ctx := context.Background() + err = ss.WalkerPaths.EnqueueDir(ctx, dir) if err != nil { t.Fatal(err) } - ctx := context.Background() err = w.StartWalking(ctx) if err != nil { t.Fatal(err) @@ -395,7 +395,7 @@ func TestWalker_complexModules(t *testing.T) { w.Collector = NewWalkerCollector() w.SetLogger(testLogger()) dir := document.DirHandleFromPath(tc.root) - err = ss.WalkerPaths.EnqueueDir(dir) + err = ss.WalkerPaths.EnqueueDir(ctx, dir) if err != nil { t.Fatal(err) }