From 02f8c0b35a584ed2cb92bc6006931562cf5fbb71 Mon Sep 17 00:00:00 2001 From: Eron Wright Date: Tue, 3 Oct 2023 15:27:30 -0700 Subject: [PATCH] Use output properties for await --- provider/pkg/await/await.go | 57 +++++++++-------- provider/pkg/await/awaiters.go | 84 +++++++++++++------------- provider/pkg/await/deployment.go | 46 +++++++------- provider/pkg/await/deployment_test.go | 10 +-- provider/pkg/await/ingress.go | 20 +++--- provider/pkg/await/job.go | 14 ++--- provider/pkg/await/pod.go | 12 ++-- provider/pkg/await/service.go | 12 ++-- provider/pkg/await/statefulset.go | 16 ++--- provider/pkg/await/statefulset_test.go | 6 +- provider/pkg/await/util_test.go | 1 - provider/pkg/metadata/naming.go | 10 ++- provider/pkg/provider/provider.go | 27 +++++---- 13 files changed, 159 insertions(+), 156 deletions(-) diff --git a/provider/pkg/await/await.go b/provider/pkg/await/await.go index 7a77b6b15e..3b41f2608b 100644 --- a/provider/pkg/await/await.go +++ b/provider/pkg/await/await.go @@ -90,17 +90,18 @@ type ReadConfig struct { type UpdateConfig struct { ProviderConfig - Previous *unstructured.Unstructured - Inputs *unstructured.Unstructured - Timeout float64 - Preview bool + OldInputs *unstructured.Unstructured + OldOutputs *unstructured.Unstructured + Inputs *unstructured.Unstructured + Timeout float64 + Preview bool // IgnoreChanges is a list of fields to ignore when diffing the old and new objects. IgnoreChanges []string } type DeleteConfig struct { ProviderConfig - Inputs *unstructured.Unstructured + Outputs *unstructured.Unstructured Name string Timeout float64 } @@ -162,6 +163,8 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) { // nolint // https://github.com/kubernetes/kubernetes/blob/54889d581a35acf940d52a8a384cccaa0b597ddc/pkg/kubectl/cmd/apply/apply.go#L94 + patchResource := kinds.PatchQualifiedTypes.Has(c.URN.QualifiedType().String()) + var outputs *unstructured.Unstructured var client dynamic.ResourceInterface err := retry.SleepingRetry( @@ -187,7 +190,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) { } } - if c.ServerSideApply { + if c.ServerSideApply && patchResource { force := patchForce(c.Inputs, nil, c.Preview) options := metav1.PatchOptions{ FieldManager: c.FieldManager, @@ -251,7 +254,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) { id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind()) if awaiter, exists := awaiters[id]; exists { if metadata.SkipAwaitLogic(c.Inputs) { - logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName()) + logger.V(1).Infof("Skipping await logic for %v", outputs.GetName()) } else { if awaiter.awaitCreation != nil { conf := createAwaitConfig{ @@ -260,7 +263,6 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) { urn: c.URN, initialAPIVersion: c.InitialAPIVersion, clientSet: c.ClientSet, - currentInputs: c.Inputs, currentOutputs: outputs, logger: c.DedupLogger, timeout: c.Timeout, @@ -280,7 +282,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) { // If the client fails to get the live object for some reason, DO NOT return the error. This // will leak the fact that the object was successfully created. Instead, fall back to the // last-seen live object. 
- live, err := client.Get(c.Context, c.Inputs.GetName(), metav1.GetOptions{}) + live, err := client.Get(c.Context, outputs.GetName(), metav1.GetOptions{}) if err != nil { return outputs, nil } @@ -306,7 +308,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) { id := fmt.Sprintf("%s/%s", outputs.GetAPIVersion(), outputs.GetKind()) if awaiter, exists := awaiters[id]; exists { if metadata.SkipAwaitLogic(c.Inputs) { - logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName()) + logger.V(1).Infof("Skipping await logic for %v", outputs.GetName()) } else { if awaiter.awaitRead != nil { conf := createAwaitConfig{ @@ -315,7 +317,6 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) { urn: c.URN, initialAPIVersion: c.InitialAPIVersion, clientSet: c.ClientSet, - currentInputs: c.Inputs, currentOutputs: outputs, logger: c.DedupLogger, clusterVersion: c.ClusterVersion, @@ -353,14 +354,14 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) { // [3]: // https://kubernetes.io/docs/reference/using-api/server-side-apply func Update(c UpdateConfig) (*unstructured.Unstructured, error) { - client, err := c.ClientSet.ResourceClientForObject(c.Inputs) + client, err := c.ClientSet.ResourceClientForObject(c.OldOutputs) if err != nil { return nil, err } // Get the "live" version of the last submitted object. This is necessary because the server may // have populated some fields automatically, updated status fields, and so on. - liveOldObj, err := client.Get(c.Context, c.Previous.GetName(), metav1.GetOptions{}) + liveOldObj, err := client.Get(c.Context, c.OldOutputs.GetName(), metav1.GetOptions{}) if err != nil { return nil, err } @@ -380,7 +381,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) { id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind()) if awaiter, exists := awaiters[id]; exists { if metadata.SkipAwaitLogic(c.Inputs) { - logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName()) + logger.V(1).Infof("Skipping await logic for %v", currentOutputs.GetName()) } else { if awaiter.awaitUpdate != nil { conf := updateAwaitConfig{ @@ -390,13 +391,11 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) { urn: c.URN, initialAPIVersion: c.InitialAPIVersion, clientSet: c.ClientSet, - currentInputs: c.Inputs, currentOutputs: currentOutputs, logger: c.DedupLogger, timeout: c.Timeout, clusterVersion: c.ClusterVersion, }, - lastInputs: c.Previous, lastOutputs: liveOldObj, } waitErr := awaiter.awaitUpdate(conf) @@ -411,12 +410,12 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) { gvk := c.Inputs.GroupVersionKind() logger.V(3).Infof("Resource %s/%s/%s '%s.%s' patched and updated", gvk.Group, gvk.Version, - gvk.Kind, c.Inputs.GetNamespace(), c.Inputs.GetName()) + gvk.Kind, c.Inputs.GetNamespace(), currentOutputs.GetName()) // If the client fails to get the live object for some reason, DO NOT return the error. This // will leak the fact that the object was successfully created. Instead, fall back to the // last-seen live object. - live, err := client.Get(c.Context, c.Inputs.GetName(), metav1.GetOptions{}) + live, err := client.Get(c.Context, currentOutputs.GetName(), metav1.GetOptions{}) if err != nil { return currentOutputs, nil } @@ -450,7 +449,7 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy // optimistically rather than failing the update. 
_ = handleCSAIgnoreFields(c, liveOldObj) // Create merge patch (prefer strategic merge patch, fall back to JSON merge patch). - patch, patchType, _, err := openapi.PatchForResourceUpdate(c.Resources, c.Previous, c.Inputs, liveOldObj) + patch, patchType, _, err := openapi.PatchForResourceUpdate(c.Resources, c.OldInputs, c.Inputs, liveOldObj) if err != nil { return nil, err } @@ -462,7 +461,7 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy options.DryRun = []string{metav1.DryRunAll} } - return client.Patch(c.Context, c.Inputs.GetName(), patchType, patch, options) + return client.Patch(c.Context, liveOldObj.GetName(), patchType, patch, options) } // ssaUpdate handles the logic for updating a resource using server-side apply. @@ -490,7 +489,7 @@ func ssaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy options.DryRun = []string{metav1.DryRunAll} } - currentOutputs, err := client.Patch(c.Context, c.Inputs.GetName(), types.ApplyPatchType, objYAML, options) + currentOutputs, err := client.Patch(c.Context, liveOldObj.GetName(), types.ApplyPatchType, objYAML, options) if err != nil { if errors.IsConflict(err) { err = fmt.Errorf("Server-Side Apply field conflict detected. See %s for troubleshooting help\n: %w", @@ -543,7 +542,7 @@ func handleSSAIgnoreFields(c *UpdateConfig, liveOldObj *unstructured.Unstructure for _, f := range managedFields { s, err := fluxssa.FieldsToSet(*f.FieldsV1) if err != nil { - return fmt.Errorf("unable to parse managed fields from resource %q into fieldpath.Set: %w", c.Inputs.GetName(), err) + return fmt.Errorf("unable to parse managed fields from resource %q into fieldpath.Set: %w", liveOldObj.GetName(), err) } switch f.Manager { @@ -698,18 +697,18 @@ func Deletion(c DeleteConfig) error { } // Obtain client for the resource being deleted. - client, err := c.ClientSet.ResourceClientForObject(c.Inputs) + client, err := c.ClientSet.ResourceClientForObject(c.Outputs) if err != nil { return nilIfGVKDeleted(err) } patchResource := kinds.PatchQualifiedTypes.Has(c.URN.QualifiedType().String()) if c.ServerSideApply && patchResource { - err = ssa.Relinquish(c.Context, client, c.Inputs, c.FieldManager) + err = ssa.Relinquish(c.Context, client, c.Outputs, c.FieldManager) return err } - timeout := metadata.TimeoutDuration(c.Timeout, c.Inputs, 300) + timeout := metadata.TimeoutDuration(c.Timeout, c.Outputs, 300) timeoutSeconds := int64(timeout.Seconds()) listOpts := metav1.ListOptions{ FieldSelector: fields.OneTermEqualSelector("metadata.name", c.Name).String(), @@ -731,10 +730,10 @@ func Deletion(c DeleteConfig) error { // if we don't have an entry for the resource type; in the event that we do, but the await logic // is blank, simply do nothing instead of logging. 
var waitErr error - id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind()) + id := fmt.Sprintf("%s/%s", c.Outputs.GetAPIVersion(), c.Outputs.GetKind()) if awaiter, exists := awaiters[id]; exists && awaiter.awaitDeletion != nil { - if metadata.SkipAwaitLogic(c.Inputs) { - logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName()) + if metadata.SkipAwaitLogic(c.Outputs) { + logger.V(1).Infof("Skipping await logic for %v", c.Name) } else { waitErr = awaiter.awaitDeletion(deleteAwaitConfig{ createAwaitConfig: createAwaitConfig{ @@ -743,7 +742,7 @@ func Deletion(c DeleteConfig) error { urn: c.URN, initialAPIVersion: c.InitialAPIVersion, clientSet: c.ClientSet, - currentInputs: c.Inputs, + currentOutputs: c.Outputs, logger: c.DedupLogger, timeout: c.Timeout, clusterVersion: c.ClusterVersion, diff --git a/provider/pkg/await/awaiters.go b/provider/pkg/await/awaiters.go index 7dbde44d25..48ac9d5a89 100644 --- a/provider/pkg/await/awaiters.go +++ b/provider/pkg/await/awaiters.go @@ -51,7 +51,6 @@ type createAwaitConfig struct { initialAPIVersion string logger *logging.DedupLogger clientSet *clients.DynamicClientSet - currentInputs *unstructured.Unstructured currentOutputs *unstructured.Unstructured timeout float64 clusterVersion *cluster.ServerVersion @@ -73,7 +72,6 @@ func (cac *createAwaitConfig) logMessage(message checkerlog.Message) { // reasonably efficient. type updateAwaitConfig struct { createAwaitConfig - lastInputs *unstructured.Unstructured lastOutputs *unstructured.Unstructured } @@ -301,19 +299,19 @@ func untilAppsDeploymentDeleted(config deleteAwaitConfig) error { specReplicas, _ := deploymentSpecReplicas(d) return watcher.RetryableError( - fmt.Errorf("deployment %q still exists (%d / %d replicas exist)", config.currentInputs.GetName(), + fmt.Errorf("deployment %q still exists (%d / %d replicas exist)", d.GetName(), currReplicas, specReplicas)) } // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()). + timeout := metadata.TimeoutDuration(config.timeout, config.currentOutputs, 600) + err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). RetryUntil(deploymentMissing, timeout) if err != nil { return err } - logger.V(3).Infof("Deployment '%s' deleted", config.currentInputs.GetName()) + logger.V(3).Infof("Deployment '%s' deleted", config.currentOutputs.GetName()) return nil } @@ -344,19 +342,19 @@ func untilAppsStatefulSetDeleted(config deleteAwaitConfig) error { specReplicas, _ := specReplicas(d) return watcher.RetryableError( - fmt.Errorf("StatefulSet %q still exists (%d / %d replicas exist)", config.currentInputs.GetName(), + fmt.Errorf("StatefulSet %q still exists (%d / %d replicas exist)", d.GetName(), currReplicas, specReplicas)) } // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()). + timeout := metadata.TimeoutDuration(config.timeout, config.currentOutputs, 600) + err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). 
RetryUntil(statefulsetmissing, timeout) if err != nil { return err } - logger.V(3).Infof("StatefulSet %q deleted", config.currentInputs.GetName()) + logger.V(3).Infof("StatefulSet %q deleted", config.currentOutputs.GetName()) return nil } @@ -375,12 +373,12 @@ func untilBatchV1JobDeleted(config deleteAwaitConfig) error { return err } - e := fmt.Errorf("job %q still exists", config.currentInputs.GetName()) + e := fmt.Errorf("job %q still exists", pod.GetName()) return watcher.RetryableError(e) } - timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()). + timeout := metadata.TimeoutDuration(config.timeout, config.currentOutputs, 300) + return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). RetryUntil(jobMissingOrKilled, timeout) } @@ -396,22 +394,22 @@ func untilCoreV1NamespaceDeleted(config deleteAwaitConfig) error { return nil } else if err != nil { logger.V(3).Infof("Received error deleting namespace %q: %#v", - config.currentInputs.GetName(), err) + ns.GetName(), err) return err } statusPhase, _ := openapi.Pluck(ns.Object, "status", "phase") - logger.V(3).Infof("Namespace %q status received: %#v", config.currentInputs.GetName(), statusPhase) + logger.V(3).Infof("Namespace %q status received: %#v", ns.GetName(), statusPhase) if statusPhase == "" { return nil } return watcher.RetryableError(fmt.Errorf("namespace %q still exists (%v)", - config.currentInputs.GetName(), statusPhase)) + ns.GetName(), statusPhase)) } - timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()). + timeout := metadata.TimeoutDuration(config.timeout, config.currentOutputs, 300) + return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). RetryUntil(namespaceMissingOrKilled, timeout) } @@ -433,11 +431,11 @@ func untilCoreV1PersistentVolumeInitialized(c createAwaitConfig) error { return statusPhase == statusAvailable || statusPhase == statusBound } - client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace()) + client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace()) if err != nil { return err } - return watcher.ForObject(c.ctx, client, c.currentInputs.GetName()). + return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()). WatchUntil(pvAvailableOrBound, 5*time.Minute) } @@ -454,11 +452,11 @@ func untilCoreV1PersistentVolumeClaimBound(c createAwaitConfig) error { return statusPhase == statusBound } - client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace()) + client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace()) if err != nil { return err } - return watcher.ForObject(c.ctx, client, c.currentInputs.GetName()). + return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()). 
WatchUntil(pvcBound, 5*time.Minute) } @@ -478,13 +476,13 @@ func untilCoreV1PodDeleted(config deleteAwaitConfig) error { } statusPhase, _ := openapi.Pluck(pod.Object, "status", "phase") - logger.V(3).Infof("Current state of pod %q: %#v", config.currentInputs.GetName(), statusPhase) - e := fmt.Errorf("pod %q still exists (%v)", config.currentInputs.GetName(), statusPhase) + logger.V(3).Infof("Current state of pod %q: %#v", pod.GetName(), statusPhase) + e := fmt.Errorf("pod %q still exists (%v)", pod.GetName(), statusPhase) return watcher.RetryableError(e) } - timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300) - return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()). + timeout := metadata.TimeoutDuration(config.timeout, config.currentOutputs, 300) + return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). RetryUntil(podMissingOrKilled, timeout) } @@ -503,13 +501,13 @@ func untilCoreV1ReplicationControllerInitialized(c createAwaitConfig) error { return openapi.Pluck(rc.Object, "status", "availableReplicas") } - name := c.currentInputs.GetName() + name := c.currentOutputs.GetName() - replicas, _ := openapi.Pluck(c.currentInputs.Object, "spec", "replicas") + replicas, _ := openapi.Pluck(c.currentOutputs.Object, "spec", "replicas") logger.V(3).Infof("Waiting for replication controller %q to schedule '%v' replicas", name, replicas) - client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace()) + client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace()) if err != nil { return err } @@ -525,8 +523,8 @@ func untilCoreV1ReplicationControllerInitialized(c createAwaitConfig) error { // but that means checking each pod status separately (which can be expensive at scale) // as there's no aggregate data available from the API - logger.V(3).Infof("Replication controller %q initialized: %#v", c.currentInputs.GetName(), - c.currentInputs) + logger.V(3).Infof("Replication controller %q initialized: %#v", name, + c.currentOutputs) return nil } @@ -559,18 +557,18 @@ func untilCoreV1ReplicationControllerDeleted(config deleteAwaitConfig) error { return watcher.RetryableError( fmt.Errorf("ReplicationController %q still exists (%d / %d replicas exist)", - config.currentInputs.GetName(), currReplicas, specReplicas)) + rc.GetName(), currReplicas, specReplicas)) } // Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas. - timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 600) - err := watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()). + timeout := metadata.TimeoutDuration(config.timeout, config.currentOutputs, 600) + err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()). 
RetryUntil(rcMissing, timeout) if err != nil { return err } - logger.V(3).Infof("ReplicationController %q deleted", config.currentInputs.GetName()) + logger.V(3).Infof("ReplicationController %q deleted", config.currentOutputs.GetName()) return nil } @@ -589,8 +587,8 @@ func untilCoreV1ResourceQuotaInitialized(c createAwaitConfig) error { hard, hardIsMap := hardRaw.(map[string]any) hardStatus, hardStatusIsMap := hardStatusRaw.(map[string]any) if hardIsMap && hardStatusIsMap && reflect.DeepEqual(hard, hardStatus) { - logger.V(3).Infof("ResourceQuota %q initialized: %#v", c.currentInputs.GetName(), - c.currentInputs) + logger.V(3).Infof("ResourceQuota %q initialized: %#v", quota.GetName(), + quota) return true } logger.V(3).Infof("Quotas don't match after creation.\nExpected: %#v\nGiven: %#v", @@ -598,17 +596,17 @@ func untilCoreV1ResourceQuotaInitialized(c createAwaitConfig) error { return false } - client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace()) + client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace()) if err != nil { return err } - return watcher.ForObject(c.ctx, client, c.currentInputs.GetName()). + return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()). WatchUntil(rqInitialized, 1*time.Minute) } func untilCoreV1ResourceQuotaUpdated(c updateAwaitConfig) error { - oldSpec, _ := openapi.Pluck(c.lastInputs.Object, "spec") - newSpec, _ := openapi.Pluck(c.currentInputs.Object, "spec") + oldSpec, _ := openapi.Pluck(c.lastOutputs.Object, "spec") + newSpec, _ := openapi.Pluck(c.currentOutputs.Object, "spec") if !reflect.DeepEqual(oldSpec, newSpec) { return untilCoreV1ResourceQuotaInitialized(c.createAwaitConfig) } @@ -626,7 +624,7 @@ func untilCoreV1SecretInitialized(c createAwaitConfig) error { // Some types secrets do not have data available immediately and therefore are not considered initialized where data map is empty. // For example service-account-token as described in the docs: https://kubernetes.io/docs/reference/access-authn-authz/service-accounts-admin/#to-create-additional-api-tokens // - secretType, _ := openapi.Pluck(c.currentInputs.Object, "type") + secretType, _ := openapi.Pluck(c.currentOutputs.Object, "type") // Other secret types are not generated by controller therefore we do not need to create a watcher for them. // nolint:gosec @@ -672,7 +670,7 @@ func untilCoreV1ServiceAccountInitialized(c createAwaitConfig) error { // secrets array (i.e., in addition to the secrets specified by the user). 
// - specSecrets, _ := openapi.Pluck(c.currentInputs.Object, "secrets") + specSecrets, _ := openapi.Pluck(c.currentOutputs.Object, "secrets") var numSpecSecrets int if specSecretsArr, isArr := specSecrets.([]any); isArr { numSpecSecrets = len(specSecretsArr) diff --git a/provider/pkg/await/deployment.go b/provider/pkg/await/deployment.go index ee55a7f7e0..88b828f2bf 100644 --- a/provider/pkg/await/deployment.go +++ b/provider/pkg/await/deployment.go @@ -228,7 +228,7 @@ func (dia *deploymentInitAwaiter) Await() error { aggregateErrorTicker := time.NewTicker(10 * time.Second) defer aggregateErrorTicker.Stop() - timeout := metadata.TimeoutDuration(dia.config.timeout, dia.config.currentInputs, DefaultDeploymentTimeoutMins*60) + timeout := metadata.TimeoutDuration(dia.config.timeout, dia.config.currentOutputs, DefaultDeploymentTimeoutMins*60) return dia.await( deploymentEvents, @@ -248,7 +248,7 @@ func (dia *deploymentInitAwaiter) Read() error { // Get live versions of Deployment, ReplicaSets, and Pods. deployment, err := deploymentClient.Get(dia.config.ctx, - dia.config.currentInputs.GetName(), + dia.config.currentOutputs.GetName(), metav1.GetOptions{}) if err != nil { // IMPORTANT: Do not wrap this error! If this is a 404, the provider need to know so that it @@ -416,7 +416,7 @@ func (dia *deploymentInitAwaiter) checkAndLogStatus() bool { } func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) { - inputDeploymentName := dia.config.currentInputs.GetName() + currentDeploymentName := dia.config.currentOutputs.GetName() deployment, isUnstructured := event.Object.(*unstructured.Unstructured) if !isUnstructured { @@ -429,7 +429,7 @@ func (dia *deploymentInitAwaiter) processDeploymentEvent(event watch.Event) { dia.deploymentErrors = map[string]string{} // Do nothing if this is not the Deployment we're waiting for. - if deployment.GetName() != inputDeploymentName { + if deployment.GetName() != currentDeploymentName { return } @@ -543,11 +543,11 @@ func (dia *deploymentInitAwaiter) processReplicaSetEvent(event watch.Event) { logger.V(3).Infof("Received update for ReplicaSet %q", rs.GetName()) // Check whether this ReplicaSet was created by our Deployment. - if !isOwnedBy(rs, dia.config.currentInputs) { + if !isOwnedBy(rs, dia.config.currentOutputs) { return } - logger.V(3).Infof("ReplicaSet %q is owned by %q", rs.GetName(), dia.config.currentInputs.GetName()) + logger.V(3).Infof("ReplicaSet %q is owned by %q", rs.GetName(), dia.config.currentOutputs.GetName()) // If Pod was deleted, remove it from our aggregated checkers. 
generation := rs.GetAnnotations()[revision] @@ -560,9 +560,9 @@ func (dia *deploymentInitAwaiter) processReplicaSetEvent(event watch.Event) { } func (dia *deploymentInitAwaiter) checkReplicaSetStatus() { - inputs := dia.config.currentInputs + outputs := dia.config.currentOutputs - logger.V(3).Infof("Checking ReplicaSet status for Deployment %q", inputs.GetName()) + logger.V(3).Infof("Checking ReplicaSet status for Deployment %q", outputs.GetName()) rs, updatedReplicaSetCreated := dia.replicaSets[dia.replicaSetGeneration] if dia.replicaSetGeneration == "0" || !updatedReplicaSetCreated { @@ -570,14 +570,14 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() { } logger.V(3).Infof("Deployment %q has generation %q, which corresponds to ReplicaSet %q", - inputs.GetName(), dia.replicaSetGeneration, rs.GetName()) + outputs.GetName(), dia.replicaSetGeneration, rs.GetName()) var lastRevision string if outputs := dia.config.lastOutputs; outputs != nil { lastRevision = outputs.GetAnnotations()[revision] } - logger.V(3).Infof("The last generation of Deployment %q was %q", inputs.GetName(), lastRevision) + logger.V(3).Infof("The last generation of Deployment %q was %q", outputs.GetName(), lastRevision) // NOTE: Check `.spec.replicas` in the live `ReplicaSet` instead of the last input `Deployment`, // since this is the plan of record. This protects against (e.g.) a user running `kubectl scale` @@ -686,18 +686,18 @@ func (dia *deploymentInitAwaiter) checkReplicaSetStatus() { } func (dia *deploymentInitAwaiter) changeTriggeredRollout() bool { - if dia.config.lastInputs == nil { + if dia.config.lastOutputs == nil { return true } fields, err := openapi.PropertiesChanged( - dia.config.lastInputs.Object, dia.config.currentInputs.Object, + dia.config.lastOutputs.Object, dia.config.currentOutputs.Object, []string{ ".spec.template.spec", }) if err != nil { logger.V(3).Infof("Failed to check whether Pod template for Deployment %q changed", - dia.config.currentInputs.GetName()) + dia.config.currentOutputs.GetName()) return false } @@ -705,9 +705,9 @@ func (dia *deploymentInitAwaiter) changeTriggeredRollout() bool { } func (dia *deploymentInitAwaiter) checkPersistentVolumeClaimStatus() { - inputs := dia.config.currentInputs + currentOutputs := dia.config.currentOutputs - logger.V(3).Infof("Checking PersistentVolumeClaims status for Deployment %q", inputs.GetName()) + logger.V(3).Infof("Checking PersistentVolumeClaims status for Deployment %q", currentOutputs.GetName()) allPVCsReady := true for _, pvc := range dia.pvcs { @@ -881,31 +881,31 @@ func (dia *deploymentInitAwaiter) makeClients() ( deploymentClient, replicaSetClient, podClient, pvcClient dynamic.ResourceInterface, err error, ) { deploymentClient, err = clients.ResourceClient( - kinds.Deployment, dia.config.currentInputs.GetNamespace(), dia.config.clientSet) + kinds.Deployment, dia.config.currentOutputs.GetNamespace(), dia.config.clientSet) if err != nil { err = errors.Wrapf(err, "Could not make client to watch Deployment %q", - dia.config.currentInputs.GetName()) + dia.config.currentOutputs.GetName()) return nil, nil, nil, nil, err } replicaSetClient, err = clients.ResourceClient( - kinds.ReplicaSet, dia.config.currentInputs.GetNamespace(), dia.config.clientSet) + kinds.ReplicaSet, dia.config.currentOutputs.GetNamespace(), dia.config.clientSet) if err != nil { err = errors.Wrapf(err, "Could not make client to watch ReplicaSets associated with Deployment %q", - dia.config.currentInputs.GetName()) + dia.config.currentOutputs.GetName()) return nil, 
nil, nil, nil, err } podClient, err = clients.ResourceClient( - kinds.Pod, dia.config.currentInputs.GetNamespace(), dia.config.clientSet) + kinds.Pod, dia.config.currentOutputs.GetNamespace(), dia.config.clientSet) if err != nil { err = errors.Wrapf(err, "Could not make client to watch Pods associated with Deployment %q", - dia.config.currentInputs.GetName()) + dia.config.currentOutputs.GetName()) return nil, nil, nil, nil, err } pvcClient, err = clients.ResourceClient( - kinds.PersistentVolumeClaim, dia.config.currentInputs.GetNamespace(), dia.config.clientSet) + kinds.PersistentVolumeClaim, dia.config.currentOutputs.GetNamespace(), dia.config.clientSet) if err != nil { err = errors.Wrapf(err, "Could not make client to watch PVCs associated with Deployment %q", - dia.config.currentInputs.GetName()) + dia.config.currentOutputs.GetName()) return nil, nil, nil, nil, err } diff --git a/provider/pkg/await/deployment_test.go b/provider/pkg/await/deployment_test.go index 34d9edd9b8..5495942352 100644 --- a/provider/pkg/await/deployment_test.go +++ b/provider/pkg/await/deployment_test.go @@ -702,14 +702,14 @@ func Test_Apps_Deployment_Without_PersistentVolumeClaims(t *testing.T) { } } -type setLastInputs func(obj *unstructured.Unstructured) +type setLastOutputs func(obj *unstructured.Unstructured) func Test_Apps_Deployment_MultipleUpdates(t *testing.T) { tests := []struct { description string inputs func() *unstructured.Unstructured firstUpdate func(deployments, replicaSets, pods chan watch.Event, timeout chan time.Time, - setLast setLastInputs) + setLast setLastOutputs) secondUpdate func(deployments, replicaSets, pods chan watch.Event, timeout chan time.Time) expectedError error }{ @@ -718,13 +718,13 @@ func Test_Apps_Deployment_MultipleUpdates(t *testing.T) { inputs: regressionDeploymentScaled3Input, firstUpdate: func( deployments, replicaSets, pods chan watch.Event, timeout chan time.Time, - setLast setLastInputs, + setLast setLastOutputs, ) { computed := regressionDeploymentScaled3() deployments <- watchAddedEvent(computed) replicaSets <- watchAddedEvent(regressionReplicaSetScaled3()) - setLast(regressionDeploymentScaled3Input()) + setLast(computed) // Timeout. Success. 
timeout <- time.Now() }, @@ -752,7 +752,7 @@ func Test_Apps_Deployment_MultipleUpdates(t *testing.T) { period := make(chan time.Time) go test.firstUpdate(deployments, replicaSets, pods, timeout, func(obj *unstructured.Unstructured) { - awaiter.config.lastInputs = obj + awaiter.config.lastOutputs = obj }) err := awaiter.await(deployments, replicaSets, pods, pvcs, timeout, period) diff --git a/provider/pkg/await/ingress.go b/provider/pkg/await/ingress.go index 0b136fc920..510497a140 100644 --- a/provider/pkg/await/ingress.go +++ b/provider/pkg/await/ingress.go @@ -116,7 +116,7 @@ func (iia *ingressInitAwaiter) Await() error { defer close(stopper) informerFactory := informers.NewInformerFactory(iia.config.clientSet, - informers.WithNamespaceOrDefault(iia.config.currentInputs.GetNamespace())) + informers.WithNamespaceOrDefault(iia.config.currentOutputs.GetNamespace())) informerFactory.Start(stopper) ingressEvents := make(chan watch.Event) @@ -144,7 +144,7 @@ func (iia *ingressInitAwaiter) Await() error { } go serviceInformer.Informer().Run(stopper) - timeout := metadata.TimeoutDuration(iia.config.timeout, iia.config.currentInputs, DefaultIngressTimeoutMins*60) + timeout := metadata.TimeoutDuration(iia.config.timeout, iia.config.currentOutputs, DefaultIngressTimeoutMins*60) return iia.await(ingressEvents, serviceEvents, endpointsEvents, make(chan struct{}), time.After(60*time.Second), time.After(timeout)) } @@ -155,7 +155,7 @@ func (iia *ingressInitAwaiter) Read() error { } // Get live versions of Ingress. - ingress, err := ingressClient.Get(iia.config.ctx, iia.config.currentInputs.GetName(), metav1.GetOptions{}) + ingress, err := ingressClient.Get(iia.config.ctx, iia.config.currentOutputs.GetName(), metav1.GetOptions{}) if err != nil { // IMPORTANT: Do not wrap this error! If this is a 404, the provider need to know so that it // can mark the deployment as having been deleted. 
@@ -288,7 +288,7 @@ func (iia *ingressInitAwaiter) processServiceEvent(event watch.Event) { } func (iia *ingressInitAwaiter) processIngressEvent(event watch.Event) { - inputIngressName := iia.config.currentInputs.GetName() + inputIngressName := iia.config.currentOutputs.GetName() ingress, isUnstructured := event.Object.(*unstructured.Unstructured) if !isUnstructured { @@ -509,25 +509,25 @@ func (iia *ingressInitAwaiter) makeClients() ( ingressClient, endpointsClient, servicesClient dynamic.ResourceInterface, err error, ) { ingressClient, err = clients.ResourceClient( - kinds.Ingress, iia.config.currentInputs.GetNamespace(), iia.config.clientSet) + kinds.Ingress, iia.config.currentOutputs.GetNamespace(), iia.config.clientSet) if err != nil { return nil, nil, nil, errors.Wrapf(err, "Could not make client to watch Ingress %q", - iia.config.currentInputs.GetName()) + iia.config.currentOutputs.GetName()) } endpointsClient, err = clients.ResourceClient( - kinds.Endpoints, iia.config.currentInputs.GetNamespace(), iia.config.clientSet) + kinds.Endpoints, iia.config.currentOutputs.GetNamespace(), iia.config.clientSet) if err != nil { return nil, nil, nil, errors.Wrapf(err, "Could not make client to watch Endpoints associated with Ingress %q", - iia.config.currentInputs.GetName()) + iia.config.currentOutputs.GetName()) } servicesClient, err = clients.ResourceClient( - kinds.Service, iia.config.currentInputs.GetNamespace(), iia.config.clientSet) + kinds.Service, iia.config.currentOutputs.GetNamespace(), iia.config.clientSet) if err != nil { return nil, nil, nil, errors.Wrapf(err, "Could not make client to watch Services associated with Ingress %q", - iia.config.currentInputs.GetName()) + iia.config.currentOutputs.GetName()) } return diff --git a/provider/pkg/await/job.go b/provider/pkg/await/job.go index 76c42a1e6d..b6e8af09f3 100644 --- a/provider/pkg/await/job.go +++ b/provider/pkg/await/job.go @@ -102,7 +102,7 @@ func (jia *jobInitAwaiter) Await() error { defer close(stopper) informerFactory := informers.NewInformerFactory(jia.config.clientSet, - informers.WithNamespaceOrDefault(jia.config.currentInputs.GetNamespace())) + informers.WithNamespaceOrDefault(jia.config.currentOutputs.GetNamespace())) informerFactory.Start(stopper) jobEvents := make(chan watch.Event) @@ -123,7 +123,7 @@ func (jia *jobInitAwaiter) Await() error { podAggregator.Start(podEvents) defer podAggregator.Stop() - timeout := metadata.TimeoutDuration(jia.config.timeout, jia.config.currentInputs, DefaultJobTimeoutMins*60) + timeout := metadata.TimeoutDuration(jia.config.timeout, jia.config.currentOutputs, DefaultJobTimeoutMins*60) for { if jia.ready { return nil @@ -156,18 +156,18 @@ func (jia *jobInitAwaiter) Read() error { stopper := make(chan struct{}) defer close(stopper) - namespace := jia.config.currentInputs.GetNamespace() + namespace := jia.config.currentOutputs.GetNamespace() informerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(jia.config.clientSet.GenericClient, 60*time.Second, namespace, nil) informerFactory.Start(stopper) - jobClient, err := clients.ResourceClient(kinds.Job, jia.config.currentInputs.GetNamespace(), jia.config.clientSet) + jobClient, err := clients.ResourceClient(kinds.Job, jia.config.currentOutputs.GetNamespace(), jia.config.clientSet) if err != nil { return errors.Wrapf(err, "Could not make client to get Job %q", - jia.config.currentInputs.GetName()) + jia.config.currentOutputs.GetName()) } // Get live version of Job. 
- job, err := jobClient.Get(jia.config.ctx, jia.config.currentInputs.GetName(), metav1.GetOptions{}) + job, err := jobClient.Get(jia.config.ctx, jia.config.currentOutputs.GetName(), metav1.GetOptions{}) if err != nil { // IMPORTANT: Do not wrap this error! If this is a 404, the provider need to know so that it // can mark the Pod as having been deleted. @@ -212,7 +212,7 @@ func (jia *jobInitAwaiter) processJobEvent(event watch.Event) error { } // Do nothing if this is not the job we're waiting for. - if job.GetName() != jia.config.currentInputs.GetName() { + if job.GetName() != jia.config.currentOutputs.GetName() { return nil } diff --git a/provider/pkg/await/pod.go b/provider/pkg/await/pod.go index 07a23068e1..d9689a4cf2 100644 --- a/provider/pkg/await/pod.go +++ b/provider/pkg/await/pod.go @@ -150,7 +150,7 @@ func (pia *podInitAwaiter) Await() error { defer close(stopper) informerFactory := informers.NewInformerFactory(pia.config.clientSet, - informers.WithNamespaceOrDefault(pia.config.currentInputs.GetNamespace())) + informers.WithNamespaceOrDefault(pia.config.currentOutputs.GetNamespace())) informerFactory.Start(stopper) podEvents := make(chan watch.Event) @@ -160,7 +160,7 @@ func (pia *podInitAwaiter) Await() error { } go podInformer.Informer().Run(stopper) - timeout := metadata.TimeoutDuration(pia.config.timeout, pia.config.currentInputs, DefaultPodTimeoutMins*60) + timeout := metadata.TimeoutDuration(pia.config.timeout, pia.config.currentOutputs, DefaultPodTimeoutMins*60) for { if pia.ready { return nil @@ -187,14 +187,14 @@ func (pia *podInitAwaiter) Await() error { func (pia *podInitAwaiter) Read() error { podClient, err := clients.ResourceClient( - kinds.Pod, pia.config.currentInputs.GetNamespace(), pia.config.clientSet) + kinds.Pod, pia.config.currentOutputs.GetNamespace(), pia.config.clientSet) if err != nil { return errors.Wrapf(err, "Could not make client to get Pod %q", - pia.config.currentInputs.GetName()) + pia.config.currentOutputs.GetName()) } // Get live version of Pod. - pod, err := podClient.Get(pia.config.ctx, pia.config.currentInputs.GetName(), metav1.GetOptions{}) + pod, err := podClient.Get(pia.config.ctx, pia.config.currentOutputs.GetName(), metav1.GetOptions{}) if err != nil { // IMPORTANT: Do not wrap this error! If this is a 404, the provider need to know so that it // can mark the Pod as having been deleted. @@ -226,7 +226,7 @@ func (pia *podInitAwaiter) processPodEvent(event watch.Event) { } // Do nothing if this is not the pod we're waiting for. 
- if pod.GetName() != pia.config.currentInputs.GetName() { + if pod.GetName() != pia.config.currentOutputs.GetName() { return } diff --git a/provider/pkg/await/service.go b/provider/pkg/await/service.go index b33739992c..4594922e0f 100644 --- a/provider/pkg/await/service.go +++ b/provider/pkg/await/service.go @@ -137,7 +137,7 @@ func (sia *serviceInitAwaiter) Await() error { defer close(stopper) informerFactory := informers.NewInformerFactory(sia.config.clientSet, - informers.WithNamespaceOrDefault(sia.config.currentInputs.GetNamespace())) + informers.WithNamespaceOrDefault(sia.config.currentOutputs.GetNamespace())) informerFactory.Start(stopper) serviceEvents := make(chan watch.Event) @@ -156,7 +156,7 @@ func (sia *serviceInitAwaiter) Await() error { version := cluster.TryGetServerVersion(sia.config.clientSet.DiscoveryClientCached) - timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultServiceTimeoutMins*60) + timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentOutputs, DefaultServiceTimeoutMins*60) return sia.await(serviceEvents, endpointsEvents, time.After(timeout), make(chan struct{}), version) } @@ -273,7 +273,7 @@ func (sia *serviceInitAwaiter) await( } func (sia *serviceInitAwaiter) processServiceEvent(event watch.Event) { - inputServiceName := sia.config.currentInputs.GetName() + serviceName := sia.config.currentOutputs.GetName() service, isUnstructured := event.Object.(*unstructured.Unstructured) if !isUnstructured { @@ -283,7 +283,7 @@ func (sia *serviceInitAwaiter) processServiceEvent(event watch.Event) { } // Do nothing if this is not the service we're waiting for. - if service.GetName() != inputServiceName { + if service.GetName() != serviceName { return } @@ -302,14 +302,14 @@ func (sia *serviceInitAwaiter) processServiceEvent(event watch.Event) { lbIngress, _ := openapi.Pluck(service.Object, "status", "loadBalancer", "ingress") status, _ := openapi.Pluck(service.Object, "status") - logger.V(3).Infof("Received status for service %q: %#v", inputServiceName, status) + logger.V(3).Infof("Received status for service %q: %#v", serviceName, status) ing, isSlice := lbIngress.([]any) // Update status of service object so that we can check success. sia.serviceReady = isSlice && len(ing) > 0 logger.V(3).Infof("Waiting for service %q to assign IP/hostname for a load balancer", - inputServiceName) + serviceName) } else { // If it's not type `LoadBalancer`, report success. 
sia.serviceReady = true diff --git a/provider/pkg/await/statefulset.go b/provider/pkg/await/statefulset.go index 40c6c5aaf4..804a42d6ef 100644 --- a/provider/pkg/await/statefulset.go +++ b/provider/pkg/await/statefulset.go @@ -192,7 +192,7 @@ func (sia *statefulsetInitAwaiter) Await() error { defer close(stopper) informerFactory := informers.NewInformerFactory(sia.config.clientSet, - informers.WithNamespaceOrDefault(sia.config.currentInputs.GetNamespace())) + informers.WithNamespaceOrDefault(sia.config.currentOutputs.GetNamespace())) informerFactory.Start(stopper) statefulSetEvents := make(chan watch.Event) @@ -216,7 +216,7 @@ func (sia *statefulsetInitAwaiter) Await() error { aggregateErrorTicker := time.NewTicker(10 * time.Second) defer aggregateErrorTicker.Stop() - timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentInputs, DefaultStatefulSetTimeoutMins*60) + timeout := metadata.TimeoutDuration(sia.config.timeout, sia.config.currentOutputs, DefaultStatefulSetTimeoutMins*60) return sia.await(statefulSetEvents, podEvents, time.After(timeout), aggregateErrorTicker.C) } @@ -229,7 +229,7 @@ func (sia *statefulsetInitAwaiter) Read() error { // Get live versions of StatefulSet and Pods. statefulset, err := statefulSetClient.Get(sia.config.ctx, - sia.config.currentInputs.GetName(), + sia.config.currentOutputs.GetName(), metav1.GetOptions{}) if err != nil { // IMPORTANT: Do not wrap this error! If this is a 404, the provider need to know so that it @@ -343,7 +343,7 @@ func (sia *statefulsetInitAwaiter) checkAndLogStatus() bool { } func (sia *statefulsetInitAwaiter) processStatefulSetEvent(event watch.Event) { - inputStatefulSetName := sia.config.currentInputs.GetName() + inputStatefulSetName := sia.config.currentOutputs.GetName() statefulset, isUnstructured := event.Object.(*unstructured.Unstructured) if !isUnstructured { @@ -509,18 +509,18 @@ func (sia *statefulsetInitAwaiter) makeClients() ( statefulSetClient, podClient dynamic.ResourceInterface, err error, ) { statefulSetClient, err = clients.ResourceClient( - kinds.StatefulSet, sia.config.currentInputs.GetNamespace(), sia.config.clientSet) + kinds.StatefulSet, sia.config.currentOutputs.GetNamespace(), sia.config.clientSet) if err != nil { return nil, nil, errors.Wrapf(err, "Could not make client to watch StatefulSet %q", - sia.config.currentInputs.GetName()) + sia.config.currentOutputs.GetName()) } podClient, err = clients.ResourceClient( - kinds.Pod, sia.config.currentInputs.GetNamespace(), sia.config.clientSet) + kinds.Pod, sia.config.currentOutputs.GetNamespace(), sia.config.clientSet) if err != nil { return nil, nil, errors.Wrapf(err, "Could not make client to watch Pods associated with StatefulSet %q", - sia.config.currentInputs.GetName()) + sia.config.currentOutputs.GetName()) } return statefulSetClient, podClient, nil diff --git a/provider/pkg/await/statefulset_test.go b/provider/pkg/await/statefulset_test.go index 677a988f76..8614c41287 100644 --- a/provider/pkg/await/statefulset_test.go +++ b/provider/pkg/await/statefulset_test.go @@ -266,7 +266,7 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) { description string inputs func() *unstructured.Unstructured firstUpdate func(statefulsets, pods chan watch.Event, timeout chan time.Time, - setLast setLastInputs) + setLast setLastOutputs) secondUpdate func(statefulsets, pods chan watch.Event, timeout chan time.Time) firstExpectedError error secondExpectedError error @@ -276,7 +276,7 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) { inputs: 
statefulsetFailed, firstUpdate: func( statefulsets, pods chan watch.Event, timeout chan time.Time, - setLast setLastInputs, + setLast setLastOutputs, ) { statefulsets <- watchAddedEvent(statefulsetFailed()) @@ -311,7 +311,7 @@ func Test_Apps_StatefulSet_MultipleUpdates(t *testing.T) { period := make(chan time.Time) go test.firstUpdate(statefulsets, pods, timeout, func(obj *unstructured.Unstructured) { - awaiter.config.lastInputs = obj + awaiter.config.lastOutputs = obj }) err := awaiter.await(statefulsets, pods, timeout, period) diff --git a/provider/pkg/await/util_test.go b/provider/pkg/await/util_test.go index b16cba4d2d..74c52cc86b 100644 --- a/provider/pkg/await/util_test.go +++ b/provider/pkg/await/util_test.go @@ -14,7 +14,6 @@ func mockAwaitConfig(inputs *unstructured.Unstructured) createAwaitConfig { return createAwaitConfig{ ctx: context.Background(), //TODO: complete this mock if needed - currentInputs: inputs, currentOutputs: inputs, logger: logging.NewLogger(context.Background(), nil, ""), } diff --git a/provider/pkg/metadata/naming.go b/provider/pkg/metadata/naming.go index ed5de80dc7..fb82ef45f5 100644 --- a/provider/pkg/metadata/naming.go +++ b/provider/pkg/metadata/naming.go @@ -31,6 +31,12 @@ func AssignNameIfAutonamable(randomSeed []byte, obj *unstructured.Unstructured, } } + if obj.GetGenerateName() != "" { + // let the Kubernetes API server produce a name. + // TODO assign a computed output? + return + } + if obj.GetName() == "" { prefix := urn.Name().String() + "-" autoname, err := resource.NewUniqueName(randomSeed, prefix, 0, 0, nil) @@ -43,8 +49,8 @@ func AssignNameIfAutonamable(randomSeed []byte, obj *unstructured.Unstructured, // AdoptOldAutonameIfUnnamed checks if `newObj` has a name, and if not, "adopts" the name of `oldObj` // instead. If `oldObj` was autonamed, then we mark `newObj` as autonamed, too. func AdoptOldAutonameIfUnnamed(newObj, oldObj *unstructured.Unstructured) { - contract.Assertf(oldObj.GetName() != "", "expected nonempty name for object: %s", oldObj) - if newObj.GetName() == "" && IsAutonamed(oldObj) { + if newObj.GetName() == "" && newObj.GetGenerateName() == "" && IsAutonamed(oldObj) { + contract.Assertf(oldObj.GetName() != "", "expected nonempty name for object: %s", oldObj) newObj.SetName(oldObj.GetName()) SetAnnotationTrue(newObj, AnnotationAutonamed) } diff --git a/provider/pkg/provider/provider.go b/provider/pkg/provider/provider.go index fbf37122c1..1582991df8 100644 --- a/provider/pkg/provider/provider.go +++ b/provider/pkg/provider/provider.go @@ -1358,9 +1358,8 @@ func (k *kubeProvider) Check(ctx context.Context, req *pulumirpc.CheckRequest) ( // needs to be `DeleteBeforeReplace`'d. If the resource is marked `DeleteBeforeReplace`, then // `Create` will allocate it a new name later. if len(oldInputs.Object) > 0 { - // NOTE: If old inputs exist, they have a name, either provided by the user or filled in with a + // NOTE: If old inputs exist, they MAY have a name, either provided by the user or filled in with a // previous run of `Check`. - contract.Assertf(oldInputs.GetName() != "", "expected object name to be nonempty: %v", oldInputs) metadata.AdoptOldAutonameIfUnnamed(newInputs, oldInputs) // If the resource has existing state, we only set the "managed-by: pulumi" label if it is already present. This @@ -1692,7 +1691,7 @@ func (k *kubeProvider) Diff(ctx context.Context, req *pulumirpc.DiffRequest) (*p switch newInputs.GetKind() { case "Job": // Fetch current Job status and check point-in-time readiness. Errors are ignored. 
- if live, err := k.readLiveObject(newInputs); err == nil { + if live, err := k.readLiveObject(oldLive); err == nil { jobChecker := checkjob.NewJobChecker() job, err := clients.FromUnstructured(live) if err == nil { @@ -1714,12 +1713,12 @@ func (k *kubeProvider) Diff(ctx context.Context, req *pulumirpc.DiffRequest) (*p len(replaces) > 0 && // 2. Object is NOT autonamed (i.e., user manually named it, and therefore we can't // auto-generate the name). - !metadata.IsAutonamed(newInputs) && + !(metadata.IsAutonamed(newInputs) || newInputs.GetGenerateName() != "") && // 3. The new, user-specified name is the same as the old name. - newInputs.GetName() == oldLivePruned.GetName() && + newInputs.GetName() == oldLive.GetName() && // 4. The resource is being deployed to the same namespace (i.e., we aren't creating the // object in a new namespace and then deleting the old one). - newInputs.GetNamespace() == oldLivePruned.GetNamespace() + newInputs.GetNamespace() == oldLive.GetNamespace() return &pulumirpc.DiffResponse{ Changes: hasChanges, @@ -1880,6 +1879,7 @@ func (k *kubeProvider) Create( partialErr, isPartialErr := awaitErr.(await.PartialError) if !isPartialErr { // Object creation failed. + return nil, pkgerrors.Wrapf( awaitErr, "resource %s was not successfully created by the Kubernetes API server ", fqObjName(newInputs)) @@ -2320,7 +2320,8 @@ func (k *kubeProvider) Update( Resources: resources, ServerSideApply: k.serverSideApplyMode, }, - Previous: oldLivePruned, + OldInputs: oldLivePruned, + OldOutputs: oldLive, Inputs: newInputs, Timeout: req.Timeout, Preview: req.GetPreview(), @@ -2345,12 +2346,12 @@ func (k *kubeProvider) Update( } var getErr error - initialized, getErr = k.readLiveObject(newInputs) + initialized, getErr = k.readLiveObject(oldLive) if getErr != nil { // Object update/creation failed. return nil, pkgerrors.Wrapf( awaitErr, "update of resource %s failed because the Kubernetes API server "+ - "reported that it failed to fully initialize or become live", fqObjName(newInputs)) + "reported that it failed to fully initialize or become live", fqObjName(oldLive)) } // If we get here, resource successfully registered with the API server, but failed to // initialize. @@ -2375,7 +2376,7 @@ func (k *kubeProvider) Update( fqObjName(initialized), pkgerrors.Wrapf( awaitErr, "the Kubernetes API server reported that %q failed to fully initialize "+ - "or become live", fqObjName(newInputs)), + "or become live", fqObjName(initialized)), inputsAndComputed, nil) } @@ -2383,12 +2384,12 @@ func (k *kubeProvider) Update( if k.serverSideApplyMode { // For non-preview updates, drop the old fieldManager if the value changes. if !req.GetPreview() && fieldManagerOld != fieldManager { - client, err := k.clientSet.ResourceClientForObject(newInputs) + client, err := k.clientSet.ResourceClientForObject(initialized) if err != nil { return nil, err } - err = ssa.Relinquish(k.canceler.context, client, newInputs, fieldManagerOld) + err = ssa.Relinquish(k.canceler.context, client, initialized, fieldManagerOld) if err != nil { return nil, err } @@ -2475,7 +2476,7 @@ func (k *kubeProvider) Delete(ctx context.Context, req *pulumirpc.DeleteRequest) Resources: resources, ServerSideApply: k.serverSideApplyMode, }, - Inputs: current, + Outputs: current, Name: name, Timeout: req.Timeout, }
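Note for readers skimming the patch: the core behavioral shift is that the await and naming code now keys off the live output object (whose metadata.name is always populated by the API server) rather than the declared inputs (whose name can be empty when metadata.generateName is used). The standalone Go sketch below illustrates that distinction only; the ConfigMap, the generated name suffix, and the nameForAwait helper are hypothetical and are not part of this patch or the provider's API.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// nameForAwait mirrors the idea behind the patch: prefer the live output object's
// name, since inputs that rely on metadata.generateName carry no concrete name yet.
// Illustrative helper only; not part of the provider.
func nameForAwait(inputs, outputs *unstructured.Unstructured) string {
	if outputs != nil && outputs.GetName() != "" {
		return outputs.GetName()
	}
	return inputs.GetName() // may be "" when generateName is used
}

func main() {
	// Declared inputs: generateName is set, so no concrete name exists yet.
	inputs := &unstructured.Unstructured{Object: map[string]any{
		"apiVersion": "v1",
		"kind":       "ConfigMap",
		"metadata": map[string]any{
			"generateName": "app-config-",
			"namespace":    "default",
		},
	}}

	// Live outputs as returned by the API server after creation: name is populated.
	outputs := inputs.DeepCopy()
	outputs.SetName("app-config-x7k2q") // hypothetical server-generated suffix

	fmt.Printf("name from inputs:  %q\n", inputs.GetName())              // ""
	fmt.Printf("name from outputs: %q\n", nameForAwait(inputs, outputs)) // "app-config-x7k2q"
}

Keying watches, timeouts, and log messages off the output name keeps await logic correct for generateName-based resources, which is also why AssignNameIfAutonamable in this patch returns early when metadata.generateName is set.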