Skip to content

Commit

Permalink
Add apply-time-mutation
Browse files Browse the repository at this point in the history
- Detect apply-time-mutation annotation, add dependency wait edges,
  substitute source field value into target field, replacing token.
- Rename SetEqual -> IsSetEqual
- Move depends-on code to pkg/object/dependson/
- Add ResourceCache to TaskContext to avoid extra GETs
- Add ObjMetadataSet to hold []ObjMetadata functions
- Add Equal functions to be compatible with go-cmp for testing
  • Loading branch information
karlkfi committed Aug 28, 2021
1 parent b168a97 commit cee78a1
Show file tree
Hide file tree
Showing 33 changed files with 1,296 additions and 226 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module sigs.k8s.io/cli-utils
go 1.16

require (
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/uuid v1.2.0
github.com/onsi/ginkgo v1.16.2
github.com/onsi/gomega v1.12.0
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down
3 changes: 0 additions & 3 deletions pkg/apply/event/actiongroupeventtype_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions pkg/apply/event/applyeventoperation_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions pkg/apply/event/deleteeventoperation_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions pkg/apply/event/pruneeventoperation_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions pkg/apply/event/resourceaction_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions pkg/apply/event/type_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/apply/prune/prune_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func TestGetPruneObjs(t *testing.T) {
expectedIds, err := object.UnstructuredsToObjMetas(tc.expectedObjs)
require.NoError(t, err)

if !object.SetEquals(expectedIds, actualIds) {
if !object.IsSetEqual(expectedIds, actualIds) {
t.Errorf("expected prune objects (%v), got (%v)", expectedIds, actualIds)
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/apply/solver/solver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestTaskQueueBuilder_AppendApplyWaitTasks(t *testing.T) {
actWaitTask := toWaitTask(t, actualTask)
assert.Equal(t, len(expTsk.Ids), len(actWaitTask.Ids))
// Order is NOT important for ids stored within task.
if !object.SetEquals(expTsk.Ids, actWaitTask.Ids) {
if !object.IsSetEqual(expTsk.Ids, actWaitTask.Ids) {
t.Errorf("expected wait ids (%v), got (%v)",
expTsk.Ids, actWaitTask.Ids)
}
Expand Down Expand Up @@ -659,7 +659,7 @@ func TestTaskQueueBuilder_AppendPruneWaitTasks(t *testing.T) {
case *taskrunner.WaitTask:
actWaitTask := toWaitTask(t, actualTask)
assert.Equal(t, len(expTsk.Ids), len(actWaitTask.Ids))
if !object.SetEquals(expTsk.Ids, actWaitTask.Ids) {
if !object.IsSetEqual(expTsk.Ids, actWaitTask.Ids) {
t.Errorf("expected wait ids (%v), got (%v)",
expTsk.Ids, actWaitTask.Ids)
}
Expand Down
158 changes: 158 additions & 0 deletions pkg/apply/task/apply_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io/ioutil"
"strings"

"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -27,6 +28,7 @@ import (
"sigs.k8s.io/cli-utils/pkg/common"
"sigs.k8s.io/cli-utils/pkg/inventory"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
)

// applyOptions defines the two key functions on the ApplyOptions
Expand Down Expand Up @@ -127,6 +129,27 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
continue
}
}
// Update cache
if clusterObj == nil {
taskContext.ResourceCache().Remove(id)
} else {
taskContext.ResourceCache().Set(id, clusterObj)
}

// Handle apply-time-mutation
if mutation.HasAnnotation(obj) {
err = applyMutation(taskContext.ResourceCache(), dynamic, a.Mapper, obj)
if err != nil {
if klog.V(4).Enabled() {
klog.Errorf("error (%s) rendering apply-time-mutation on %s/%s",
err, info.Namespace, info.Name)
}
taskContext.EventChannel() <- createApplyFailedEvent(id, err)
taskContext.CaptureResourceFailure(id)
continue
}
}

canApply, err := inventory.CanApply(a.InvInfo, clusterObj, a.InventoryPolicy)
if !canApply {
klog.V(5).Infof("can not apply %s/%s--continue",
Expand Down Expand Up @@ -169,6 +192,141 @@ func (a *ApplyTask) Start(taskContext *taskrunner.TaskContext) {
}()
}

// applyMutation parses the apply-time-mutation annotation and loops through the
// substitutions, applying each of them to the supplied target object.
func applyMutation(resourceCache *taskrunner.ResourceCache, client dynamic.Interface, mapper meta.RESTMapper, obj *unstructured.Unstructured) error {
subs, err := mutation.ReadAnnotation(obj)
if err != nil {
return errors.Wrapf(err, "failed to read jsonpath field in target resource: %v", mutation.NewResourceReference(obj))
}

targetID := object.UnstructuredToObjMetaOrDie(obj)
klog.V(4).Infof("target resource %s/%s: %#v", targetID.Namespace, targetID.Name, obj)

for _, sub := range subs {
sourceRef := sub.SourceRef

// lookup source resource from cache or cluster
sourceObj, err := getObjectFromReference(resourceCache, client, mapper, sourceRef)
if err != nil {
return errors.Wrapf(err, "failed to retrieve resource from sourceRef: %v", sourceRef)
}

sourceID := object.UnstructuredToObjMetaOrDie(sourceObj)
klog.V(4).Infof("source resource %s/%s: %#v", sourceID.Namespace, sourceID.Name, sourceObj)

// lookup target field in target resource
targetValue, err := readFieldValue(obj, sub.TargetPath)
if err != nil {
return errors.Wrapf(err, "failed to reading jsonpath field from target resource: %v", mutation.NewResourceReference(obj))
}

// lookup source field in source resource
sourceValue, err := readFieldValue(sourceObj, sub.SourcePath)
if err != nil {
return errors.Wrapf(err, "failed to reading jsonpath field from source resource: %v", sourceRef)
}

// substitute token for source field value
newValue := strings.ReplaceAll(targetValue, sub.Token, sourceValue)

klog.V(4).Infof("substitution on %s/%s: source=%q, token=%q, old=%q, new=%q",
targetID.Namespace, targetID.Name, sourceValue, sub.Token, targetValue, newValue)

// update target field in target resource
err = writeFieldValue(obj, sub.TargetPath, newValue)
if err != nil {
return errors.Wrapf(err, "failed to set value to jsonpath field in target resource: %v", mutation.NewResourceReference(obj))
}
}

return nil
}

// getObjectFromReference returns a cached resource, if cached, otherwise it
// gets the resource from the cluster.
func getObjectFromReference(
resourceCache *taskrunner.ResourceCache,
client dynamic.Interface,
mapper meta.RESTMapper,
ref mutation.ResourceReference,
) (*unstructured.Unstructured, error) {
// validate resource reference
sourceObjMeta, err := mutation.ResourceReferenceToObjMeta(ref)
if err != nil {
return nil, errors.Wrapf(err, "failed to validate resource reference: %v", ref)
}

// lookup source resource from cache
sourceObj, found := resourceCache.Get(sourceObjMeta)
if found && sourceObj != nil {
return sourceObj, nil
}

// lookup source resource using resource version, if specified
sourceGvk := ref.GroupVersionKind()
versions := []string{}
if sourceGvk.Version == "" {
versions = append(versions, sourceGvk.Version)
}

// lookup mapping of source resource
mapping, err := mapper.RESTMapping(sourceGvk.GroupKind(), versions...)
if err != nil {
return nil, errors.Wrapf(err, "failed to map resource reference to valid type: %v", ref)
}

// lookup source resource from cluster
namespacedClient := client.Resource(mapping.Resource).Namespace(ref.Namespace)
sourceObj, err = namespacedClient.Get(context.TODO(), ref.Name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to retrieve resource from cluster: %v", ref)
}

return sourceObj, nil
}

func readFieldValue(obj *unstructured.Unstructured, path string) (string, error) {
if path == "" {
return "", errors.New("empty jsonpath")
}

// strip optional root index
pathArray := strings.Split(path, ".")
if pathArray[0] == "$" || pathArray[0] == "" {
pathArray = pathArray[1:]
}

// get path value
value, found, err := unstructured.NestedString(obj.Object, pathArray...)
if err != nil {
return "", errors.Wrapf(err, "failed to read jsonpath value: %q", path)
}
if !found {
return "", errors.Wrapf(err, "jsonpath field not found: %q", path)
}
return value, nil
}

func writeFieldValue(obj *unstructured.Unstructured, path, value string) error {
if path == "" {
return errors.New("empty jsonpath")
}

// strip optional root index
pathArray := strings.Split(path, ".")
if pathArray[0] == "$" || pathArray[0] == "" {
pathArray = pathArray[1:]
}

// set path value
err := unstructured.SetNestedField(obj.Object, value, pathArray...)
if err != nil {
return errors.Wrapf(err, "failed to read jsonpath value: %q", path)
}
return nil
}

func newApplyOptions(eventChannel chan event.Event, serverSideOptions common.ServerSideOptions,
strategy common.DryRunStrategy, factory util.Factory) (applyOptions, dynamic.Interface, error) {
discovery, err := factory.ToDiscoveryClient()
Expand Down
4 changes: 2 additions & 2 deletions pkg/apply/task/apply_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func TestApplyTask_BasicAppliedObjects(t *testing.T) {
require.NoError(t, err)

actual := taskContext.AppliedResources()
if !object.SetEquals(expected, actual) {
if !object.IsSetEqual(expected, actual) {
t.Errorf("expected (%s) inventory resources, got (%s)", expected, actual)
}
})
Expand Down Expand Up @@ -673,7 +673,7 @@ func TestApplyTaskWithDifferentInventoryAnnotation(t *testing.T) {
actualUids := taskContext.AppliedResourceUIDs()
assert.Equal(t, len(tc.expectedObjects), len(actualUids))
actualObjs := taskContext.AppliedResources()
if !object.SetEquals(tc.expectedObjects, actualObjs) {
if !object.IsSetEqual(tc.expectedObjects, actualObjs) {
t.Errorf("expected applied objects (%v), got (%v)", tc.expectedObjects, actualObjs)
}
})
Expand Down
4 changes: 2 additions & 2 deletions pkg/apply/task/inv_add_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestInvAddTask(t *testing.T) {
applyIds, err := object.UnstructuredsToObjMetas(tc.applyObjs)
require.NoError(t, err)

if !object.SetEquals(applyIds, task.Identifiers()) {
if !object.IsSetEqual(applyIds, task.Identifiers()) {
t.Errorf("expected task ids (%s), got (%s)", applyIds, task.Identifiers())
}
task.Start(context)
Expand All @@ -131,7 +131,7 @@ func TestInvAddTask(t *testing.T) {
t.Errorf("unexpected error running InvAddTask: %s", result.Err)
}
actual, _ := client.GetClusterObjs(nil, common.DryRunNone)
if !object.SetEquals(tc.expectedObjs, actual) {
if !object.IsSetEqual(tc.expectedObjs, actual) {
t.Errorf("expected merged inventory (%s), got (%s)", tc.expectedObjs, actual)
}
})
Expand Down
2 changes: 1 addition & 1 deletion pkg/apply/task/inv_set_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func TestInvSetTask(t *testing.T) {
t.Errorf("unexpected error running InvAddTask: %s", result.Err)
}
actual, _ := client.GetClusterObjs(nil, common.DryRunNone)
if !object.SetEquals(tc.expectedObjs, actual) {
if !object.IsSetEqual(tc.expectedObjs, actual) {
t.Errorf("expected merged inventory (%s), got (%s)", tc.expectedObjs, actual)
}
})
Expand Down
8 changes: 8 additions & 0 deletions pkg/apply/taskrunner/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ func NewTaskContext(eventChannel chan event.Event) *TaskContext {
appliedResources: make(map[object.ObjMetadata]applyInfo),
failedResources: make(map[object.ObjMetadata]struct{}),
pruneFailures: make(map[object.ObjMetadata]struct{}),
resourceCache: NewResourceCache(),
}
}

Expand All @@ -35,6 +36,9 @@ type TaskContext struct {

// pruneFailures records the IDs of resources that are failed during pruning.
pruneFailures map[object.ObjMetadata]struct{}

// resourceCache records the latest known state of resources in the cluster.
resourceCache *ResourceCache
}

func (tc *TaskContext) TaskChannel() chan TaskResult {
Expand All @@ -45,6 +49,10 @@ func (tc *TaskContext) EventChannel() chan event.Event {
return tc.eventChannel
}

func (tc *TaskContext) ResourceCache() *ResourceCache {
return tc.resourceCache
}

// ResourceApplied updates the context with information about the
// resource identified by the provided id. Currently, we keep information
// about the generation of the resource after the apply operation completed.
Expand Down
Loading

0 comments on commit cee78a1

Please sign in to comment.