From 68c9112dce274be3988b30ce297eb8328e8ce13d Mon Sep 17 00:00:00 2001 From: Alper Rifat Ulucinar Date: Tue, 14 Mar 2023 19:16:52 +0300 Subject: [PATCH 1/4] Add terraform.ProviderScheduler that manages the lifecycles of ProviderRunners Signed-off-by: Alper Rifat Ulucinar --- pkg/terraform/files.go | 10 +- pkg/terraform/provider_runner.go | 87 ++++++++++++++--- pkg/terraform/provider_scheduler.go | 139 ++++++++++++++++++++++++++++ pkg/terraform/store.go | 97 +++++++++++++++---- pkg/terraform/workspace.go | 11 ++- 5 files changed, 306 insertions(+), 38 deletions(-) create mode 100644 pkg/terraform/provider_scheduler.go diff --git a/pkg/terraform/files.go b/pkg/terraform/files.go index 459b646b..12ec9b4e 100644 --- a/pkg/terraform/files.go +++ b/pkg/terraform/files.go @@ -94,7 +94,7 @@ type FileProducer struct { // WriteMainTF writes the content main configuration file that has the desired // state configuration for Terraform. -func (fp *FileProducer) WriteMainTF() error { +func (fp *FileProducer) WriteMainTF() (ProviderHandle, error) { // If the resource is in a deletion process, we need to remove the deletion // protection. 
fp.parameters["lifecycle"] = map[string]bool{ @@ -129,9 +129,13 @@ func (fp *FileProducer) WriteMainTF() error { } rawMainTF, err := json.JSParser.Marshal(m) if err != nil { - return errors.Wrap(err, "cannot marshal main hcl object") + return InvalidProviderHandle, errors.Wrap(err, "cannot marshal main hcl object") } - return errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile) + h, err := fp.Setup.Configuration.ToProviderHandle() + if err != nil { + return InvalidProviderHandle, errors.Wrap(err, "cannot get scheduler handle") + } + return h, errors.Wrap(fp.fs.WriteFile(filepath.Join(fp.Dir, "main.tf.json"), rawMainTF, 0600), errWriteMainTFFile) } // EnsureTFState writes the Terraform state that should exist in the filesystem diff --git a/pkg/terraform/provider_runner.go b/pkg/terraform/provider_runner.go index 75a1953d..72272a9c 100644 --- a/pkg/terraform/provider_runner.go +++ b/pkg/terraform/provider_runner.go @@ -1,6 +1,16 @@ -/* -Copyright 2022 Upbound Inc. -*/ +// Copyright 2022 Upbound Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
package terraform @@ -44,6 +54,7 @@ var ( // gRPC server mode type ProviderRunner interface { Start() (string, error) + Stop() error } // NoOpProviderRunner is a no-op ProviderRunner @@ -59,6 +70,11 @@ func (NoOpProviderRunner) Start() (string, error) { return "", nil } +// Stop takes no action +func (NoOpProviderRunner) Stop() error { + return nil +} + // SharedProvider runs the configured native provider plugin // using the supplied command-line args type SharedProvider struct { @@ -71,6 +87,7 @@ type SharedProvider struct { executor exec.Interface clock clock.Clock mu *sync.Mutex + stopCh chan bool } // SharedGRPCRunnerOption lets you configure the shared gRPC runner. @@ -98,17 +115,30 @@ func WithProtocolVersion(protocolVersion int) SharedGRPCRunnerOption { } } -// NewSharedProvider instantiates a SharedProvider with an -// OS executor using the supplied logger -func NewSharedProvider(l logging.Logger, nativeProviderPath, nativeProviderName string, opts ...SharedGRPCRunnerOption) *SharedProvider { +// WithNativeProviderPath configures the Terraform provider executable path +// for the runner. +func WithNativeProviderPath(p string) SharedGRPCRunnerOption { + return func(sr *SharedProvider) { + sr.nativeProviderPath = p + } +} + +// WithNativeProviderName configures the Terraform provider name +// for the runner. +func WithNativeProviderName(n string) SharedGRPCRunnerOption { + return func(sr *SharedProvider) { + sr.nativeProviderName = n + } +} + +// NewSharedProvider instantiates a SharedProvider runner with an +// OS executor using the supplied options. 
+func NewSharedProvider(opts ...SharedGRPCRunnerOption) *SharedProvider { sr := &SharedProvider{ - logger: l, - nativeProviderPath: nativeProviderPath, - nativeProviderName: nativeProviderName, - protocolVersion: defaultProtocolVersion, - executor: exec.New(), - clock: clock.RealClock{}, - mu: &sync.Mutex{}, + protocolVersion: defaultProtocolVersion, + executor: exec.New(), + clock: clock.RealClock{}, + mu: &sync.Mutex{}, } for _, o := range opts { o(sr) @@ -131,6 +161,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo } errCh := make(chan error, 1) reattachCh := make(chan string, 1) + sr.stopCh = make(chan bool, 1) go func() { defer close(errCh) @@ -162,9 +193,24 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo reattachCh <- fmt.Sprintf(fmtReattachEnv, sr.nativeProviderName, sr.protocolVersion, os.Getpid(), matches[1]) break } - if err := cmd.Wait(); err != nil { + + waitErrCh := make(chan error, 1) + go func() { + defer close(waitErrCh) + waitErrCh <- cmd.Wait() + }() + select { + case err := <-waitErrCh: log.Info("Native Terraform provider process error", "error", err) errCh <- err + case <-sr.stopCh: + defer func() { + // we have observed a panic in k8s.io/utils/exec.Cmd.Stop() + if r := recover(); r != nil { + sr.logger.Info("Recovered from a panic in SharedProvider.Stop: %v", r) + } + }() + cmd.Stop() } }() @@ -178,3 +224,16 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo return "", errors.Errorf(errFmtTimeout, reattachTimeout) } } + +// Stop attempts to stop a shared gRPC server if it's already running. 
+func (sr *SharedProvider) Stop() error { + sr.mu.Lock() + defer sr.mu.Unlock() + if sr.stopCh == nil { + return errors.New("shared provider process not started yet") + } + sr.stopCh <- true + close(sr.stopCh) + sr.stopCh = nil + return nil +} diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go new file mode 100644 index 00000000..e1d3fc26 --- /dev/null +++ b/pkg/terraform/provider_scheduler.go @@ -0,0 +1,139 @@ +// Copyright 2023 Upbound Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package terraform + +import ( + "sync" + + "github.com/crossplane/crossplane-runtime/pkg/logging" + "github.com/pkg/errors" +) + +type ProviderHandle string + +const ( + InvalidProviderHandle ProviderHandle = "" + + ttlBudget = 0.1 +) + +type ProviderScheduler interface { + Start(ProviderHandle) (InUse, string, error) +} + +type InUse interface { + Increment() error + Decrement() +} + +type NoOpProviderScheduler struct{} + +func NewNoOpProviderScheduler() NoOpProviderScheduler { + return NoOpProviderScheduler{} +} + +func (NoOpProviderScheduler) Start(ProviderHandle) (InUse, string, error) { + return nil, "", nil +} + +type schedulerEntry struct { + ProviderRunner + inUse int + invocationCount int +} + +type providerInUse struct { + scheduler *SharedProviderScheduler + handle ProviderHandle +} + +func (p *providerInUse) Increment() error { + p.scheduler.mu.Lock() + defer p.scheduler.mu.Unlock() + r := p.scheduler.runners[p.handle] + if r == nil { + return errors.Errorf("cannot mark provider runner as in-use with handle: %s", p.handle) + } + r.inUse++ + r.invocationCount++ + return nil +} + +func (p *providerInUse) Decrement() { + p.scheduler.mu.Lock() + defer p.scheduler.mu.Unlock() + if p.scheduler.runners[p.handle].inUse == 0 { + return + } + p.scheduler.runners[p.handle].inUse-- +} + +type SharedProviderScheduler struct { + runnerOpts []SharedGRPCRunnerOption + runners map[ProviderHandle]*schedulerEntry + ttl int + mu *sync.Mutex + logger logging.Logger +} + +func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedGRPCRunnerOption) *SharedProviderScheduler { + return &SharedProviderScheduler{ + runnerOpts: opts, + mu: &sync.Mutex{}, + runners: make(map[ProviderHandle]*schedulerEntry), + logger: l, + ttl: ttl, + } +} + +func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { + s.mu.Lock() + defer s.mu.Unlock() + + r := s.runners[h] + logger := s.logger.WithValues("handle", h, "ttl", s.ttl, "ttlBudget", ttlBudget) + 
switch { + case r != nil && (r.invocationCount < s.ttl || r.inUse > 0): + if r.invocationCount > int(float64(s.ttl)*(1+ttlBudget)) { + logger.Debug("Reuse budget has been exceeded. Caller will need to retry.") + return nil, "", errors.Errorf("native provider reuse budget has been exceeded: invocationCount: %d, ttl: %d", r.invocationCount, s.ttl) + } + + logger.Debug("Reusing the provider runner", "invocationCount", r.invocationCount) + rc, err := r.Start() + return &providerInUse{ + scheduler: s, + handle: h, + }, rc, errors.Wrapf(err, "cannot use already started provider with handle: %s", h) + case r != nil: + logger.Debug("The provider runner has expired. Attempting to stop...", "invocationCount", r.invocationCount) + if err := r.Stop(); err != nil { + return nil, "", errors.Wrapf(err, "cannot schedule a new Terraform provider for handle: %s", h) + } + } + + runner := NewSharedProvider(s.runnerOpts...) + r = &schedulerEntry{ + ProviderRunner: runner, + } + runner.logger = logger + s.runners[h] = r + logger.Debug("Starting new shared provider...") + rc, err := s.runners[h].Start() + return &providerInUse{ + scheduler: s, + handle: h, + }, rc, errors.Wrapf(err, "cannot start the scheduled provider runner for handle: %s", h) +} diff --git a/pkg/terraform/store.go b/pkg/terraform/store.go index 556cdaf8..604db8a3 100644 --- a/pkg/terraform/store.go +++ b/pkg/terraform/store.go @@ -16,9 +16,11 @@ package terraform import ( "context" + "crypto/md5" "fmt" "os" "path/filepath" + "sort" "strings" "sync" "time" @@ -57,6 +59,44 @@ type ProviderRequirement struct { // ProviderConfiguration holds the setup configuration body type ProviderConfiguration map[string]any +// ToProviderHandle converts a provider configuration to a handle +// for the provider scheduler. 
+func (pc ProviderConfiguration) ToProviderHandle() (ProviderHandle, error) { + h := strings.Join(getSortedKeyValuePairs("", pc), ",") + hash := md5.New() + if _, err := hash.Write([]byte(h)); err != nil { + return InvalidProviderHandle, errors.Wrap(err, "cannot convert provider configuration to scheduler handle") + } + return ProviderHandle(fmt.Sprintf("%x", hash.Sum(nil))), nil +} + +func getSortedKeyValuePairs(parent string, m map[string]any) []string { + result := make([]string, 0, len(m)) + sortedKeys := make([]string, 0, len(m)) + for k := range m { + sortedKeys = append(sortedKeys, k) + } + sort.Strings(sortedKeys) + for _, k := range sortedKeys { + v := m[k] + switch t := v.(type) { + case []string: + result = append(result, fmt.Sprintf("%q:%q", parent+k, strings.Join(t, ","))) + case map[string]any: + result = append(result, getSortedKeyValuePairs(parent+k+".", t)...) + case []map[string]any: + cArr := make([]string, 0, len(t)) + for i, e := range t { + cArr = append(cArr, getSortedKeyValuePairs(fmt.Sprintf("%s%s[%d].", parent, k, i), e)...) + } + result = append(result, fmt.Sprintf("%q:%q", parent+k, strings.Join(cArr, ","))) + default: + result = append(result, fmt.Sprintf("%q:%q", parent+k, t)) + } + } + return result +} + // Setup holds values for the Terraform version and setup // requirements and configuration body type Setup struct { @@ -105,10 +145,11 @@ func WithFs(fs afero.Fs) WorkspaceStoreOption { } } -// WithProviderRunner sets the ProviderRunner to be used. -func WithProviderRunner(pr ProviderRunner) WorkspaceStoreOption { +// WithProviderScheduler sets the ProviderScheduler to be used with +// a WorkspaceStore. 
+func WithProviderScheduler(ps ProviderScheduler) WorkspaceStoreOption { return func(ws *WorkspaceStore) { - ws.providerRunner = pr + ws.providerScheduler = ps } } @@ -121,15 +162,24 @@ func WithProcessReportInterval(d time.Duration) WorkspaceStoreOption { } } +// WithDisableInit disables `terraform init` invocations in case +// workspace initialization is not needed (e.g., when using the +// shared gRPC server runtime). +func WithDisableInit(disable bool) WorkspaceStoreOption { + return func(ws *WorkspaceStore) { + ws.disableInit = disable + } +} + // NewWorkspaceStore returns a new WorkspaceStore. func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *WorkspaceStore { ws := &WorkspaceStore{ - store: map[types.UID]*Workspace{}, - logger: l, - mu: sync.Mutex{}, - fs: afero.Afero{Fs: afero.NewOsFs()}, - executor: exec.New(), - providerRunner: NewNoOpProviderRunner(), + store: map[types.UID]*Workspace{}, + logger: l, + mu: sync.Mutex{}, + fs: afero.Afero{Fs: afero.NewOsFs()}, + executor: exec.New(), + providerScheduler: NewNoOpProviderScheduler(), } for _, f := range opts { f(ws) @@ -149,11 +199,12 @@ type WorkspaceStore struct { // cause rehashing in some cases. 
store map[types.UID]*Workspace logger logging.Logger - providerRunner ProviderRunner + providerScheduler ProviderScheduler mu sync.Mutex processReportInterval time.Duration fs afero.Afero executor exec.Interface + disableInit bool } // Workspace makes sure the Terraform workspace for the given resource is ready @@ -184,11 +235,17 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient if err := fp.EnsureTFState(ctx); err != nil { return nil, errors.Wrap(err, "cannot ensure tfstate file") } - isNeedProviderUpgrade, err := fp.needProviderUpgrade() - if err != nil { - return nil, errors.Wrap(err, "cannot check if a Terraform dependency update is required") + + isNeedProviderUpgrade := false + if !ws.disableInit { + isNeedProviderUpgrade, err = fp.needProviderUpgrade() + if err != nil { + return nil, errors.Wrap(err, "cannot check if a Terraform dependency update is required") + } } - if err := fp.WriteMainTF(); err != nil { + + ph, err := fp.WriteMainTF() + if err != nil { return nil, errors.Wrap(err, "cannot write main tf file") } if isNeedProviderUpgrade { @@ -198,16 +255,20 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out))) } } - attachmentConfig, err := ws.providerRunner.Start() + inuse, attachmentConfig, err := ws.providerScheduler.Start(ph) if err != nil { - return nil, err + return nil, errors.Wrap(err, "cannot schedule a shared provider for the workspace") + } + w.env = append(w.env, fmt.Sprintf(fmtEnv, envReattachConfig, attachmentConfig)) + w.providerInUse = inuse + + if ws.disableInit { + return w, nil } _, err = ws.fs.Stat(filepath.Join(dir, ".terraform.lock.hcl")) if xpresource.Ignore(os.IsNotExist, err) != nil { return nil, errors.Wrap(err, "cannot stat init lock file") } - w.env = append(w.env, fmt.Sprintf(fmtEnv, envReattachConfig, attachmentConfig)) - // We need to initialize only if the 
workspace hasn't been initialized yet. if !os.IsNotExist(err) { return w, nil diff --git a/pkg/terraform/workspace.go b/pkg/terraform/workspace.go index 886c4dd9..51a8fa98 100644 --- a/pkg/terraform/workspace.go +++ b/pkg/terraform/workspace.go @@ -101,9 +101,10 @@ type Workspace struct { dir string env []string - logger logging.Logger - executor k8sExec.Interface - fs afero.Afero + logger logging.Logger + executor k8sExec.Interface + providerInUse InUse + fs afero.Afero filterFn func(string) string } @@ -288,6 +289,10 @@ func (w *Workspace) runTF(ctx context.Context, execMode metrics.ExecMode, args . if len(args) < 1 { return nil, errors.New("args cannot be empty") } + if err := w.providerInUse.Increment(); err != nil { + return nil, errors.Wrap(err, "cannot increment in-use counter for the shared provider") + } + defer w.providerInUse.Decrement() cmd := w.executor.CommandContext(ctx, "terraform", args...) cmd.SetEnv(append(os.Environ(), w.env...)) cmd.SetDir(w.dir) From 967a3a5a8faa4ef5bdb199e84164b216d6903898 Mon Sep 17 00:00:00 2001 From: Alper Rifat Ulucinar Date: Tue, 21 Mar 2023 16:43:04 +0300 Subject: [PATCH 2/4] Move scheduler from WorskpaceStore to ExternalClient Signed-off-by: Alper Rifat Ulucinar --- pkg/controller/external.go | 44 +++++++- pkg/controller/external_test.go | 6 +- pkg/pipeline/templates/controller.go.tmpl | 2 +- pkg/terraform/files_test.go | 2 +- pkg/terraform/provider_runner.go | 36 +++---- pkg/terraform/provider_runner_test.go | 10 +- pkg/terraform/provider_scheduler.go | 118 +++++++++++++++++++--- pkg/terraform/store.go | 44 +++----- pkg/terraform/workspace.go | 43 +++++++- pkg/terraform/workspace_test.go | 4 +- 10 files changed, 233 insertions(+), 76 deletions(-) diff --git a/pkg/controller/external.go b/pkg/controller/external.go index ae468afc..daf9cad0 100644 --- a/pkg/controller/external.go +++ b/pkg/controller/external.go @@ -32,6 +32,7 @@ const ( errApply = "cannot apply" errDestroy = "cannot destroy" errStatusUpdate = "cannot 
update status of custom resource" + errScheduleProvider = "cannot schedule native Terraform provider process" ) // Option allows you to configure Connector. @@ -46,6 +47,14 @@ func WithCallbackProvider(ac CallbackProvider) Option { } } +// WithProviderScheduler sets the native Terraform provider scheduler to be used +// by a Connector. +func WithProviderScheduler(s terraform.ProviderScheduler) Option { + return func(c *Connector) { + c.providerScheduler = s + } +} + // NewConnector returns a new Connector object. func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *config.Resource, opts ...Option) *Connector { c := &Connector{ @@ -53,6 +62,7 @@ func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *confi getTerraformSetup: sf, store: ws, config: cfg, + providerScheduler: terraform.NewNoOpProviderScheduler(), } for _, f := range opts { f(c) @@ -68,6 +78,8 @@ type Connector struct { getTerraformSetup terraform.SetupFn config *config.Resource callback CallbackProvider + providerScheduler terraform.ProviderScheduler + providerHandle terraform.ProviderHandle } // Connect makes sure the underlying client is ready to issue requests to the @@ -82,19 +94,45 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed if err != nil { return nil, errors.Wrap(err, errGetTerraformSetup) } + if ts.Scheduler != nil { + c.providerScheduler = ts.Scheduler + } - tf, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config) + ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config) if err != nil { return nil, errors.Wrap(err, errGetWorkspace) } - + if err := c.scheduleProvider(ws); err != nil { + return nil, errors.Wrap(err, errScheduleProvider) + } return &external{ - workspace: tf, + workspace: ws, config: c.config, callback: c.callback, }, nil } +func (c *Connector) scheduleProvider(ws *terraform.Workspace) error { + if c.providerScheduler == nil || ws == nil { + 
return nil + } + inuse, attachmentConfig, err := c.providerScheduler.Start(ws.ProviderHandle) + if err != nil { + return errors.Wrap(err, "cannot schedule a shared provider for the workspace") + } + ws.UseSharedProvider(inuse, attachmentConfig) + c.providerHandle = ws.ProviderHandle + return nil +} + +// Disconnect releases any resources held by the Connector. +func (c *Connector) Disconnect(_ context.Context) error { + if c.providerScheduler == nil { + return nil + } + return errors.Wrap(c.providerScheduler.Stop(c.providerHandle), "cannot stop the shared provider for the workspace") +} + type external struct { workspace Workspace config *config.Resource diff --git a/pkg/controller/external_test.go b/pkg/controller/external_test.go index 7bc4ce3c..afbb563c 100644 --- a/pkg/controller/external_test.go +++ b/pkg/controller/external_test.go @@ -26,6 +26,10 @@ import ( "github.com/upbound/upjet/pkg/terraform" ) +const ( + testPath = "test/path" +) + var ( errBoom = errors.New("boom") exampleState = &json.StateV4{ @@ -154,7 +158,7 @@ func TestConnect(t *testing.T) { }, store: StoreFns{ WorkspaceFn: func(_ context.Context, _ resource.SecretClient, _ resource.Terraformed, _ terraform.Setup, _ *config.Resource) (*terraform.Workspace, error) { - return nil, nil + return terraform.NewWorkspace(testPath), nil }, }, }, diff --git a/pkg/pipeline/templates/controller.go.tmpl b/pkg/pipeline/templates/controller.go.tmpl index 7dd13a43..cfc422c9 100644 --- a/pkg/pipeline/templates/controller.go.tmpl +++ b/pkg/pipeline/templates/controller.go.tmpl @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error { } r := managed.NewReconciler(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), - managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], + managed.WithExternalConnectDisconnecter(tjcontroller.NewConnector(mgr.GetClient(), 
o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], {{- if .UseAsync }} tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))), {{- end}} diff --git a/pkg/terraform/files_test.go b/pkg/terraform/files_test.go index 84bfdff1..d43747fb 100644 --- a/pkg/terraform/files_test.go +++ b/pkg/terraform/files_test.go @@ -414,7 +414,7 @@ func TestWriteMainTF(t *testing.T) { if err != nil { t.Errorf("cannot initialize a file producer: %s", err.Error()) } - err = fp.WriteMainTF() + _, err = fp.WriteMainTF() if diff := cmp.Diff(tc.want.err, err, test.EquateErrors()); diff != "" { t.Errorf("\n%s\nWriteMainTF(...): -want error, +got error:\n%s", tc.reason, diff) } diff --git a/pkg/terraform/provider_runner.go b/pkg/terraform/provider_runner.go index 72272a9c..3ecfebc4 100644 --- a/pkg/terraform/provider_runner.go +++ b/pkg/terraform/provider_runner.go @@ -33,10 +33,9 @@ const ( errFmtTimeout = "timed out after %v while waiting for the reattach configuration string" // an example value would be: '{"registry.terraform.io/hashicorp/aws": {"Protocol": "grpc", "ProtocolVersion":5, "Pid":... 
"Addr":{"Network": "unix","String": "..."}}}' - fmtReattachEnv = `{"%s":{"Protocol":"grpc","ProtocolVersion":%d,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "%s"}}}` - fmtSetEnv = "%s=%s" - envReattachConfig = "TF_REATTACH_PROVIDERS" - envMagicCookie = "TF_PLUGIN_MAGIC_COOKIE" + fmtReattachEnv = `{"%s":{"Protocol":"grpc","ProtocolVersion":%d,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "%s"}}}` + fmtSetEnv = "%s=%s" + envMagicCookie = "TF_PLUGIN_MAGIC_COOKIE" // Terraform provider plugin expects this magic cookie in its environment // (as the value of key TF_PLUGIN_MAGIC_COOKIE): // https://github.com/hashicorp/terraform/blob/d35bc0531255b496beb5d932f185cbcdb2d61a99/internal/plugin/serve.go#L33 @@ -90,18 +89,18 @@ type SharedProvider struct { stopCh chan bool } -// SharedGRPCRunnerOption lets you configure the shared gRPC runner. -type SharedGRPCRunnerOption func(runner *SharedProvider) +// SharedProviderOption lets you configure the shared gRPC runner. +type SharedProviderOption func(runner *SharedProvider) // WithNativeProviderArgs are the arguments to be passed to the native provider -func WithNativeProviderArgs(args ...string) SharedGRPCRunnerOption { +func WithNativeProviderArgs(args ...string) SharedProviderOption { return func(sr *SharedProvider) { sr.nativeProviderArgs = args } } // WithNativeProviderExecutor sets the process executor to be used -func WithNativeProviderExecutor(e exec.Interface) SharedGRPCRunnerOption { +func WithNativeProviderExecutor(e exec.Interface) SharedProviderOption { return func(sr *SharedProvider) { sr.executor = e } @@ -109,7 +108,7 @@ func WithNativeProviderExecutor(e exec.Interface) SharedGRPCRunnerOption { // WithProtocolVersion sets the gRPC protocol version in use between // the Terraform CLI and the native provider. 
-func WithProtocolVersion(protocolVersion int) SharedGRPCRunnerOption { +func WithProtocolVersion(protocolVersion int) SharedProviderOption { return func(sr *SharedProvider) { sr.protocolVersion = protocolVersion } @@ -117,7 +116,7 @@ func WithProtocolVersion(protocolVersion int) SharedGRPCRunnerOption { // WithNativeProviderPath configures the Terraform provider executable path // for the runner. -func WithNativeProviderPath(p string) SharedGRPCRunnerOption { +func WithNativeProviderPath(p string) SharedProviderOption { return func(sr *SharedProvider) { sr.nativeProviderPath = p } @@ -125,15 +124,22 @@ func WithNativeProviderPath(p string) SharedGRPCRunnerOption { // WithNativeProviderName configures the Terraform provider name // for the runner. -func WithNativeProviderName(n string) SharedGRPCRunnerOption { +func WithNativeProviderName(n string) SharedProviderOption { return func(sr *SharedProvider) { sr.nativeProviderName = n } } +// WithNativeProviderLogger configures the logger for the runner. +func WithNativeProviderLogger(logger logging.Logger) SharedProviderOption { + return func(sr *SharedProvider) { + sr.logger = logger + } +} + // NewSharedProvider instantiates a SharedProvider runner with an // OS executor using the supplied options. 
-func NewSharedProvider(opts ...SharedGRPCRunnerOption) *SharedProvider { +func NewSharedProvider(opts ...SharedProviderOption) *SharedProvider { sr := &SharedProvider{ protocolVersion: defaultProtocolVersion, executor: exec.New(), @@ -204,12 +210,6 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo log.Info("Native Terraform provider process error", "error", err) errCh <- err case <-sr.stopCh: - defer func() { - // we have observed a panic in k8s.io/utils/exec.Cmd.Stop() - if r := recover(); r != nil { - sr.logger.Info("Recovered from a panic in SharedProvider.Stop: %v", r) - } - }() cmd.Stop() } }() diff --git a/pkg/terraform/provider_runner_test.go b/pkg/terraform/provider_runner_test.go index 5e60037b..7087d482 100644 --- a/pkg/terraform/provider_runner_test.go +++ b/pkg/terraform/provider_runner_test.go @@ -48,8 +48,8 @@ func TestStartSharedServer(t *testing.T) { }, "SuccessfullyStarted": { args: args{ - runner: NewSharedProvider(logging.NewNopLogger(), testPath, testName, WithNativeProviderArgs(testArgs...), - WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, nil))), + runner: NewSharedProvider(WithNativeProviderLogger(logging.NewNopLogger()), WithNativeProviderPath(testPath), + WithNativeProviderName(testName), WithNativeProviderArgs(testArgs...), WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, nil))), }, want: want{ reattachConfig: fmt.Sprintf(`{"provider-test":{"Protocol":"grpc","ProtocolVersion":5,"Pid":%d,"Test": true,"Addr":{"Network": "unix","String": "test1"}}}`, os.Getpid()), @@ -71,8 +71,8 @@ func TestStartSharedServer(t *testing.T) { }, "NativeProviderError": { args: args{ - runner: NewSharedProvider(logging.NewNopLogger(), testPath, testName, - WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, testErr))), + runner: NewSharedProvider(WithNativeProviderLogger(logging.NewNopLogger()), WithNativeProviderPath(testPath), + WithNativeProviderName(testName), 
WithNativeProviderArgs(testArgs...), WithNativeProviderExecutor(newExecutorWithStoutPipe(testReattachConfig1, testErr))), }, want: want{ err: testErr, @@ -102,7 +102,7 @@ func TestStartSharedServer(t *testing.T) { if err != nil { return } - if diff := cmp.Diff(reattachConfig, tt.want.reattachConfig); diff != "" { + if diff := cmp.Diff(tt.want.reattachConfig, reattachConfig); diff != "" { t.Errorf("\n%s\nStartSharedServer(): -want reattachConfig, +got reattachConfig:\n%s", name, diff) } }) diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go index e1d3fc26..98abfdf1 100644 --- a/pkg/terraform/provider_scheduler.go +++ b/pkg/terraform/provider_scheduler.go @@ -21,31 +21,62 @@ import ( "github.com/pkg/errors" ) +// ProviderHandle represents native plugin (Terraform provider) process +// handles used by the various schedulers to map Terraform workspaces +// to these processes. type ProviderHandle string const ( + // InvalidProviderHandle is an invalid ProviderHandle. InvalidProviderHandle ProviderHandle = "" ttlBudget = 0.1 ) +// ProviderScheduler represents a shared native plugin process scheduler. type ProviderScheduler interface { + // Start forks or reuses a native plugin process associated with + // the supplied ProviderHandle. Start(ProviderHandle) (InUse, string, error) + // Stop terminates the native plugin process, if it exists, for + // the specified ProviderHandle. + Stop(ProviderHandle) error } +// InUse keeps track of the usage of a shared resource, +// like a native plugin process. type InUse interface { - Increment() error + // Increment marks one more user of a shared resource + // such as a native plugin process. + Increment() + // Decrement marks when a user of a shared resource, + // such as a native plugin process, has released the resource. Decrement() } +// noopInUse satisfies the InUse interface and is a noop implementation. 
+type noopInUse struct{} + +func (noopInUse) Increment() {} + +func (noopInUse) Decrement() {} + +// NoOpProviderScheduler satisfied the ProviderScheduler interface +// and is a noop implementation, i.e., it does not schedule any +// native plugin processes. type NoOpProviderScheduler struct{} +// NewNoOpProviderScheduler initializes a new NoOpProviderScheduler. func NewNoOpProviderScheduler() NoOpProviderScheduler { return NoOpProviderScheduler{} } func (NoOpProviderScheduler) Start(ProviderHandle) (InUse, string, error) { - return nil, "", nil + return noopInUse{}, "", nil +} + +func (NoOpProviderScheduler) Stop(ProviderHandle) error { + return nil } type schedulerEntry struct { @@ -59,16 +90,12 @@ type providerInUse struct { handle ProviderHandle } -func (p *providerInUse) Increment() error { +func (p *providerInUse) Increment() { p.scheduler.mu.Lock() defer p.scheduler.mu.Unlock() r := p.scheduler.runners[p.handle] - if r == nil { - return errors.Errorf("cannot mark provider runner as in-use with handle: %s", p.handle) - } r.inUse++ r.invocationCount++ - return nil } func (p *providerInUse) Decrement() { @@ -80,15 +107,24 @@ func (p *providerInUse) Decrement() { p.scheduler.runners[p.handle].inUse-- } +// SharedProviderScheduler is a ProviderScheduler that +// shares a native plugin (Terraform provider) process between +// MR reconciliation loops whose MRs yield the same ProviderHandle, i.e., +// whose Terraform resource blocks are configuration-wise identical. +// SharedProviderScheduler is configured with a max TTL and it will gracefully +// attempt to replace ProviderRunners whose TTL exceed this maximum, +// if they are not in-use. 
type SharedProviderScheduler struct { - runnerOpts []SharedGRPCRunnerOption + runnerOpts []SharedProviderOption runners map[ProviderHandle]*schedulerEntry ttl int mu *sync.Mutex logger logging.Logger } -func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedGRPCRunnerOption) *SharedProviderScheduler { +// NewSharedProviderScheduler initializes a new SharedProviderScheduler +// with the specified logger and options. +func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedProviderOption) *SharedProviderScheduler { return &SharedProviderScheduler{ runnerOpts: opts, mu: &sync.Mutex{}, @@ -99,11 +135,11 @@ func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedGRPCRun } func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { + logger := s.logger.WithValues("handle", h, "ttl", s.ttl, "ttlBudget", ttlBudget) s.mu.Lock() defer s.mu.Unlock() r := s.runners[h] - logger := s.logger.WithValues("handle", h, "ttl", s.ttl, "ttlBudget", ttlBudget) switch { case r != nil && (r.invocationCount < s.ttl || r.inUse > 0): if r.invocationCount > int(float64(s.ttl)*(1+ttlBudget)) { @@ -120,7 +156,7 @@ func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) case r != nil: logger.Debug("The provider runner has expired. 
Attempting to stop...", "invocationCount", r.invocationCount) if err := r.Stop(); err != nil { - return nil, "", errors.Wrapf(err, "cannot schedule a new Terraform provider for handle: %s", h) + return nil, "", errors.Wrapf(err, "cannot schedule a new shared provider for handle: %s", h) } } @@ -135,5 +171,63 @@ func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) return &providerInUse{ scheduler: s, handle: h, - }, rc, errors.Wrapf(err, "cannot start the scheduled provider runner for handle: %s", h) + }, rc, errors.Wrapf(err, "cannot start the shared provider runner for handle: %s", h) +} + +func (s *SharedProviderScheduler) Stop(ProviderHandle) error { + // noop + return nil +} + +// WorkspaceProviderScheduler is a ProviderScheduler that +// shares a native plugin (Terraform provider) process between +// the Terraform CLI invocations in the context of a single +// reconciliation loop (belonging to a single workspace). +// When the managed.ExternalDisconnecter disconnects, +// the scheduler terminates the native plugin process. +type WorkspaceProviderScheduler struct { + runner ProviderRunner + logger logging.Logger + inUse *workspaceInUse +} + +type workspaceInUse struct { + wg *sync.WaitGroup +} + +func (w *workspaceInUse) Increment() { + w.wg.Add(1) +} + +func (w *workspaceInUse) Decrement() { + w.wg.Done() +} + +// NewWorkspaceProviderScheduler initializes a new WorkspaceProviderScheduler. 
+func NewWorkspaceProviderScheduler(l logging.Logger, opts ...SharedProviderOption) *WorkspaceProviderScheduler { + return &WorkspaceProviderScheduler{ + logger: l, + runner: NewSharedProvider(append([]SharedProviderOption{WithNativeProviderLogger(l)}, opts...)...), + inUse: &workspaceInUse{ + wg: &sync.WaitGroup{}, + }, + } +} + +func (s *WorkspaceProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { + s.logger.Debug("Starting workspace scoped shared provider runner.", "handle", h) + reattachConfig, err := s.runner.Start() + return s.inUse, reattachConfig, errors.Wrap(err, "cannot start a workspace provider runner") +} + +func (s *WorkspaceProviderScheduler) Stop(h ProviderHandle) error { + s.logger.Debug("Attempting to stop workspace scoped shared provider runner.", "handle", h) + go func() { + s.inUse.wg.Wait() + s.logger.Debug("Provider runner not in-use, stopping it.", "handle", h) + if err := s.runner.Stop(); err != nil { + s.logger.Info("Failed to stop provider runner", "error", errors.Wrap(err, "cannot stop a workspace provider runner")) + } + }() + return nil } diff --git a/pkg/terraform/store.go b/pkg/terraform/store.go index 604db8a3..663128ec 100644 --- a/pkg/terraform/store.go +++ b/pkg/terraform/store.go @@ -16,7 +16,7 @@ package terraform import ( "context" - "crypto/md5" + "crypto/sha256" "fmt" "os" "path/filepath" @@ -39,10 +39,6 @@ import ( "github.com/upbound/upjet/pkg/resource" ) -const ( - fmtEnv = "%s=%s" -) - // SetupFn is a function that returns Terraform setup which contains // provider requirement, configuration and Terraform version. type SetupFn func(ctx context.Context, client client.Client, mg xpresource.Managed) (Setup, error) @@ -63,7 +59,7 @@ type ProviderConfiguration map[string]any // for the provider scheduler. 
func (pc ProviderConfiguration) ToProviderHandle() (ProviderHandle, error) { h := strings.Join(getSortedKeyValuePairs("", pc), ",") - hash := md5.New() + hash := sha256.New() if _, err := hash.Write([]byte(h)); err != nil { return InvalidProviderHandle, errors.Wrap(err, "cannot convert provider configuration to scheduler handle") } @@ -118,6 +114,12 @@ type Setup struct { // not part of the Terraform AWS Provider configuration, so it could be // made available only by this map. ClientMetadata map[string]string + + // Scheduler specifies the provider scheduler to be used for the Terraform + // workspace being set up. If not set, no scheduler is configured and + // the lifecycle of Terraform provider processes will be managed by + // the Terraform CLI. + Scheduler ProviderScheduler } // Map returns the Setup object in map form. The initial reason was so that @@ -145,14 +147,6 @@ func WithFs(fs afero.Fs) WorkspaceStoreOption { } } -// WithProviderScheduler sets the ProviderScheduler to be used with -// a WorkspaceStore. -func WithProviderScheduler(ps ProviderScheduler) WorkspaceStoreOption { - return func(ws *WorkspaceStore) { - ws.providerScheduler = ps - } -} - // WithProcessReportInterval enables the upjet.terraform.running_processes // metric, which periodically reports the total number of Terraform CLI and // Terraform provider processes in the system. @@ -174,12 +168,11 @@ func WithDisableInit(disable bool) WorkspaceStoreOption { // NewWorkspaceStore returns a new WorkspaceStore. 
func NewWorkspaceStore(l logging.Logger, opts ...WorkspaceStoreOption) *WorkspaceStore { ws := &WorkspaceStore{ - store: map[types.UID]*Workspace{}, - logger: l, - mu: sync.Mutex{}, - fs: afero.Afero{Fs: afero.NewOsFs()}, - executor: exec.New(), - providerScheduler: NewNoOpProviderScheduler(), + store: map[types.UID]*Workspace{}, + logger: l, + mu: sync.Mutex{}, + fs: afero.Afero{Fs: afero.NewOsFs()}, + executor: exec.New(), } for _, f := range opts { f(ws) @@ -199,7 +192,6 @@ type WorkspaceStore struct { // cause rehashing in some cases. store map[types.UID]*Workspace logger logging.Logger - providerScheduler ProviderScheduler mu sync.Mutex processReportInterval time.Duration fs afero.Afero @@ -244,8 +236,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient } } - ph, err := fp.WriteMainTF() - if err != nil { + if w.ProviderHandle, err = fp.WriteMainTF(); err != nil { return nil, errors.Wrap(err, "cannot write main tf file") } if isNeedProviderUpgrade { @@ -255,13 +246,6 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out))) } } - inuse, attachmentConfig, err := ws.providerScheduler.Start(ph) - if err != nil { - return nil, errors.Wrap(err, "cannot schedule a shared provider for the workspace") - } - w.env = append(w.env, fmt.Sprintf(fmtEnv, envReattachConfig, attachmentConfig)) - w.providerInUse = inuse - if ws.disableInit { return w, nil } diff --git a/pkg/terraform/workspace.go b/pkg/terraform/workspace.go index 51a8fa98..e72caab2 100644 --- a/pkg/terraform/workspace.go +++ b/pkg/terraform/workspace.go @@ -16,9 +16,11 @@ package terraform import ( "context" + "fmt" "os" "path/filepath" "strings" + "sync" "time" "github.com/pkg/errors" @@ -34,6 +36,8 @@ import ( const ( defaultAsyncTimeout = 1 * time.Hour + envReattachConfig = "TF_REATTACH_PROVIDERS" + fmtEnv = "%s=%s" ) // WorkspaceOption allows you 
to configure Workspace objects. @@ -67,12 +71,21 @@ func WithAferoFs(fs afero.Fs) WorkspaceOption { } } +// WithFilterFn configures the debug log sensitive information filtering func. func WithFilterFn(filterFn func(string) string) WorkspaceOption { return func(w *Workspace) { w.filterFn = filterFn } } +// WithProviderInUse configures an InUse for keeping track of +// the shared provider InUse by this Terraform workspace. +func WithProviderInUse(providerInUse InUse) WorkspaceOption { + return func(w *Workspace) { + w.providerInUse = providerInUse + } +} + // NewWorkspace returns a new Workspace object that operates in the given // directory. func NewWorkspace(dir string, opts ...WorkspaceOption) *Workspace { @@ -81,6 +94,8 @@ func NewWorkspace(dir string, opts ...WorkspaceOption) *Workspace { dir: dir, logger: logging.NewNopLogger(), fs: afero.Afero{Fs: afero.NewOsFs()}, + providerInUse: noopInUse{}, + mu: &sync.Mutex{}, } for _, f := range opts { f(w) @@ -97,6 +112,10 @@ type CallbackFn func(error, context.Context) error type Workspace struct { // LastOperation contains information about the last operation performed. LastOperation *Operation + // ProviderHandle is the handle of the associated native Terraform provider + // computed from the generated provider resource configuration block + // of the Terraform workspace. 
+ ProviderHandle ProviderHandle dir string env []string @@ -105,10 +124,27 @@ type Workspace struct { executor k8sExec.Interface providerInUse InUse fs afero.Afero + mu *sync.Mutex filterFn func(string) string } +func (w *Workspace) UseSharedProvider(inuse InUse, attachmentConfig string) { + w.mu.Lock() + defer w.mu.Unlock() + // remove existing reattach configs + env := make([]string, 0, len(w.env)) + prefix := fmt.Sprintf(fmtEnv, envReattachConfig, "") + for _, e := range w.env { + if !strings.HasPrefix(e, prefix) { + env = append(env, e) + } + } + env = append(env, prefix+attachmentConfig) + w.env = env + w.providerInUse = inuse +} + // ApplyAsync makes a terraform apply call without blocking and calls the given // function once that apply call finishes. func (w *Workspace) ApplyAsync(callback CallbackFn) error { @@ -289,9 +325,10 @@ func (w *Workspace) runTF(ctx context.Context, execMode metrics.ExecMode, args . if len(args) < 1 { return nil, errors.New("args cannot be empty") } - if err := w.providerInUse.Increment(); err != nil { - return nil, errors.Wrap(err, "cannot increment in-use counter for the shared provider") - } + w.mu.Lock() + defer w.mu.Unlock() + w.logger.Debug("Running terraform", "args", args) + w.providerInUse.Increment() defer w.providerInUse.Decrement() cmd := w.executor.CommandContext(ctx, "terraform", args...) 
cmd.SetEnv(append(os.Environ(), w.env...)) diff --git a/pkg/terraform/workspace_test.go b/pkg/terraform/workspace_test.go index cfef66f7..abbcc1e0 100644 --- a/pkg/terraform/workspace_test.go +++ b/pkg/terraform/workspace_test.go @@ -98,7 +98,7 @@ func TestWorkspaceApply(t *testing.T) { "Success": { args: args{ w: NewWorkspace(directory, WithExecutor(&testingexec.FakeExec{DisableScripts: true}), WithAferoFs(fs), - WithFilterFn(filterFn)), + WithFilterFn(filterFn), WithProviderInUse(noopInUse{})), }, want: want{ r: ApplyResult{ @@ -109,7 +109,7 @@ func TestWorkspaceApply(t *testing.T) { "Failure": { args: args{ w: NewWorkspace(directory, WithExecutor(newFakeExec(errBoom.Error(), errBoom)), WithAferoFs(fs), - WithFilterFn(filterFn)), + WithFilterFn(filterFn), WithProviderInUse(noopInUse{})), }, want: want{ err: tferrors.NewApplyFailed([]byte(errBoom.Error())), From 616ea71453e7963f00e45e593fb576e40f5e3c14 Mon Sep 17 00:00:00 2001 From: Alper Rifat Ulucinar Date: Fri, 24 Mar 2023 00:05:44 +0300 Subject: [PATCH 3/4] Call InUse.Increment from reconciliation goroutine Signed-off-by: Alper Rifat Ulucinar --- pkg/controller/external.go | 83 ++++++++++++++--------- pkg/controller/interfaces.go | 5 ++ pkg/metrics/metrics.go | 22 ------ pkg/pipeline/templates/controller.go.tmpl | 2 +- pkg/terraform/provider_runner.go | 4 ++ pkg/terraform/provider_scheduler.go | 2 +- pkg/terraform/store.go | 10 ++- pkg/terraform/workspace.go | 49 ++++++++++--- 8 files changed, 106 insertions(+), 71 deletions(-) diff --git a/pkg/controller/external.go b/pkg/controller/external.go index daf9cad0..8ced921c 100644 --- a/pkg/controller/external.go +++ b/pkg/controller/external.go @@ -9,6 +9,7 @@ import ( "time" xpv1 "github.com/crossplane/crossplane-runtime/apis/common/v1" + "github.com/crossplane/crossplane-runtime/pkg/logging" "github.com/crossplane/crossplane-runtime/pkg/reconciler/managed" xpresource "github.com/crossplane/crossplane-runtime/pkg/resource" "github.com/pkg/errors" @@ -47,11 
+48,10 @@ func WithCallbackProvider(ac CallbackProvider) Option { } } -// WithProviderScheduler sets the native Terraform provider scheduler to be used -// by a Connector. -func WithProviderScheduler(s terraform.ProviderScheduler) Option { +// WithLogger configures a logger for the Connector. +func WithLogger(l logging.Logger) Option { return func(c *Connector) { - c.providerScheduler = s + c.logger = l } } @@ -62,7 +62,7 @@ func NewConnector(kube client.Client, ws Store, sf terraform.SetupFn, cfg *confi getTerraformSetup: sf, store: ws, config: cfg, - providerScheduler: terraform.NewNoOpProviderScheduler(), + logger: logging.NewNopLogger(), } for _, f := range opts { f(c) @@ -78,8 +78,7 @@ type Connector struct { getTerraformSetup terraform.SetupFn config *config.Resource callback CallbackProvider - providerScheduler terraform.ProviderScheduler - providerHandle terraform.ProviderHandle + logger logging.Logger } // Connect makes sure the underlying client is ready to issue requests to the @@ -94,49 +93,51 @@ func (c *Connector) Connect(ctx context.Context, mg xpresource.Managed) (managed if err != nil { return nil, errors.Wrap(err, errGetTerraformSetup) } - if ts.Scheduler != nil { - c.providerScheduler = ts.Scheduler - } ws, err := c.store.Workspace(ctx, &APISecretClient{kube: c.kube}, tr, ts, c.config) if err != nil { return nil, errors.Wrap(err, errGetWorkspace) } - if err := c.scheduleProvider(ws); err != nil { - return nil, errors.Wrap(err, errScheduleProvider) - } return &external{ - workspace: ws, - config: c.config, - callback: c.callback, + workspace: ws, + config: c.config, + callback: c.callback, + providerScheduler: ts.Scheduler, + providerHandle: ws.ProviderHandle, + logger: c.logger.WithValues("uid", mg.GetUID()), }, nil } -func (c *Connector) scheduleProvider(ws *terraform.Workspace) error { - if c.providerScheduler == nil || ws == nil { +type external struct { + workspace Workspace + config *config.Resource + callback CallbackProvider + 
providerScheduler terraform.ProviderScheduler + providerHandle terraform.ProviderHandle + logger logging.Logger +} + +func (e *external) scheduleProvider() error { + if e.providerScheduler == nil || e.workspace == nil { return nil } - inuse, attachmentConfig, err := c.providerScheduler.Start(ws.ProviderHandle) + inuse, attachmentConfig, err := e.providerScheduler.Start(e.providerHandle) if err != nil { - return errors.Wrap(err, "cannot schedule a shared provider for the workspace") + return errors.Wrap(err, errScheduleProvider) + } + if ps, ok := e.workspace.(ProviderSharer); ok { + ps.UseProvider(inuse, attachmentConfig) } - ws.UseSharedProvider(inuse, attachmentConfig) - c.providerHandle = ws.ProviderHandle return nil } -// Disconnect releases any resources held by the Connector. -func (c *Connector) Disconnect(_ context.Context) error { - if c.providerScheduler == nil { - return nil +func (e *external) stopProvider() { + if e.providerScheduler == nil { + return + } + if err := e.providerScheduler.Stop(e.providerHandle); err != nil { + e.logger.Info("ExternalClient failed to stop the native provider", "error", err) } - return errors.Wrap(c.providerScheduler.Stop(c.providerHandle), "cannot stop the shared provider for the workspace") -} - -type external struct { - workspace Workspace - config *config.Resource - callback CallbackProvider } func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed.ExternalObservation, error) { //nolint:gocyclo @@ -144,6 +145,10 @@ func (e *external) Observe(ctx context.Context, mg xpresource.Managed) (managed. // and serial. // TODO(muvaf): Look for ways to reduce the cyclomatic complexity without // increasing the difficulty of understanding the flow. 
+ if err := e.scheduleProvider(); err != nil { + return managed.ExternalObservation{}, errors.Wrapf(err, "cannot schedule a native provider during observe: %s", mg.GetUID()) + } + defer e.stopProvider() tr, ok := mg.(resource.Terraformed) if !ok { return managed.ExternalObservation{}, errors.New(errUnexpectedObject) @@ -258,6 +263,10 @@ func addTTR(mg xpresource.Managed) { } func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.ExternalCreation, error) { + if err := e.scheduleProvider(); err != nil { + return managed.ExternalCreation{}, errors.Wrapf(err, "cannot schedule a native provider during create: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return managed.ExternalCreation{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) } @@ -285,6 +294,10 @@ func (e *external) Create(ctx context.Context, mg xpresource.Managed) (managed.E } func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.ExternalUpdate, error) { + if err := e.scheduleProvider(); err != nil { + return managed.ExternalUpdate{}, errors.Wrapf(err, "cannot schedule a native provider during update: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return managed.ExternalUpdate{}, errors.Wrap(e.workspace.ApplyAsync(e.callback.Apply(mg.GetName())), errStartAsyncApply) } @@ -304,6 +317,10 @@ func (e *external) Update(ctx context.Context, mg xpresource.Managed) (managed.E } func (e *external) Delete(ctx context.Context, mg xpresource.Managed) error { + if err := e.scheduleProvider(); err != nil { + return errors.Wrapf(err, "cannot schedule a native provider during delete: %s", mg.GetUID()) + } + defer e.stopProvider() if e.config.UseAsync { return errors.Wrap(e.workspace.DestroyAsync(e.callback.Destroy(mg.GetName())), errStartAsyncDestroy) } diff --git a/pkg/controller/interfaces.go b/pkg/controller/interfaces.go index e0af8141..d8978e57 100644 --- 
a/pkg/controller/interfaces.go +++ b/pkg/controller/interfaces.go @@ -26,6 +26,11 @@ type Workspace interface { Plan(context.Context) (terraform.PlanResult, error) } +// ProviderSharer shares a native provider process with the receiver. +type ProviderSharer interface { + UseProvider(inuse terraform.InUse, attachmentConfig string) +} + // Store is where we can get access to the Terraform workspace of given resource. type Store interface { Workspace(ctx context.Context, c resource.SecretClient, tr resource.Terraformed, ts terraform.Setup, cfg *config.Resource) (*terraform.Workspace, error) diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 14e3f4bb..31d73f0b 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -25,28 +25,6 @@ const ( promSysResource = "resource" ) -// ExecMode is the Terraform CLI execution mode label -type ExecMode int - -const ( - // ModeSync represents the synchronous execution mode - ModeSync ExecMode = iota - // ModeASync represents the asynchronous execution mode - ModeASync -) - -// String converts an execMode to string -func (em ExecMode) String() string { - switch em { - case ModeSync: - return "sync" - case ModeASync: - return "async" - default: - return "unknown" - } -} - var ( // CLITime is the Terraform CLI execution times histogram. 
CLITime = prometheus.NewHistogramVec(prometheus.HistogramOpts{ diff --git a/pkg/pipeline/templates/controller.go.tmpl b/pkg/pipeline/templates/controller.go.tmpl index cfc422c9..e3279001 100644 --- a/pkg/pipeline/templates/controller.go.tmpl +++ b/pkg/pipeline/templates/controller.go.tmpl @@ -37,7 +37,7 @@ func Setup(mgr ctrl.Manager, o tjcontroller.Options) error { } r := managed.NewReconciler(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind), - managed.WithExternalConnectDisconnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], + managed.WithExternalConnecter(tjcontroller.NewConnector(mgr.GetClient(), o.WorkspaceStore, o.SetupFn, o.Provider.Resources["{{ .ResourceType }}"], tjcontroller.WithLogger(o.Logger), {{- if .UseAsync }} tjcontroller.WithCallbackProvider(tjcontroller.NewAPICallbacks(mgr, xpresource.ManagedKind({{ .TypePackageAlias }}{{ .CRD.Kind }}_GroupVersionKind))), {{- end}} diff --git a/pkg/terraform/provider_runner.go b/pkg/terraform/provider_runner.go index 3ecfebc4..d29446b4 100644 --- a/pkg/terraform/provider_runner.go +++ b/pkg/terraform/provider_runner.go @@ -165,6 +165,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo log.Debug("Shared gRPC server is running...", "reattachConfig", sr.reattachConfig) return sr.reattachConfig, nil } + log.Debug("Provider runner not yet started. 
Will fork a new native provider.") errCh := make(chan error, 1) reattachCh := make(chan string, 1) sr.stopCh = make(chan bool, 1) @@ -189,6 +190,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo errCh <- err return } + log.Debug("Forked new native provider.") scanner := bufio.NewScanner(stdout) for scanner.Scan() { t := scanner.Text() @@ -211,6 +213,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo errCh <- err case <-sr.stopCh: cmd.Stop() + log.Debug("Stopped the provider runner.") } }() @@ -229,6 +232,7 @@ func (sr *SharedProvider) Start() (string, error) { //nolint:gocyclo func (sr *SharedProvider) Stop() error { sr.mu.Lock() defer sr.mu.Unlock() + sr.logger.Debug("Attempting to stop the provider runner.") if sr.stopCh == nil { return errors.New("shared provider process not started yet") } diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go index 98abfdf1..578c8fe0 100644 --- a/pkg/terraform/provider_scheduler.go +++ b/pkg/terraform/provider_scheduler.go @@ -215,7 +215,7 @@ func NewWorkspaceProviderScheduler(l logging.Logger, opts ...SharedProviderOptio } func (s *WorkspaceProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { - s.logger.Debug("Starting workspace scoped shared provider runner.", "handle", h) + s.logger.Debug("Starting workspace scoped provider runner.", "handle", h) reattachConfig, err := s.runner.Start() return s.inUse, reattachConfig, errors.Wrap(err, "cannot start a workspace provider runner") } diff --git a/pkg/terraform/store.go b/pkg/terraform/store.go index 663128ec..22143703 100644 --- a/pkg/terraform/store.go +++ b/pkg/terraform/store.go @@ -86,6 +86,10 @@ func getSortedKeyValuePairs(parent string, m map[string]any) []string { cArr = append(cArr, getSortedKeyValuePairs(fmt.Sprintf("%s%s[%d].", parent, k, i), e)...) 
} result = append(result, fmt.Sprintf("%q:%q", parent+k, strings.Join(cArr, ","))) + case *string: + if t != nil { + result = append(result, fmt.Sprintf("%q:%q", parent+k, *t)) + } default: result = append(result, fmt.Sprintf("%q:%q", parent+k, t)) } @@ -240,7 +244,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient return nil, errors.Wrap(err, "cannot write main tf file") } if isNeedProviderUpgrade { - out, err := w.runTF(ctx, metrics.ModeSync, "init", "-upgrade", "-input=false") + out, err := w.runTF(ctx, ModeSync, "init", "-upgrade", "-input=false") w.logger.Debug("init -upgrade ended", "out", ts.filterSensitiveInformation(string(out))) if err != nil { return w, errors.Wrapf(err, "cannot upgrade workspace: %s", ts.filterSensitiveInformation(string(out))) @@ -257,7 +261,7 @@ func (ws *WorkspaceStore) Workspace(ctx context.Context, c resource.SecretClient if !os.IsNotExist(err) { return w, nil } - out, err := w.runTF(ctx, metrics.ModeSync, "init", "-input=false") + out, err := w.runTF(ctx, ModeSync, "init", "-input=false") w.logger.Debug("init ended", "out", ts.filterSensitiveInformation(string(out))) return w, errors.Wrapf(err, "cannot init workspace: %s", ts.filterSensitiveInformation(string(out))) } @@ -279,7 +283,7 @@ func (ws *WorkspaceStore) Remove(obj xpresource.Object) error { } func (ws *WorkspaceStore) initMetrics() { - for _, mode := range []metrics.ExecMode{metrics.ModeSync, metrics.ModeASync} { + for _, mode := range []ExecMode{ModeSync, ModeASync} { for _, subcommand := range []string{"init", "apply", "destroy", "plan"} { metrics.CLIExecutions.WithLabelValues(subcommand, mode.String()).Set(0) } diff --git a/pkg/terraform/workspace.go b/pkg/terraform/workspace.go index e72caab2..ae560593 100644 --- a/pkg/terraform/workspace.go +++ b/pkg/terraform/workspace.go @@ -40,6 +40,28 @@ const ( fmtEnv = "%s=%s" ) +// ExecMode is the Terraform CLI execution mode label +type ExecMode int + +const ( + // ModeSync represents the 
synchronous execution mode + ModeSync ExecMode = iota + // ModeASync represents the asynchronous execution mode + ModeASync +) + +// String converts an execMode to string +func (em ExecMode) String() string { + switch em { + case ModeSync: + return "sync" + case ModeASync: + return "async" + default: + return "unknown" + } +} + // WorkspaceOption allows you to configure Workspace objects. type WorkspaceOption func(*Workspace) @@ -129,7 +151,8 @@ type Workspace struct { filterFn func(string) string } -func (w *Workspace) UseSharedProvider(inuse InUse, attachmentConfig string) { +// UseProvider shares a native provider with the receiver Workspace. +func (w *Workspace) UseProvider(inuse InUse, attachmentConfig string) { w.mu.Lock() defer w.mu.Unlock() // remove existing reattach configs @@ -152,9 +175,10 @@ func (w *Workspace) ApplyAsync(callback CallbackFn) error { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } ctx, cancel := context.WithDeadline(context.TODO(), w.LastOperation.StartTime().Add(defaultAsyncTimeout)) + w.providerInUse.Increment() go func() { defer cancel() - out, err := w.runTF(ctx, metrics.ModeASync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeASync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") if err != nil { err = tferrors.NewApplyFailed(out) } @@ -179,7 +203,7 @@ func (w *Workspace) Apply(ctx context.Context) (ApplyResult, error) { if w.LastOperation.IsRunning() { return ApplyResult{}, errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - out, err := w.runTF(ctx, metrics.ModeSync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "apply", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("apply ended", "out", w.filterFn(string(out))) if err 
!= nil { return ApplyResult{}, tferrors.NewApplyFailed(out) @@ -210,9 +234,10 @@ func (w *Workspace) DestroyAsync(callback CallbackFn) error { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } ctx, cancel := context.WithDeadline(context.TODO(), w.LastOperation.StartTime().Add(defaultAsyncTimeout)) + w.providerInUse.Increment() go func() { defer cancel() - out, err := w.runTF(ctx, metrics.ModeASync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeASync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") if err != nil { err = tferrors.NewDestroyFailed(out) } @@ -232,7 +257,7 @@ func (w *Workspace) Destroy(ctx context.Context) error { if w.LastOperation.IsRunning() { return errors.Errorf("%s operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - out, err := w.runTF(ctx, metrics.ModeSync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "destroy", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("destroy ended", "out", w.filterFn(string(out))) if err != nil { return tferrors.NewDestroyFailed(out) @@ -258,7 +283,7 @@ func (w *Workspace) Refresh(ctx context.Context) (RefreshResult, error) { case w.LastOperation.IsEnded(): defer w.LastOperation.Flush() } - out, err := w.runTF(ctx, metrics.ModeSync, "apply", "-refresh-only", "-auto-approve", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "apply", "-refresh-only", "-auto-approve", "-input=false", "-lock=false", "-json") w.logger.Debug("refresh ended", "out", w.filterFn(string(out))) if err != nil { return RefreshResult{}, tferrors.NewRefreshFailed(out) @@ -290,7 +315,7 @@ func (w *Workspace) Plan(ctx context.Context) (PlanResult, error) { if w.LastOperation.IsRunning() { return PlanResult{}, errors.Errorf("%s 
operation that started at %s is still running", w.LastOperation.Type, w.LastOperation.StartTime().String()) } - out, err := w.runTF(ctx, metrics.ModeSync, "plan", "-refresh=false", "-input=false", "-lock=false", "-json") + out, err := w.runTF(ctx, ModeSync, "plan", "-refresh=false", "-input=false", "-lock=false", "-json") w.logger.Debug("plan ended", "out", w.filterFn(string(out))) if err != nil { return PlanResult{}, tferrors.NewPlanFailed(out) @@ -321,15 +346,17 @@ func (w *Workspace) Plan(ctx context.Context) (PlanResult, error) { }, nil } -func (w *Workspace) runTF(ctx context.Context, execMode metrics.ExecMode, args ...string) ([]byte, error) { +func (w *Workspace) runTF(ctx context.Context, execMode ExecMode, args ...string) ([]byte, error) { if len(args) < 1 { return nil, errors.New("args cannot be empty") } - w.mu.Lock() - defer w.mu.Unlock() w.logger.Debug("Running terraform", "args", args) - w.providerInUse.Increment() + if execMode == ModeSync { + w.providerInUse.Increment() + } defer w.providerInUse.Decrement() + w.mu.Lock() + defer w.mu.Unlock() cmd := w.executor.CommandContext(ctx, "terraform", args...) cmd.SetEnv(append(os.Environ(), w.env...)) cmd.SetDir(w.dir) From 76d344cacb2188f5a166414eac9af04496161914 Mon Sep 17 00:00:00 2001 From: Alper Rifat Ulucinar Date: Mon, 27 Mar 2023 18:00:14 +0300 Subject: [PATCH 4/4] Rename ttlBudget as ttlMargin Signed-off-by: Alper Rifat Ulucinar --- pkg/terraform/provider_scheduler.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/terraform/provider_scheduler.go b/pkg/terraform/provider_scheduler.go index 578c8fe0..0639b83b 100644 --- a/pkg/terraform/provider_scheduler.go +++ b/pkg/terraform/provider_scheduler.go @@ -30,7 +30,7 @@ const ( // InvalidProviderHandle is an invalid ProviderHandle. InvalidProviderHandle ProviderHandle = "" - ttlBudget = 0.1 + ttlMargin = 0.1 ) // ProviderScheduler represents a shared native plugin process scheduler. 
@@ -135,14 +135,14 @@ func NewSharedProviderScheduler(l logging.Logger, ttl int, opts ...SharedProvide } func (s *SharedProviderScheduler) Start(h ProviderHandle) (InUse, string, error) { - logger := s.logger.WithValues("handle", h, "ttl", s.ttl, "ttlBudget", ttlBudget) + logger := s.logger.WithValues("handle", h, "ttl", s.ttl, "ttlMargin", ttlMargin) s.mu.Lock() defer s.mu.Unlock() r := s.runners[h] switch { case r != nil && (r.invocationCount < s.ttl || r.inUse > 0): - if r.invocationCount > int(float64(s.ttl)*(1+ttlBudget)) { + if r.invocationCount > int(float64(s.ttl)*(1+ttlMargin)) { logger.Debug("Reuse budget has been exceeded. Caller will need to retry.") return nil, "", errors.Errorf("native provider reuse budget has been exceeded: invocationCount: %d, ttl: %d", r.invocationCount, s.ttl) }