Skip to content

Commit

Permalink
Add CachingPoller and ApplyTimeMutator injection
Browse files Browse the repository at this point in the history
This allows resources to be cached and retrieved more abstractly,
without injecting the cache or dynamic client into task context or
modifying the runner to update the cache.
  • Loading branch information
karlkfi committed Aug 31, 2021
1 parent 2e33ed8 commit 5aca3cf
Show file tree
Hide file tree
Showing 11 changed files with 332 additions and 202 deletions.
21 changes: 19 additions & 2 deletions pkg/apply/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/apply/event"
"sigs.k8s.io/cli-utils/pkg/apply/filter"
"sigs.k8s.io/cli-utils/pkg/apply/info"
"sigs.k8s.io/cli-utils/pkg/apply/mutator"
"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/apply/prune"
"sigs.k8s.io/cli-utils/pkg/apply/solver"
Expand Down Expand Up @@ -190,10 +192,25 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
LocalNamespaces: localNamespaces(invInfo, object.UnstructuredsToObjMetasOrDie(objects)),
},
}
// Use an in-memory cache for the duration of this task run.
resourceCache := cache.NewResourceCacheMap()
// Use the status poller to update the resource cache.
cachingPoller := &cache.ResourceCachingPoller{
Poller: a.statusPoller,
ResourceCache: resourceCache,
}
// Build list of apply mutators.
applyMutators := []mutator.Interface{
&mutator.ApplyTimeMutator{
Client: client,
Mapper: mapper,
ResourceCache: resourceCache,
},
}
// Build the task queue by appending tasks in the proper order.
taskQueue, err := taskBuilder.
AppendInvAddTask(invInfo, applyObjs, options.DryRunStrategy).
AppendApplyWaitTasks(applyObjs, applyFilters, opts).
AppendApplyWaitTasks(applyObjs, applyFilters, applyMutators, opts).
AppendPruneWaitTasks(pruneObjs, pruneFilters, opts).
AppendInvSetTask(invInfo, options.DryRunStrategy).
Build()
Expand All @@ -211,7 +228,7 @@ func (a *Applier) Run(ctx context.Context, invInfo inventory.InventoryInfo, obje
// Create a new TaskStatusRunner to execute the taskQueue.
klog.V(4).Infoln("applier building TaskStatusRunner...")
allIds := object.UnstructuredsToObjMetasOrDie(append(applyObjs, pruneObjs...))
runner := taskrunner.NewTaskStatusRunner(allIds, a.statusPoller)
runner := taskrunner.NewTaskStatusRunner(allIds, cachingPoller)
klog.V(4).Infoln("applier running TaskStatusRunner...")
err = runner.Run(ctx, taskQueue.ToChannel(), eventChannel, taskrunner.Options{
PollInterval: options.PollInterval,
Expand Down
15 changes: 15 additions & 0 deletions pkg/apply/cache/resource_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cache

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"sigs.k8s.io/cli-utils/pkg/object"
)

// ResourceCache stores unstructured resource objects in memory
type ResourceCache interface {
Put(obj *unstructured.Unstructured) error
Set(objMeta object.ObjMetadata, obj *unstructured.Unstructured)
Get(objMeta object.ObjMetadata) (*unstructured.Unstructured, bool)
Remove(objMeta object.ObjMetadata)
Clear()
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package taskrunner
package cache

import (
"github.com/pkg/errors"
Expand All @@ -10,21 +10,21 @@ import (
"sigs.k8s.io/cli-utils/pkg/object"
)

// ResourceCache stores unstructured resource objects in memory
type ResourceCache struct {
// ResourceCacheMap stores unstructured resource objects in a map.
type ResourceCacheMap struct {
cache map[object.ObjMetadata]*unstructured.Unstructured
}

// NewResourceCache returns a new empty cache
func NewResourceCache() *ResourceCache {
rc := &ResourceCache{}
// NewResourceCacheMap returns a new empty ResourceCacheMap
func NewResourceCacheMap() *ResourceCacheMap {
rc := &ResourceCacheMap{}
rc.Clear()
return rc
}

// Put inserts the resource into the cache, replacing any existing resource with
// the same ID. Returns an error if resource is invalid.
func (rc *ResourceCache) Put(obj *unstructured.Unstructured) error {
func (rc *ResourceCacheMap) Put(obj *unstructured.Unstructured) error {
objMeta, err := object.UnstructuredToObjMeta(obj)
if err != nil {
return errors.Wrap(err, "failed to create resource cache key")
Expand All @@ -35,13 +35,13 @@ func (rc *ResourceCache) Put(obj *unstructured.Unstructured) error {

// Put inserts the resource into the cache, replacing any existing resource with
// the same ID.
func (rc *ResourceCache) Set(objMeta object.ObjMetadata, obj *unstructured.Unstructured) {
func (rc *ResourceCacheMap) Set(objMeta object.ObjMetadata, obj *unstructured.Unstructured) {
rc.cache[objMeta] = obj
}

// Get retrieves the resource from the cache by ID. Returns (nil, true) if not
// found in the cache.
func (rc *ResourceCache) Get(objMeta object.ObjMetadata) (*unstructured.Unstructured, bool) {
func (rc *ResourceCacheMap) Get(objMeta object.ObjMetadata) (*unstructured.Unstructured, bool) {
obj, found := rc.cache[objMeta]
if klog.V(4).Enabled() {
if found {
Expand All @@ -54,11 +54,11 @@ func (rc *ResourceCache) Get(objMeta object.ObjMetadata) (*unstructured.Unstruct
}

// Remove the resource from the cache by ID.
func (rc *ResourceCache) Remove(objMeta object.ObjMetadata) {
func (rc *ResourceCacheMap) Remove(objMeta object.ObjMetadata) {
delete(rc.cache, objMeta)
}

// Clear the cache.
func (rc *ResourceCache) Clear() {
func (rc *ResourceCacheMap) Clear() {
rc.cache = make(map[object.ObjMetadata]*unstructured.Unstructured)
}
60 changes: 60 additions & 0 deletions pkg/apply/cache/resource_caching_poller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package cache

import (
"context"

"sigs.k8s.io/cli-utils/pkg/apply/poller"
"sigs.k8s.io/cli-utils/pkg/kstatus/polling"
pollevent "sigs.k8s.io/cli-utils/pkg/kstatus/polling/event"
"sigs.k8s.io/cli-utils/pkg/kstatus/status"
"sigs.k8s.io/cli-utils/pkg/object"
)

// ResourceCachingPoller wraps a poller with an in-memory cache that is updated on
// every ResourceUpdateEvent.
type ResourceCachingPoller struct {
Poller poller.Poller
ResourceCache ResourceCache
}

// Poll starts polling using the wrapped Poller, caching the unstructured
// resources as their updates are received.
// All inputs are passed to the wrapped poller.
// All events are passed to the caller on the return channel.
// Shut down the poller by closing the Context.Done channel.
func (cp *ResourceCachingPoller) Poll(ctx context.Context, identifiers []object.ObjMetadata, options polling.Options) <-chan pollevent.Event {
outCh := make(chan pollevent.Event)
inCh := cp.Poller.Poll(ctx, identifiers, options)

// Assume the wrapped Poller will close the output channel when the context is done.
// This avoids losing any events sent after the context is done.

go func() {
// close the output channel on exit to signal completion to the caller
defer close(outCh)

for {
// Process events from the inCh and send to the outCh
event, ok := <-inCh

// if input channel is closed, stop watching events
if !ok {
break
}

// Update cache on resource update events
if event.EventType == pollevent.ResourceUpdateEvent {
if event.Resource.Status == status.NotFoundStatus {
cp.ResourceCache.Remove(event.Resource.Identifier)
} else {
cp.ResourceCache.Set(event.Resource.Identifier, event.Resource.Resource)
}
}

// echo the input event to the caller
outCh <- event
}
}()

return outCh
}
176 changes: 176 additions & 0 deletions pkg/apply/mutator/apply-time-mutator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package mutator

import (
"context"
"fmt"
"strings"

"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
"sigs.k8s.io/cli-utils/pkg/apply/cache"
"sigs.k8s.io/cli-utils/pkg/object"
"sigs.k8s.io/cli-utils/pkg/object/mutation"
)

// ApplyTimeMutator mutates a resource by injecting values specified by the
// apply-time-mutation annotation.
// The optional ResourceCache will be used to speed up source reosurce lookups,
// if specified.
// Implements the Mutator interface
type ApplyTimeMutator struct {
Client dynamic.Interface
Mapper meta.RESTMapper
ResourceCache cache.ResourceCache
}

// Name returns a mutator identifier for logging.
func (atm *ApplyTimeMutator) Name() string {
return "ApplyTimeMutator"
}

// Mutate parses the apply-time-mutation annotation and loops through the
// substitutions, applying each of them to the supplied target object.
// Returns true with a reason, if mutation was performed.
func (atm *ApplyTimeMutator) Mutate(obj *unstructured.Unstructured) (bool, string, error) {
mutated := false
reason := ""

if !mutation.HasAnnotation(obj) {
return mutated, reason, nil
}

subs, err := mutation.ReadAnnotation(obj)
if err != nil {
return mutated, reason, errors.Wrapf(err, "failed to read jsonpath field in target resource: %v", mutation.NewResourceReference(obj))
}

targetID := object.UnstructuredToObjMetaOrDie(obj)
klog.V(5).Infof("target resource %s/%s: %#v", targetID.Namespace, targetID.Name, obj)

for _, sub := range subs {
sourceRef := sub.SourceRef

// lookup source resource from cache or cluster
sourceObj, err := atm.getObject(sourceRef)
if err != nil {
return mutated, reason, errors.Wrapf(err, "failed to retrieve resource from sourceRef: %v", sourceRef)
}

sourceID := object.UnstructuredToObjMetaOrDie(sourceObj)
klog.V(5).Infof("source resource %s/%s: %#v", sourceID.Namespace, sourceID.Name, sourceObj)

// lookup target field in target resource
targetValue, err := readFieldValue(obj, sub.TargetPath)
if err != nil {
return mutated, reason, errors.Wrapf(err, "failed to reading jsonpath field from target resource: %v", mutation.NewResourceReference(obj))
}

// lookup source field in source resource
sourceValue, err := readFieldValue(sourceObj, sub.SourcePath)
if err != nil {
return mutated, reason, errors.Wrapf(err, "failed to reading jsonpath field from source resource: %v", sourceRef)
}

// substitute token for source field value
newValue := strings.ReplaceAll(targetValue, sub.Token, sourceValue)

klog.V(5).Infof("substitution on %s/%s: source=%q, token=%q, old=%q, new=%q",
targetID.Namespace, targetID.Name, sourceValue, sub.Token, targetValue, newValue)

// update target field in target resource
err = writeFieldValue(obj, sub.TargetPath, newValue)
if err != nil {
return mutated, reason, errors.Wrapf(err, "failed to set value to jsonpath field in target resource: %v", mutation.NewResourceReference(obj))
}

mutated = true
reason = fmt.Sprintf("resource contained annotation: %s", mutation.Annotation)
}

return mutated, reason, nil
}

// getObject returns a cached resource, if cached and cache exists, otherwise
// the resource is revrieved from the cluster.
func (atm *ApplyTimeMutator) getObject(ref mutation.ResourceReference) (*unstructured.Unstructured, error) {
// validate resource reference
sourceObjMeta, err := mutation.ResourceReferenceToObjMeta(ref)
if err != nil {
return nil, errors.Wrapf(err, "failed to validate resource reference: %v", ref)
}

if atm.ResourceCache != nil {
// lookup source resource from cache
sourceObj, found := atm.ResourceCache.Get(sourceObjMeta)
if found && sourceObj != nil {
return sourceObj, nil
}
}

// lookup source resource using resource version, if specified
sourceGvk := ref.GroupVersionKind()
versions := []string{}
if sourceGvk.Version == "" {
versions = append(versions, sourceGvk.Version)
}

// lookup mapping of source resource
mapping, err := atm.Mapper.RESTMapping(sourceGvk.GroupKind(), versions...)
if err != nil {
return nil, errors.Wrapf(err, "failed to map resource reference to valid type: %v", ref)
}

// lookup source resource from cluster
namespacedClient := atm.Client.Resource(mapping.Resource).Namespace(ref.Namespace)
sourceObj, err := namespacedClient.Get(context.TODO(), ref.Name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrapf(err, "failed to retrieve resource from cluster: %v", ref)
}

return sourceObj, nil
}

func readFieldValue(obj *unstructured.Unstructured, path string) (string, error) {
if path == "" {
return "", errors.New("empty jsonpath")
}

// strip optional root index
pathArray := strings.Split(path, ".")
if pathArray[0] == "$" || pathArray[0] == "" {
pathArray = pathArray[1:]
}

// get path value
value, found, err := unstructured.NestedString(obj.Object, pathArray...)
if err != nil {
return "", errors.Wrapf(err, "failed to read jsonpath value: %q", path)
}
if !found {
return "", errors.Wrapf(err, "jsonpath field not found: %q", path)
}
return value, nil
}

func writeFieldValue(obj *unstructured.Unstructured, path, value string) error {
if path == "" {
return errors.New("empty jsonpath")
}

// strip optional root index
pathArray := strings.Split(path, ".")
if pathArray[0] == "$" || pathArray[0] == "" {
pathArray = pathArray[1:]
}

// set path value
err := unstructured.SetNestedField(obj.Object, value, pathArray...)
if err != nil {
return errors.Wrapf(err, "failed to read jsonpath value: %q", path)
}
return nil
}
20 changes: 20 additions & 0 deletions pkg/apply/mutator/mutator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2021 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0

package mutator

import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// Interface decouples apply-time-mutation
// from the concrete structs used for applying.
type Interface interface {
// Name returns a filter name (usually for logging).
Name() string
// Mutate returns true if the object was mutated.
// This allows the mutator to decide if mutation is needed.
// If mutated, a reason string is returned.
// If an error happens during mutation, it is returned.
Mutate(obj *unstructured.Unstructured) (bool, string, error)
}
Loading

0 comments on commit 5aca3cf

Please sign in to comment.