Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Delete resources no longer in git #1442

Merged
merged 24 commits into from
Feb 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
21f04ce
feat: annotate stack with label and checksum
Timer Oct 10, 2018
41a6d38
Scan cluster stack and compare hash during sync
Timer Oct 11, 2018
0840915
feat: delete orphaned resources
Timer Oct 12, 2018
f76156e
Move garbage collection into cluster implementation
squaremo Oct 18, 2018
e2b140f
Bring tests up to date with sync changes
squaremo Nov 8, 2018
9fd2441
Clear up use of annotations and labels for sync
squaremo Nov 26, 2018
9c15d5c
Avoid deleting resources that did not apply OK
squaremo Nov 26, 2018
5810dd2
Respect "ignore" when syncing and GCing
squaremo Dec 19, 2018
6173f6b
Move kubernetes Sync code into sync.go
squaremo Dec 20, 2018
b26ad33
Use checksum per resource
squaremo Dec 20, 2018
d52f799
Rationalise logging of sync ignore annotation
squaremo Jan 10, 2019
1ed6e23
Remove Manifests#ParseManifests
squaremo Jan 14, 2019
a6238ba
Account for namespace defaulting
squaremo Jan 16, 2019
d48609a
Assign namespaces after parsing
squaremo Feb 4, 2019
ce36998
Invalidate discovery cache when custom resources change
squaremo Feb 7, 2019
927f9f3
Simplify sync structures
squaremo Feb 20, 2019
60346ef
Use cached discovery client in sync too
squaremo Feb 21, 2019
692a96b
Separate cached discovery code (and testing)
squaremo Feb 21, 2019
145fabe
Tidy kuberesource methods
squaremo Feb 21, 2019
93bccbf
Use git repo config to identify SyncSet
squaremo Feb 25, 2019
1d86d57
Delegate GroupVersion parsing to schema.ParseGroupVersion
2opremio Feb 26, 2019
aeef2c6
Obtain the default namespace directly from kubeconfig
2opremio Feb 26, 2019
c6e2c4e
Factor out garbage-collection code
2opremio Feb 26, 2019
e949383
Document the experimental garbage collection feature
squaremo Feb 27, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 32 additions & 4 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ required = ["k8s.io/code-generator/cmd/client-gen"]
name = "k8s.io/apimachinery"
version = "kubernetes-1.11.0"

[[constraint]]
name = "k8s.io/apiextensions-apiserver"
version = "kubernetes-1.11.0"

[[constraint]]
name = "k8s.io/client-go"
version = "8.0.0"
Expand Down Expand Up @@ -57,3 +61,7 @@ required = ["k8s.io/code-generator/cmd/client-gen"]
[[override]]
name = "github.com/BurntSushi/toml"
version = "v0.3.1"

[[constraint]]
name = "github.com/imdario/mergo"
version = "0.3.2"
4 changes: 2 additions & 2 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type Cluster interface {
SomeControllers([]flux.ResourceID) ([]Controller, error)
Ping() error
Export() ([]byte, error)
Sync(SyncDef) error
Sync(SyncSet) error
PublicSSHKey(regenerate bool) (ssh.PublicKey, error)
}

Expand Down Expand Up @@ -74,7 +74,7 @@ type Controller struct {
Rollout RolloutStatus
// Errors during the recurring sync from the Git repository to the
// cluster will surface here.
SyncError error
SyncError error

Containers ContainersOrExcuse
}
Expand Down
102 changes: 102 additions & 0 deletions cluster/kubernetes/cached_disco.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package kubernetes

import (
"sync"
"time"

crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
crd "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
discocache "k8s.io/client-go/discovery/cached"
toolscache "k8s.io/client-go/tools/cache"
)

// This exists so that we can do our own invalidation.
type cachedDiscovery struct {
discovery.CachedDiscoveryInterface

invalidMu sync.Mutex
invalid bool
}

// The k8s.io/client-go v8.0.0 implementation of MemCacheDiscovery
// refreshes the cached values, synchronously, when Invalidate() is
// called. Since we want to invalidate every time a CRD changes, but
// only refresh values when we need to read the cached values, this
// method defers the invalidation until a read is done.
func (d *cachedDiscovery) Invalidate() {
d.invalidMu.Lock()
d.invalid = true
d.invalidMu.Unlock()
}

// ServerResourcesForGroupVersion is the method used by the
// namespacer; so, this is the one where we check whether the cache
// has been invalidated. A cachedDiscovery implementation for more
// general use would do this for all methods (that weren't implemented
// purely in terms of other methods).
func (d *cachedDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
d.invalidMu.Lock()
invalid := d.invalid
d.invalid = false
d.invalidMu.Unlock()
if invalid {
d.CachedDiscoveryInterface.Invalidate()
}
return d.CachedDiscoveryInterface.ServerResourcesForGroupVersion(groupVersion)
}

// MakeCachedDiscovery constructs a CachedDicoveryInterface that will
// be invalidated whenever the set of CRDs change. The idea is that
// the only avenue of a change to the API resources in a running
// system is CRDs being added, updated or deleted.
func MakeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}) discovery.CachedDiscoveryInterface {
result, _, _ := makeCachedDiscovery(d, c, shutdown, makeInvalidatingHandler)
return result
}

// ---

