From ca8151eae7480354c2766d46a7106cdb82b3a308 Mon Sep 17 00:00:00 2001 From: Alper Rifat Ulucinar Date: Fri, 3 Mar 2023 12:33:42 +0300 Subject: [PATCH] Add upjet runtime Prometheus metrics: - upjet_terraform_cli_duration: Reports statistics, in seconds, on how long it takes a Terraform CLI invocation to complete - upjet_terraform_active_cli_invocations: The number of active (running) Terraform CLI invocations - upjet_terraform_running_processes: The number of running Terraform CLI and Terraform provider processes - upjet_resource_ttr: Measures, in seconds, the time-to-readiness for managed resources - terraform.Operation.MarkStart now atomically checks for any previous ongoing operation before starting a new one - terraform.Operation.{Start,End}Time no longer return pointers that could potentially be used to modify the shared state outside of critical sections. Signed-off-by: Alper Rifat Ulucinar --- go.mod | 3 +- go.sum | 2 + pkg/controller/external.go | 10 +++- pkg/controller/external_test.go | 2 +- pkg/metrics/metrics.go | 93 +++++++++++++++++++++++++++++++++ pkg/terraform/operation.go | 18 ++++--- pkg/terraform/operation_test.go | 2 +- pkg/terraform/store.go | 60 ++++++++++++++++----- pkg/terraform/workspace.go | 63 ++++++++++------------ pkg/terraform/workspace_test.go | 2 +- 10 files changed, 197 insertions(+), 58 deletions(-) create mode 100644 pkg/metrics/metrics.go diff --git a/go.mod b/go.mod index 6a20f16a..6007cc19 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,10 @@ require ( github.com/hashicorp/terraform-plugin-sdk/v2 v2.24.0 github.com/iancoleman/strcase v0.2.0 github.com/json-iterator/go v1.1.12 + github.com/mitchellh/go-ps v1.0.0 github.com/muvaf/typewriter v0.0.0-20210910160850-80e49fe1eb32 github.com/pkg/errors v0.9.1 + github.com/prometheus/client_golang v1.12.2 github.com/spf13/afero v1.9.2 github.com/tmccombs/hcl2json v0.3.3 github.com/yuin/goldmark v1.4.13 @@ -79,7 +81,6 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/prometheus/client_golang v1.12.2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect diff --git a/go.sum b/go.sum index 59ef694b..90bfc1e4 100644 --- a/go.sum +++ b/go.sum @@ -411,6 +411,8 @@ github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa1 github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.1 h1:jrgshOhYAUVNMAJiKbEu7EqAwgJJ2JqpQmpLJOu07cU= github.com/mitchellh/go-testing-interface v1.14.1/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= diff --git a/pkg/controller/external.go b/pkg/controller/external.go index 90043a89..fb8e8ac2 100644 --- a/pkg/controller/external.go +++ b/pkg/controller/external.go @@ -6,6 +6,7 @@ package controller import ( "context" + "time" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" @@ -14,6 +15,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/upbound/upjet/pkg/config" + "github.com/upbound/upjet/pkg/metrics" "github.com/upbound/upjet/pkg/resource" "github.com/upbound/upjet/pkg/resource/json" "github.com/upbound/upjet/pkg/terraform" @@ -113,7 +115,7 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed. return managed.ExternalObservation{}, errors.Wrap(err, errRefresh) } switch { - case res.IsApplying, res.IsDestroying: + case res.ASyncInProgress: mg.SetConditions(resource.AsyncOperationOngoingCondition()) return managed.ExternalObservation{ ResourceExists: true, @@ -179,6 +181,7 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed. }, nil // we prioritize status updates over late-init'ed spec updates case !markedAvailable: + addTTR(tr) tr.SetConditions(xpv1.Available()) return managed.ExternalObservation{ ResourceExists: true, @@ -211,6 +214,11 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed. } } +func addTTR(mg xpresource.Managed) { + gvk := mg.GetObjectKind().GroupVersionKind() + metrics.TTRMeasurements.WithLabelValues(gvk.Group, gvk.Version, gvk.Kind, mg.GetName(), string(mg.GetUID())).Observe(time.Since(mg.GetCreationTimestamp().Time).Seconds()) +} + func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) { if e.config.UseAsync { return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) diff --git a/pkg/controller/external_test.go b/pkg/controller/external_test.go index aac89880..7bc4ce3c 100644 --- a/pkg/controller/external_test.go +++ b/pkg/controller/external_test.go @@ -225,7 +225,7 @@ func TestObserve(t *testing.T) { w: WorkspaceFns{ RefreshFn: func(_ context.Context) (terraform.RefreshResult, error) { return terraform.RefreshResult{ - IsApplying: true, + ASyncInProgress: true, }, nil }, }, diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go new file mode 100644 index 00000000..d05dfafd --- /dev/null +++ b/pkg/metrics/metrics.go @@ -0,0 +1,93 @@ +// Copyright 2023 Upbound Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metrics + +import ( + "github.com/prometheus/client_golang/prometheus" + "sigs.k8s.io/controller-runtime/pkg/metrics" +) + +const ( + promNSUpjet = "upjet" + promSysTF = "terraform" + promSysResource = "resource" + // TODO(aru): we will probably remove this label from collected metrics + // as it has potentially high cardinality. + promLabelWS = "workspace" +) + +// ExecMode is the Terraform CLI execution mode label +type ExecMode int + +const ( + // ModeSync represents the synchronous execution mode + ModeSync ExecMode = iota + // ModeASync represents the asynchronous execution mode + ModeASync +) + +// String converts an execMode to string +func (em ExecMode) String() string { + switch em { + case ModeSync: + return "sync" + case ModeASync: + return "async" + default: + return "unknown" + } +} + +var ( + // CLITime is the Terraform CLI execution times histogram. + CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: promNSUpjet, + Subsystem: promSysTF, + Name: "cli_duration", + Help: "Measures in seconds how long it takes a Terraform CLI invocation to complete", + Buckets: []float64{1.0, 3, 5, 10, 15, 30, 60, 120, 300}, + }, []string{promLabelWS, "subcommand", "mode"}) + + // CLIExecutions are the active number of terraform CLI invocations. + CLIExecutions = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNSUpjet, + Subsystem: promSysTF, + Name: "active_cli_invocations", + Help: "The number of active (running) Terraform CLI invocations", + }, []string{promLabelWS, "subcommand", "mode"}) + + // TFProcesses are the active number of + // terraform CLI & Terraform provider processes running. + TFProcesses = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: promNSUpjet, + Subsystem: promSysTF, + Name: "running_processes", + Help: "The number of running Terraform CLI and Terraform provider processes", + }, []string{"type"}) + + // TTRMeasurements are the time-to-readiness measurements for + // the managed resources. + TTRMeasurements = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: promNSUpjet, + Subsystem: promSysResource, + Name: "ttr", + Help: "Measures in seconds the time-to-readiness for managed resources", + Buckets: []float64{10, 15, 30, 60, 120, 300, 600, 1800, 3600}, + }, []string{"group", "version", "kind", "name", "uid"}) +) + +func init() { + metrics.Registry.MustRegister(CLITime, CLIExecutions, TFProcesses, TTRMeasurements) +} diff --git a/pkg/terraform/operation.go b/pkg/terraform/operation.go index 90aeb586..4f199c18 100644 --- a/pkg/terraform/operation.go +++ b/pkg/terraform/operation.go @@ -18,14 +18,20 @@ type Operation struct { mu sync.RWMutex } -// MarkStart marks the operation as started. -func (o *Operation) MarkStart(t string) { +// MarkStart marks the operation as started atomically after checking +// no previous operation is already running. +// Returns `false` if a previous operation is still in progress. +func (o *Operation) MarkStart(t string) bool { o.mu.Lock() defer o.mu.Unlock() + if o.startTime != nil && o.endTime == nil { + return false + } now := time.Now() o.Type = t o.startTime = &now o.endTime = nil + return true } // MarkEnd marks the operation as ended. @@ -60,15 +66,15 @@ func (o *Operation) IsRunning() bool { } // StartTime returns the start time of the current operation. -func (o *Operation) StartTime() *time.Time { +func (o *Operation) StartTime() time.Time { o.mu.RLock() defer o.mu.RUnlock() - return o.startTime + return *o.startTime } // EndTime returns the end time of the current operation. -func (o *Operation) EndTime() *time.Time { +func (o *Operation) EndTime() time.Time { o.mu.RLock() defer o.mu.RUnlock() - return o.endTime + return *o.endTime } diff --git a/pkg/terraform/operation_test.go b/pkg/terraform/operation_test.go index 8c1a3613..a630e3df 100644 --- a/pkg/terraform/operation_test.go +++ b/pkg/terraform/operation_test.go @@ -60,7 +60,7 @@ func TestOperation(t *testing.T) { }, want: want{ checks: func(o *Operation) bool { - return o.Type == "" && o.StartTime() == nil && o.EndTime() == nil + return o.Type == "" && o.startTime == nil && o.endTime == nil }, result: true, }, diff --git a/pkg/terraform/store.go b/pkg/terraform/store.go index 54004717..1c231c88 100644 --- a/pkg/terraform/store.go +++ b/pkg/terraform/store.go @@ -21,9 +21,11 @@ import ( "path/filepath" "strings" "sync" + "time" "github.com/crossplane/crossplane-runtime/pkg/logging" xpresource "github.com/crossplane/crossplane-runtime/pkg/resource" + "github.com/mitchellh/go-ps" "github.com/pkg/errors" "github.com/spf13/afero" "k8s.io/apimachinery/pkg/types" @@ -31,6 +33,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/upbound/upjet/pkg/config" + "github.com/upbound/upjet/pkg/metrics" "github.com/upbound/upjet/pkg/resource" ) @@ -109,6 +112,15 @@ func WithProviderRunner(pr ProviderRunner) WorkspaceStoreOption { } } +// WithProcessReportInterval enables the upjet.terraform.running_processes +// metric, which periodically reports the total number of Terraform CLI and +// Terraform provider processes in the system. +func WithProcessReportInterval(d time.Duration) WorkspaceStoreOption { + return func(ws *WorkspaceStore) { + ws.processReportInterval = d + } +} + // NewWorkspaceStore returns a new WorkspaceStore. func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *WorkspaceStore { ws := &WorkspaceStore{ @@ -122,6 +134,9 @@ func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *Workspac for _, f := range opts { f(ws) } + if ws.processReportInterval != 0 { + go ws.reportTFProcesses(ws.processReportInterval) + } return ws } @@ -131,13 +146,13 @@ type WorkspaceStore struct { // Since there can be multiple calls that add/remove values from the map at // the same time, it has to be safe for concurrency since those operations // cause rehashing in some cases. - store map[types.UID]*Workspace - logger logging.Logger - providerRunner ProviderRunner - mu sync.Mutex - - fs afero.Afero - executor exec.Interface + store map[types.UID]*Workspace + logger logging.Logger + providerRunner ProviderRunner + mu sync.Mutex + processReportInterval time.Duration + fs afero.Afero + executor exec.Interface } // Workspace makes sure the Terraform workspace for the given resource is ready @@ -176,9 +191,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient return nil, errors.Wrap(err, "cannot write main tf file") } if isNeedProviderUpgrade { - cmd := w.executor.CommandContext(ctx, "terraform", "init", "-upgrade", "-input=false") - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeSync, "init", "-upgrade", "-input=false") w.logger.Debug("init -upgrade ended", "out", ts.filterSensitiveInformation(string(out))) if err != nil { return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out))) @@ -198,9 +211,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient if !os.IsNotExist(err) { return w, nil } - cmd := w.executor.CommandContext(ctx, "terraform", "init", "-input=false") - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeSync, "init", "-input=false") w.logger.Debug("init ended", "out", ts.filterSensitiveInformation(string(out))) return w, errors.Wrapf(err, "cannot init workspace: %s", ts.filterSensitiveInformation(string(out))) } @@ -229,3 +240,26 @@ func (ts Setup) filterSensitiveInformation(s string) string { } return s } + +func (ws *WorkspaceStore) reportTFProcesses(interval time.Duration) { + t := time.NewTicker(interval) + for range t.C { + processes, err := ps.Processes() + if err != nil { + ws.logger.Debug("Failed to list processes", "err", err) + continue + } + cliCount, providerCount := 0.0, 0.0 + for _, p := range processes { + e := p.Executable() + switch { + case e == "terraform": + cliCount++ + case strings.HasPrefix(e, "terraform-"): + providerCount++ + } + } + metrics.TFProcesses.WithLabelValues("cli").Set(cliCount) + metrics.TFProcesses.WithLabelValues("provider").Set(providerCount) + } +} diff --git a/pkg/terraform/workspace.go b/pkg/terraform/workspace.go index 3fd0511e..242f61d6 100644 --- a/pkg/terraform/workspace.go +++ b/pkg/terraform/workspace.go @@ -27,6 +27,7 @@ import ( "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/upbound/upjet/pkg/metrics" "github.com/upbound/upjet/pkg/resource/json" tferrors "github.com/upbound/upjet/pkg/terraform/errors" ) @@ -110,17 +111,13 @@ type Workspace struct { // ApplyAsync makes a terraform apply call without blocking and calls the given // function once that apply call finishes. func (w *Workspace) ApplyAsync(callback CallbackFn) error { - if w.LastOperation.IsRunning() { + if !w.LastOperation.MarkStart("apply") { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - w.LastOperation.MarkStart("apply") ctx, cancel := context.WithDeadline(context.TODO(), w.LastOperation.StartTime().Add(defaultAsyncTimeout)) go func() { defer cancel() - cmd := w.executor.CommandContext(ctx, "terraform", "apply", "-auto-approve", "-input=false", "-lock=false", "-json") - cmd.SetEnv(append(os.Environ(), w.env...)) - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeASync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") if err != nil { err = tferrors.NewApplyFailed(out) } @@ -145,10 +142,7 @@ func (w *Workspace) Apply(ctx context.Context) (ApplyResult, error) { if w.LastOperation.IsRunning() { return ApplyResult{}, errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - cmd := w.executor.CommandContext(ctx, "terraform", "apply", "-auto-approve", "-input=false", "-lock=false", "-json") - cmd.SetEnv(append(os.Environ(), w.env...)) - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeSync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("apply ended", "out", w.filterFn(string(out))) if err != nil { return ApplyResult{}, tferrors.NewApplyFailed(out) @@ -175,17 +169,13 @@ func (w *Workspace) DestroyAsync(callback CallbackFn) error { return nil // We cannot run destroy until current non-destroy operation is completed. // TODO(muvaf): Gracefully terminate the ongoing apply operation? - case w.LastOperation.IsRunning(): + case !w.LastOperation.MarkStart("destroy"): return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - w.LastOperation.MarkStart("destroy") ctx, cancel := context.WithDeadline(context.TODO(), w.LastOperation.StartTime().Add(defaultAsyncTimeout)) go func() { defer cancel() - cmd := w.executor.CommandContext(ctx, "terraform", "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") - cmd.SetEnv(append(os.Environ(), w.env...)) - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeASync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") if err != nil { err = tferrors.NewDestroyFailed(out) } @@ -205,10 +195,7 @@ func (w *Workspace) Destroy(ctx context.Context) error { if w.LastOperation.IsRunning() { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - cmd := w.executor.CommandContext(ctx, "terraform", "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") - cmd.SetEnv(append(os.Environ(), w.env...)) - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeSync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("destroy ended", "out", w.filterFn(string(out))) if err != nil { return tferrors.NewDestroyFailed(out) @@ -218,10 +205,9 @@ func (w *Workspace) Destroy(ctx context.Context) error { // RefreshResult contains information about the current state of the resource. type RefreshResult struct { - Exists bool - IsApplying bool - IsDestroying bool - State *json.StateV4 + Exists bool + ASyncInProgress bool + State *json.StateV4 } // Refresh makes a blocking terraform apply -refresh-only call where only the state file @@ -230,16 +216,12 @@ func (w *Workspace) Refresh(ctx context.Context) (RefreshResult, error) { switch { case w.LastOperation.IsRunning(): return RefreshResult{ - IsApplying: w.LastOperation.Type == "apply", - IsDestroying: w.LastOperation.Type == "destroy", + ASyncInProgress: w.LastOperation.Type == "apply" || w.LastOperation.Type == "destroy", }, nil case w.LastOperation.IsEnded(): defer w.LastOperation.Flush() } - cmd := w.executor.CommandContext(ctx, "terraform", "apply", "-refresh-only", "-auto-approve", "-input=false", "-lock=false", "-json") - cmd.SetEnv(append(os.Environ(), w.env...)) - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeSync, "apply", "-refresh-only", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("refresh ended", "out", w.filterFn(string(out))) if err != nil { return RefreshResult{}, tferrors.NewRefreshFailed(out) @@ -271,10 +253,7 @@ func (w *Workspace) Plan(ctx context.Context) (PlanResult, error) { if w.LastOperation.IsRunning() { return PlanResult{}, errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - cmd := w.executor.CommandContext(ctx, "terraform", "plan", "-refresh=false", "-input=false", "-lock=false", "-json") - cmd.SetEnv(append(os.Environ(), w.env...)) - cmd.SetDir(w.dir) - out, err := cmd.CombinedOutput() + out, err := w.runTF(ctx, metrics.ModeSync, "plan", "-refresh=false", "-input=false", "-lock=false", "-json") w.logger.Debug("plan ended", "out", w.filterFn(string(out))) if err != nil { return PlanResult{}, tferrors.NewPlanFailed(out) @@ -304,3 +283,19 @@ func (w *Workspace) Plan(ctx context.Context) (PlanResult, error) { UpToDate: p.Changes.Change == 0, }, nil } + +func (w *Workspace) runTF(ctx context.Context, execMode metrics.ExecMode, args ...string) ([]byte, error) { + if len(args) < 1 { + return nil, errors.New("args cannot be empty") + } + cmd := w.executor.CommandContext(ctx, "terraform", args...) + cmd.SetEnv(append(os.Environ(), w.env...)) + cmd.SetDir(w.dir) + metrics.CLIExecutions.WithLabelValues(filepath.Base(w.dir), args[0], execMode.String()).Inc() + start := time.Now() + defer func() { + metrics.CLITime.WithLabelValues(filepath.Base(w.dir), args[0], execMode.String()).Observe(time.Since(start).Seconds()) + metrics.CLIExecutions.WithLabelValues(filepath.Base(w.dir), args[0], execMode.String()).Dec() + }() + return cmd.CombinedOutput() +} diff --git a/pkg/terraform/workspace_test.go b/pkg/terraform/workspace_test.go index acd5c823..cfef66f7 100644 --- a/pkg/terraform/workspace_test.go +++ b/pkg/terraform/workspace_test.go @@ -219,7 +219,7 @@ func TestWorkspaceRefresh(t *testing.T) { }, want: want{ r: RefreshResult{ - IsApplying: true, + ASyncInProgress: true, }, }, },