Support for metadata.generateName (CSA) #2594

Closed
wants to merge 13 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,4 +1,5 @@
## Unreleased
- Support for metadata.generateName (https://github.com/pulumi/pulumi-kubernetes/pull/2594)

## 4.7.1 (January 17, 2024)

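For background on the behavior this PR accommodates: when a manifest sets metadata.generateName instead of metadata.name, the API server picks the final name at creation time, so only the returned live object carries a usable name. That is why the diff below switches from inputs to outputs wherever a name is needed. A minimal sketch with the dynamic client follows; the ConfigMap, namespace, and kubeconfig path are illustrative assumptions, not taken from this PR.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"path/filepath"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (illustrative; any rest.Config works).
	kubeconfig := filepath.Join(os.Getenv("HOME"), ".kube", "config")
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		log.Fatal(err)
	}
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		log.Fatal(err)
	}

	gvr := schema.GroupVersionResource{Version: "v1", Resource: "configmaps"}
	obj := &unstructured.Unstructured{Object: map[string]any{
		"apiVersion": "v1",
		"kind":       "ConfigMap",
		"metadata": map[string]any{
			// No metadata.name: the server derives the final name from this prefix.
			"generateName": "app-config-",
			"namespace":    "default",
		},
		"data": map[string]any{"key": "value"},
	}}

	live, err := client.Resource(gvr).Namespace("default").Create(context.Background(), obj, metav1.CreateOptions{})
	if err != nil {
		log.Fatal(err)
	}
	// The submitted inputs still have no name; only the returned object does.
	fmt.Println("server-assigned name:", live.GetName())
}
```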
45 changes: 24 additions & 21 deletions provider/pkg/await/await.go
@@ -90,17 +90,19 @@ type ReadConfig struct {

type UpdateConfig struct {
ProviderConfig
Previous *unstructured.Unstructured
Inputs *unstructured.Unstructured
Timeout float64
Preview bool
OldInputs *unstructured.Unstructured
OldOutputs *unstructured.Unstructured
Inputs *unstructured.Unstructured
Timeout float64
Preview bool
// IgnoreChanges is a list of fields to ignore when diffing the old and new objects.
IgnoreChanges []string
}

type DeleteConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Outputs *unstructured.Unstructured
Name string
Timeout float64
}
@@ -251,7 +253,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind())
if awaiter, exists := awaiters[id]; exists {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", outputs.GetName())
} else {
if awaiter.awaitCreation != nil {
conf := createAwaitConfig{
@@ -280,7 +282,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
// If the client fails to get the live object for some reason, DO NOT return the error. This
// will leak the fact that the object was successfully created. Instead, fall back to the
// last-seen live object.
live, err := client.Get(c.Context, c.Inputs.GetName(), metav1.GetOptions{})
live, err := client.Get(c.Context, outputs.GetName(), metav1.GetOptions{})
if err != nil {
return outputs, nil
}
@@ -306,7 +308,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
id := fmt.Sprintf("%s/%s", outputs.GetAPIVersion(), outputs.GetKind())
if awaiter, exists := awaiters[id]; exists {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", outputs.GetName())
} else {
if awaiter.awaitRead != nil {
conf := createAwaitConfig{
@@ -353,14 +355,14 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
// [3]:
// https://kubernetes.io/docs/reference/using-api/server-side-apply
func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
client, err := c.ClientSet.ResourceClientForObject(c.Inputs)
client, err := c.ClientSet.ResourceClientForObject(c.OldOutputs)
if err != nil {
return nil, err
}

// Get the "live" version of the last submitted object. This is necessary because the server may
// have populated some fields automatically, updated status fields, and so on.
liveOldObj, err := client.Get(c.Context, c.Previous.GetName(), metav1.GetOptions{})
liveOldObj, err := client.Get(c.Context, c.OldOutputs.GetName(), metav1.GetOptions{})
if err != nil {
return nil, err
}
@@ -380,7 +382,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind())
if awaiter, exists := awaiters[id]; exists {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", currentOutputs.GetName())
} else {
if awaiter.awaitUpdate != nil {
conf := updateAwaitConfig{
@@ -396,7 +398,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
timeout: c.Timeout,
clusterVersion: c.ClusterVersion,
},
lastInputs: c.Previous,
lastInputs: c.OldInputs,
lastOutputs: liveOldObj,
}
waitErr := awaiter.awaitUpdate(conf)
@@ -411,12 +413,12 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {

gvk := c.Inputs.GroupVersionKind()
logger.V(3).Infof("Resource %s/%s/%s '%s.%s' patched and updated", gvk.Group, gvk.Version,
gvk.Kind, c.Inputs.GetNamespace(), c.Inputs.GetName())
gvk.Kind, c.Inputs.GetNamespace(), currentOutputs.GetName())

// If the client fails to get the live object for some reason, DO NOT return the error. This
// will leak the fact that the object was successfully created. Instead, fall back to the
// last-seen live object.
live, err := client.Get(c.Context, c.Inputs.GetName(), metav1.GetOptions{})
live, err := client.Get(c.Context, currentOutputs.GetName(), metav1.GetOptions{})
if err != nil {
return currentOutputs, nil
}
@@ -450,7 +452,7 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
// optimistically rather than failing the update.
_ = handleCSAIgnoreFields(c, liveOldObj)
// Create merge patch (prefer strategic merge patch, fall back to JSON merge patch).
patch, patchType, _, err := openapi.PatchForResourceUpdate(c.Resources, c.Previous, c.Inputs, liveOldObj)
patch, patchType, _, err := openapi.PatchForResourceUpdate(c.Resources, c.OldInputs, c.Inputs, liveOldObj)
if err != nil {
return nil, err
}
@@ -462,7 +464,7 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
options.DryRun = []string{metav1.DryRunAll}
}

return client.Patch(c.Context, c.Inputs.GetName(), patchType, patch, options)
return client.Patch(c.Context, liveOldObj.GetName(), patchType, patch, options)
}

// ssaUpdate handles the logic for updating a resource using server-side apply.
@@ -490,7 +492,7 @@ func ssaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
options.DryRun = []string{metav1.DryRunAll}
}

currentOutputs, err := client.Patch(c.Context, c.Inputs.GetName(), types.ApplyPatchType, objYAML, options)
currentOutputs, err := client.Patch(c.Context, liveOldObj.GetName(), types.ApplyPatchType, objYAML, options)
if err != nil {
if errors.IsConflict(err) {
err = fmt.Errorf("Server-Side Apply field conflict detected. See %s for troubleshooting help\n: %w",
@@ -543,7 +545,7 @@ func handleSSAIgnoreFields(c *UpdateConfig, liveOldObj *unstructured.Unstructure
for _, f := range managedFields {
s, err := fluxssa.FieldsToSet(*f.FieldsV1)
if err != nil {
return fmt.Errorf("unable to parse managed fields from resource %q into fieldpath.Set: %w", c.Inputs.GetName(), err)
return fmt.Errorf("unable to parse managed fields from resource %q into fieldpath.Set: %w", liveOldObj.GetName(), err)
}

switch f.Manager {
@@ -706,14 +708,14 @@ func Deletion(c DeleteConfig) error {
}

// Obtain client for the resource being deleted.
client, err := c.ClientSet.ResourceClientForObject(c.Inputs)
client, err := c.ClientSet.ResourceClientForObject(c.Outputs)
if err != nil {
return nilIfGVKDeleted(err)
}

patchResource := kinds.IsPatchURN(c.URN)
if c.ServerSideApply && patchResource {
err = ssa.Relinquish(c.Context, client, c.Inputs, c.FieldManager)
err = ssa.Relinquish(c.Context, client, c.Outputs, c.FieldManager)
return err
}

@@ -739,10 +741,10 @@ func Deletion(c DeleteConfig) error {
// if we don't have an entry for the resource type; in the event that we do, but the await logic
// is blank, simply do nothing instead of logging.
var waitErr error
id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind())
id := fmt.Sprintf("%s/%s", c.Outputs.GetAPIVersion(), c.Outputs.GetKind())
if awaiter, exists := awaiters[id]; exists && awaiter.awaitDeletion != nil {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", c.Name)
} else {
waitErr = awaiter.awaitDeletion(deleteAwaitConfig{
createAwaitConfig: createAwaitConfig{
@@ -752,6 +754,7 @@ func Deletion(c DeleteConfig) error {
initialAPIVersion: c.InitialAPIVersion,
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: c.Outputs,
logger: c.DedupLogger,
timeout: c.Timeout,
clusterVersion: c.ClusterVersion,
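One consequence visible in the Deletion changes above: the resource client and the awaited object are now resolved from the recorded outputs (and the explicit Name field) rather than from the inputs, because inputs that use metadata.generateName carry no concrete name to delete by. A rough sketch of that resolution order follows; the helper is hypothetical and not part of this PR.

```go
// Hypothetical sketch: decide which name a delete/await step should use.
// Objects created via metadata.generateName only have a name on their
// recorded outputs, never on the raw inputs.
func deletionName(c DeleteConfig) string {
	if c.Name != "" {
		return c.Name // explicit name recorded by the provider
	}
	if c.Outputs != nil && c.Outputs.GetName() != "" {
		return c.Outputs.GetName() // server-assigned name from the live object
	}
	// Fallback that only works for objects created with an explicit metadata.name.
	return c.Inputs.GetName()
}
```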
60 changes: 30 additions & 30 deletions provider/pkg/await/awaiters.go
@@ -301,19 +301,19 @@ func untilAppsDeploymentDeleted(config deleteAwaitConfig) error {
specReplicas, _ := deploymentSpecReplicas(d)

return watcher.RetryableError(
fmt.Errorf("deployment %q still exists (%d / %d replicas exist)", config.currentInputs.GetName(),
fmt.Errorf("deployment %q still exists (%d / %d replicas exist)", d.GetName(),
Reviewer comment (Member):
Should we introduce a new method for our config structs that returns the name of the deployment object? This refactor would make it less error-prone when deciding on the deployment name to use, and will make it easier in the future should we decide on a different strategy for the deployment name (say, reverting back to using inputs).
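A rough sketch of what such an accessor could look like; the method name and nil checks are hypothetical and not part of this PR.

```go
// Hypothetical helper on createAwaitConfig (which deleteAwaitConfig embeds):
// centralize the choice of which object's name to report, preferring the live
// outputs because inputs created via metadata.generateName carry no name.
func (c createAwaitConfig) objectName() string {
	if c.currentOutputs != nil && c.currentOutputs.GetName() != "" {
		return c.currentOutputs.GetName()
	}
	return c.currentInputs.GetName()
}
```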

currReplicas, specReplicas))
}

// Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas.
timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 600)
err := watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()).
RetryUntil(deploymentMissing, timeout)
if err != nil {
return err
}

logger.V(3).Infof("Deployment '%s' deleted", config.currentInputs.GetName())
logger.V(3).Infof("Deployment '%s' deleted", config.currentOutputs.GetName())

return nil
}
@@ -344,19 +344,19 @@ func untilAppsStatefulSetDeleted(config deleteAwaitConfig) error {
specReplicas, _ := specReplicas(d)

return watcher.RetryableError(
fmt.Errorf("StatefulSet %q still exists (%d / %d replicas exist)", config.currentInputs.GetName(),
fmt.Errorf("StatefulSet %q still exists (%d / %d replicas exist)", d.GetName(),
currReplicas, specReplicas))
}

// Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas.
timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 600)
err := watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()).
RetryUntil(statefulsetmissing, timeout)
if err != nil {
return err
}

logger.V(3).Infof("StatefulSet %q deleted", config.currentInputs.GetName())
logger.V(3).Infof("StatefulSet %q deleted", config.currentOutputs.GetName())

return nil
}
@@ -375,12 +375,12 @@ func untilBatchV1JobDeleted(config deleteAwaitConfig) error {
return err
}

e := fmt.Errorf("job %q still exists", config.currentInputs.GetName())
e := fmt.Errorf("job %q still exists", pod.GetName())
return watcher.RetryableError(e)
}

timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300)
return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()).
RetryUntil(jobMissingOrKilled, timeout)
}

@@ -396,22 +396,22 @@ func untilCoreV1NamespaceDeleted(config deleteAwaitConfig) error {
return nil
} else if err != nil {
logger.V(3).Infof("Received error deleting namespace %q: %#v",
config.currentInputs.GetName(), err)
ns.GetName(), err)
return err
}

statusPhase, _ := openapi.Pluck(ns.Object, "status", "phase")
logger.V(3).Infof("Namespace %q status received: %#v", config.currentInputs.GetName(), statusPhase)
logger.V(3).Infof("Namespace %q status received: %#v", ns.GetName(), statusPhase)
if statusPhase == "" {
return nil
}

return watcher.RetryableError(fmt.Errorf("namespace %q still exists (%v)",
config.currentInputs.GetName(), statusPhase))
ns.GetName(), statusPhase))
}

timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300)
return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()).
RetryUntil(namespaceMissingOrKilled, timeout)
}

@@ -433,11 +433,11 @@ func untilCoreV1PersistentVolumeInitialized(c createAwaitConfig) error {
return statusPhase == statusAvailable || statusPhase == statusBound
}

client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace())
client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace())
if err != nil {
return err
}
return watcher.ForObject(c.ctx, client, c.currentInputs.GetName()).
return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()).
WatchUntil(pvAvailableOrBound, 5*time.Minute)
}

@@ -454,11 +454,11 @@ func untilCoreV1PersistentVolumeClaimBound(c createAwaitConfig) error {
return statusPhase == statusBound
}

client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace())
client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace())
if err != nil {
return err
}
return watcher.ForObject(c.ctx, client, c.currentInputs.GetName()).
return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()).
WatchUntil(pvcBound, 5*time.Minute)
}

@@ -478,13 +478,13 @@ func untilCoreV1PodDeleted(config deleteAwaitConfig) error {
}

statusPhase, _ := openapi.Pluck(pod.Object, "status", "phase")
logger.V(3).Infof("Current state of pod %q: %#v", config.currentInputs.GetName(), statusPhase)
e := fmt.Errorf("pod %q still exists (%v)", config.currentInputs.GetName(), statusPhase)
logger.V(3).Infof("Current state of pod %q: %#v", pod.GetName(), statusPhase)
e := fmt.Errorf("pod %q still exists (%v)", pod.GetName(), statusPhase)
return watcher.RetryableError(e)
}

timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 300)
return watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
return watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()).
RetryUntil(podMissingOrKilled, timeout)
}

@@ -503,13 +503,13 @@ func untilCoreV1ReplicationControllerInitialized(c createAwaitConfig) error {
return openapi.Pluck(rc.Object, "status", "availableReplicas")
}

name := c.currentInputs.GetName()
name := c.currentOutputs.GetName()

replicas, _ := openapi.Pluck(c.currentInputs.Object, "spec", "replicas")
logger.V(3).Infof("Waiting for replication controller %q to schedule '%v' replicas",
name, replicas)

client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace())
client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace())
if err != nil {
return err
}
@@ -525,8 +525,8 @@ func untilCoreV1ReplicationControllerInitialized(c createAwaitConfig) error {
// but that means checking each pod status separately (which can be expensive at scale)
// as there's no aggregate data available from the API

logger.V(3).Infof("Replication controller %q initialized: %#v", c.currentInputs.GetName(),
c.currentInputs)
logger.V(3).Infof("Replication controller %q initialized: %#v", name,
c.currentOutputs)

return nil
}
@@ -559,18 +559,18 @@ func untilCoreV1ReplicationControllerDeleted(config deleteAwaitConfig) error {

return watcher.RetryableError(
fmt.Errorf("ReplicationController %q still exists (%d / %d replicas exist)",
config.currentInputs.GetName(), currReplicas, specReplicas))
rc.GetName(), currReplicas, specReplicas))
}

// Wait until all replicas are gone. 10 minutes should be enough for ~10k replicas.
timeout := metadata.TimeoutDuration(config.timeout, config.currentInputs, 600)
err := watcher.ForObject(config.ctx, config.clientForResource, config.currentInputs.GetName()).
err := watcher.ForObject(config.ctx, config.clientForResource, config.currentOutputs.GetName()).
RetryUntil(rcMissing, timeout)
if err != nil {
return err
}

logger.V(3).Infof("ReplicationController %q deleted", config.currentInputs.GetName())
logger.V(3).Infof("ReplicationController %q deleted", config.currentOutputs.GetName())

return nil
}
@@ -589,20 +589,20 @@ func untilCoreV1ResourceQuotaInitialized(c createAwaitConfig) error {
hard, hardIsMap := hardRaw.(map[string]any)
hardStatus, hardStatusIsMap := hardStatusRaw.(map[string]any)
if hardIsMap && hardStatusIsMap && reflect.DeepEqual(hard, hardStatus) {
logger.V(3).Infof("ResourceQuota %q initialized: %#v", c.currentInputs.GetName(),
c.currentInputs)
logger.V(3).Infof("ResourceQuota %q initialized: %#v", quota.GetName(),
quota)
return true
}
logger.V(3).Infof("Quotas don't match after creation.\nExpected: %#v\nGiven: %#v",
hard, hardStatus)
return false
}

client, err := c.clientSet.ResourceClient(c.currentInputs.GroupVersionKind(), c.currentInputs.GetNamespace())
client, err := c.clientSet.ResourceClient(c.currentOutputs.GroupVersionKind(), c.currentOutputs.GetNamespace())
if err != nil {
return err
}
return watcher.ForObject(c.ctx, client, c.currentInputs.GetName()).
return watcher.ForObject(c.ctx, client, c.currentOutputs.GetName()).
WatchUntil(rqInitialized, 1*time.Minute)
}
