Skip to content

Commit

Permalink
Use output properties for await
Browse files Browse the repository at this point in the history
  • Loading branch information
EronWright committed Oct 3, 2023
1 parent 7a84ace commit 02f8c0b
Show file tree
Hide file tree
Showing 13 changed files with 159 additions and 156 deletions.
57 changes: 28 additions & 29 deletions provider/pkg/await/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,18 @@ type ReadConfig struct {

type UpdateConfig struct {
ProviderConfig
Previous *unstructured.Unstructured
Inputs *unstructured.Unstructured
Timeout float64
Preview bool
OldInputs *unstructured.Unstructured
OldOutputs *unstructured.Unstructured
Inputs *unstructured.Unstructured
Timeout float64
Preview bool
// IgnoreChanges is a list of fields to ignore when diffing the old and new objects.
IgnoreChanges []string
}

type DeleteConfig struct {
ProviderConfig
Inputs *unstructured.Unstructured
Outputs *unstructured.Unstructured
Name string
Timeout float64
}
Expand Down Expand Up @@ -162,6 +163,8 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
// nolint
// https://github.com/kubernetes/kubernetes/blob/54889d581a35acf940d52a8a384cccaa0b597ddc/pkg/kubectl/cmd/apply/apply.go#L94

patchResource := kinds.PatchQualifiedTypes.Has(c.URN.QualifiedType().String())

var outputs *unstructured.Unstructured
var client dynamic.ResourceInterface
err := retry.SleepingRetry(
Expand All @@ -187,7 +190,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
}
}