func makeInvalidatingHandler(cached discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler {
var handler toolscache.ResourceEventHandler = toolscache.ResourceEventHandlerFuncs{
AddFunc: func(_ interface{}) {
cached.Invalidate()
},
UpdateFunc: func(_, _ interface{}) {
cached.Invalidate()
},
DeleteFunc: func(_ interface{}) {
cached.Invalidate()
},
}
return handler
}

type makeHandle func(discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler

// makeCachedDiscovery constructs a cached discovery client, with more
// flexibility than MakeCachedDiscovery; e.g., with extra handlers for
// testing.
func makeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}, handlerFn makeHandle) (*cachedDiscovery, toolscache.Store, toolscache.Controller) {
cachedDisco := &cachedDiscovery{CachedDiscoveryInterface: discocache.NewMemCacheClient(d)}
// We have an empty cache, so it's _a priori_ invalid. (Yes, that's the zero value, but better safe than sorry)
cachedDisco.Invalidate()

crdClient := c.ApiextensionsV1beta1().CustomResourceDefinitions()
lw := &toolscache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return crdClient.List(options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return crdClient.Watch(options)
},
}

handler := handlerFn(cachedDisco)
store, controller := toolscache.NewInformer(lw, &crdv1beta1.CustomResourceDefinition{}, 5*time.Minute, handler)
go controller.Run(shutdown)
return cachedDisco, store, controller
}
134 changes: 134 additions & 0 deletions cluster/kubernetes/cached_disco_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package kubernetes

import (
"testing"
"time"

crdv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
toolscache "k8s.io/client-go/tools/cache"
)

type chainHandler struct {
first toolscache.ResourceEventHandler
next toolscache.ResourceEventHandler
}

func (h chainHandler) OnAdd(obj interface{}) {
h.first.OnAdd(obj)
h.next.OnAdd(obj)
}

func (h chainHandler) OnUpdate(old, new interface{}) {
h.first.OnUpdate(old, new)
h.next.OnUpdate(old, new)
}

func (h chainHandler) OnDelete(old interface{}) {
h.first.OnDelete(old)
h.next.OnDelete(old)
}

func TestCachedDiscovery(t *testing.T) {
coreClient := makeFakeClient()

myCRD := &crdv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: "custom",
},
}
crdClient := crdfake.NewSimpleClientset(myCRD)

// Here's my fake API resource
myAPI := &metav1.APIResourceList{
GroupVersion: "foo/v1",
APIResources: []metav1.APIResource{
{Name: "customs", SingularName: "custom", Namespaced: true, Kind: "Custom", Verbs: getAndList},
},
}

apiResources := coreClient.Fake.Resources
coreClient.Fake.Resources = append(apiResources, myAPI)

shutdown := make(chan struct{})
defer close(shutdown)

// this extra handler means we can synchronise on the add later
// being processed
allowAdd := make(chan interface{})

addHandler := toolscache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
allowAdd <- obj
},
}
makeHandler := func(d discovery.CachedDiscoveryInterface) toolscache.ResourceEventHandler {
return chainHandler{first: addHandler, next: makeInvalidatingHandler(d)}
}

cachedDisco, store, _ := makeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown, makeHandler)

saved := getDefaultNamespace
getDefaultNamespace = func() (string, error) { return "bar-ns", nil }
defer func() { getDefaultNamespace = saved }()
namespacer, err := NewNamespacer(cachedDisco)
if err != nil {
t.Fatal(err)
}

namespaced, err := namespacer.lookupNamespaced("foo/v1", "Custom")
if err != nil {
t.Fatal(err)
}
if !namespaced {
t.Error("got false from lookupNamespaced, expecting true")
}

// In a cluster, we'd rely on the apiextensions server to reflect
// changes to CRDs to changes in the API resources. Here I will be
// more narrow, and just test that the API resources are reloaded
// when a CRD is updated or deleted.

// This is delicate: we can't just change the value in-place,
// since that will update everyone's record of it, and the test
// below will trivially succeed.
updatedAPI := &metav1.APIResourceList{
GroupVersion: "foo/v1",
APIResources: []metav1.APIResource{
{Name: "customs", SingularName: "custom", Namespaced: false /* <-- changed */, Kind: "Custom", Verbs: getAndList},
},
}
coreClient.Fake.Resources = append(apiResources, updatedAPI)

// Provoke the cached discovery client into invalidating
_, err = crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Update(myCRD)
if err != nil {
t.Fatal(err)
}

// Wait for the update to "go through"
select {
case <-allowAdd:
break
case <-time.After(time.Second):
t.Fatal("timed out waiting for Add to happen")
}

_, exists, err := store.Get(myCRD)
if err != nil {
t.Error(err)
}
if !exists {
t.Error("does not exist")
}

namespaced, err = namespacer.lookupNamespaced("foo/v1", "Custom")
if err != nil {
t.Fatal(err)
}
if namespaced {
t.Error("got true from lookupNamespaced, expecting false (after changing it)")
}
}
1 change: 0 additions & 1 deletion cluster/kubernetes/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ Package kubernetes provides implementations of `Cluster` and
`Manifests` that interact with the Kubernetes API (using kubectl or
the k8s API client).
*/

2opremio marked this conversation as resolved.
Show resolved Hide resolved
package kubernetes
Loading