if c.ServerSideApply {
if c.ServerSideApply && patchResource {
force := patchForce(c.Inputs, nil, c.Preview)
options := metav1.PatchOptions{
FieldManager: c.FieldManager,
Expand Down Expand Up @@ -251,7 +254,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind())
if awaiter, exists := awaiters[id]; exists {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", outputs.GetName())
} else {
if awaiter.awaitCreation != nil {
conf := createAwaitConfig{
Expand All @@ -260,7 +263,6 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
urn: c.URN,
initialAPIVersion: c.InitialAPIVersion,
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.DedupLogger,
timeout: c.Timeout,
Expand All @@ -280,7 +282,7 @@ func Creation(c CreateConfig) (*unstructured.Unstructured, error) {
// If the client fails to get the live object for some reason, DO NOT return the error. This
// will leak the fact that the object was successfully created. Instead, fall back to the
// last-seen live object.
live, err := client.Get(c.Context, c.Inputs.GetName(), metav1.GetOptions{})
live, err := client.Get(c.Context, outputs.GetName(), metav1.GetOptions{})
if err != nil {
return outputs, nil
}
Expand All @@ -306,7 +308,7 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
id := fmt.Sprintf("%s/%s", outputs.GetAPIVersion(), outputs.GetKind())
if awaiter, exists := awaiters[id]; exists {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", outputs.GetName())
} else {
if awaiter.awaitRead != nil {
conf := createAwaitConfig{
Expand All @@ -315,7 +317,6 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
urn: c.URN,
initialAPIVersion: c.InitialAPIVersion,
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: outputs,
logger: c.DedupLogger,
clusterVersion: c.ClusterVersion,
Expand Down Expand Up @@ -353,14 +354,14 @@ func Read(c ReadConfig) (*unstructured.Unstructured, error) {
// [3]:
// https://kubernetes.io/docs/reference/using-api/server-side-apply
func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
client, err := c.ClientSet.ResourceClientForObject(c.Inputs)
client, err := c.ClientSet.ResourceClientForObject(c.OldOutputs)
if err != nil {
return nil, err
}

// Get the "live" version of the last submitted object. This is necessary because the server may
// have populated some fields automatically, updated status fields, and so on.
liveOldObj, err := client.Get(c.Context, c.Previous.GetName(), metav1.GetOptions{})
liveOldObj, err := client.Get(c.Context, c.OldOutputs.GetName(), metav1.GetOptions{})
if err != nil {
return nil, err
}
Expand All @@ -380,7 +381,7 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind())
if awaiter, exists := awaiters[id]; exists {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
logger.V(1).Infof("Skipping await logic for %v", currentOutputs.GetName())
} else {
if awaiter.awaitUpdate != nil {
conf := updateAwaitConfig{
Expand All @@ -390,13 +391,11 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {
urn: c.URN,
initialAPIVersion: c.InitialAPIVersion,
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: currentOutputs,
logger: c.DedupLogger,
timeout: c.Timeout,
clusterVersion: c.ClusterVersion,
},
lastInputs: c.Previous,
lastOutputs: liveOldObj,
}
waitErr := awaiter.awaitUpdate(conf)
Expand All @@ -411,12 +410,12 @@ func Update(c UpdateConfig) (*unstructured.Unstructured, error) {

gvk := c.Inputs.GroupVersionKind()
logger.V(3).Infof("Resource %s/%s/%s '%s.%s' patched and updated", gvk.Group, gvk.Version,
gvk.Kind, c.Inputs.GetNamespace(), c.Inputs.GetName())
gvk.Kind, c.Inputs.GetNamespace(), currentOutputs.GetName())

// If the client fails to get the live object for some reason, DO NOT return the error. This
// will leak the fact that the object was successfully created. Instead, fall back to the
// last-seen live object.
live, err := client.Get(c.Context, c.Inputs.GetName(), metav1.GetOptions{})
live, err := client.Get(c.Context, currentOutputs.GetName(), metav1.GetOptions{})
if err != nil {
return currentOutputs, nil
}
Expand Down Expand Up @@ -450,7 +449,7 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
// optimistically rather than failing the update.
_ = handleCSAIgnoreFields(c, liveOldObj)
// Create merge patch (prefer strategic merge patch, fall back to JSON merge patch).
patch, patchType, _, err := openapi.PatchForResourceUpdate(c.Resources, c.Previous, c.Inputs, liveOldObj)
patch, patchType, _, err := openapi.PatchForResourceUpdate(c.Resources, c.OldInputs, c.Inputs, liveOldObj)
if err != nil {
return nil, err
}
Expand All @@ -462,7 +461,7 @@ func csaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
options.DryRun = []string{metav1.DryRunAll}
}

return client.Patch(c.Context, c.Inputs.GetName(), patchType, patch, options)
return client.Patch(c.Context, liveOldObj.GetName(), patchType, patch, options)
}

// ssaUpdate handles the logic for updating a resource using server-side apply.
Expand Down Expand Up @@ -490,7 +489,7 @@ func ssaUpdate(c *UpdateConfig, liveOldObj *unstructured.Unstructured, client dy
options.DryRun = []string{metav1.DryRunAll}
}

currentOutputs, err := client.Patch(c.Context, c.Inputs.GetName(), types.ApplyPatchType, objYAML, options)
currentOutputs, err := client.Patch(c.Context, liveOldObj.GetName(), types.ApplyPatchType, objYAML, options)
if err != nil {
if errors.IsConflict(err) {
err = fmt.Errorf("Server-Side Apply field conflict detected. See %s for troubleshooting help\n: %w",
Expand Down Expand Up @@ -543,7 +542,7 @@ func handleSSAIgnoreFields(c *UpdateConfig, liveOldObj *unstructured.Unstructure
for _, f := range managedFields {
s, err := fluxssa.FieldsToSet(*f.FieldsV1)
if err != nil {
return fmt.Errorf("unable to parse managed fields from resource %q into fieldpath.Set: %w", c.Inputs.GetName(), err)
return fmt.Errorf("unable to parse managed fields from resource %q into fieldpath.Set: %w", liveOldObj.GetName(), err)
}

switch f.Manager {
Expand Down Expand Up @@ -698,18 +697,18 @@ func Deletion(c DeleteConfig) error {
}

// Obtain client for the resource being deleted.
client, err := c.ClientSet.ResourceClientForObject(c.Inputs)
client, err := c.ClientSet.ResourceClientForObject(c.Outputs)
if err != nil {
return nilIfGVKDeleted(err)
}

patchResource := kinds.PatchQualifiedTypes.Has(c.URN.QualifiedType().String())
if c.ServerSideApply && patchResource {
err = ssa.Relinquish(c.Context, client, c.Inputs, c.FieldManager)
err = ssa.Relinquish(c.Context, client, c.Outputs, c.FieldManager)
return err
}

timeout := metadata.TimeoutDuration(c.Timeout, c.Inputs, 300)
timeout := metadata.TimeoutDuration(c.Timeout, c.Outputs, 300)
timeoutSeconds := int64(timeout.Seconds())
listOpts := metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", c.Name).String(),
Expand All @@ -731,10 +730,10 @@ func Deletion(c DeleteConfig) error {
// if we don't have an entry for the resource type; in the event that we do, but the await logic
// is blank, simply do nothing instead of logging.
var waitErr error
id := fmt.Sprintf("%s/%s", c.Inputs.GetAPIVersion(), c.Inputs.GetKind())
id := fmt.Sprintf("%s/%s", c.Outputs.GetAPIVersion(), c.Outputs.GetKind())
if awaiter, exists := awaiters[id]; exists && awaiter.awaitDeletion != nil {
if metadata.SkipAwaitLogic(c.Inputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Inputs.GetName())
if metadata.SkipAwaitLogic(c.Outputs) {
logger.V(1).Infof("Skipping await logic for %v", c.Name)
} else {
waitErr = awaiter.awaitDeletion(deleteAwaitConfig{
createAwaitConfig: createAwaitConfig{
Expand All @@ -743,7 +742,7 @@ func Deletion(c DeleteConfig) error {
urn: c.URN,
initialAPIVersion: c.InitialAPIVersion,
clientSet: c.ClientSet,
currentInputs: c.Inputs,
currentOutputs: c.Outputs,
logger: c.DedupLogger,
timeout: c.Timeout,
clusterVersion: c.ClusterVersion,
Expand Down
Loading

0 comments on commit 02f8c0b

Please sign in to comment.