Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Tolerate empty GroupVersions when syncing #1957

Merged
merged 4 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions cluster/kubernetes/cached_disco.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kubernetes

import (
"fmt"
"sync"
"time"

Expand All @@ -10,7 +11,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
discocache "k8s.io/client-go/discovery/cached"
"k8s.io/client-go/discovery/cached/memory"
toolscache "k8s.io/client-go/tools/cache"
)

Expand Down Expand Up @@ -46,7 +47,12 @@ func (d *cachedDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*
if invalid {
d.CachedDiscoveryInterface.Invalidate()
}
return d.CachedDiscoveryInterface.ServerResourcesForGroupVersion(groupVersion)
result, err := d.CachedDiscoveryInterface.ServerResourcesForGroupVersion(groupVersion)
if err == memory.ErrCacheNotFound {
// improve the error returned from memcacheclient
err = fmt.Errorf("server resources for %s not found in cache; cache refreshes every 5 minutes", groupVersion)
}
return result, err
}

// MakeCachedDiscovery constructs a CachedDicoveryInterface that will
Expand Down Expand Up @@ -81,7 +87,7 @@ type makeHandle func(discovery.CachedDiscoveryInterface) toolscache.ResourceEven
// flexibility than MakeCachedDiscovery; e.g., with extra handlers for
// testing.
func makeCachedDiscovery(d discovery.DiscoveryInterface, c crd.Interface, shutdown <-chan struct{}, handlerFn makeHandle) (*cachedDiscovery, toolscache.Store, toolscache.Controller) {
cachedDisco := &cachedDiscovery{CachedDiscoveryInterface: discocache.NewMemCacheClient(d)}
cachedDisco := &cachedDiscovery{CachedDiscoveryInterface: memory.NewMemCacheClient(d)}
// We have an empty cache, so it's _a priori_ invalid. (Yes, that's the zero value, but better safe than sorry)
cachedDisco.Invalidate()

Expand Down
16 changes: 13 additions & 3 deletions cluster/kubernetes/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
rest "k8s.io/client-go/rest"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
Expand Down Expand Up @@ -198,9 +199,18 @@ func (c *Cluster) getAllowedResourcesBySelector(selector string) (map[string]*ku
listOptions.LabelSelector = selector
}

resources, err := c.client.discoveryClient.ServerResources()
_, resources, err := c.client.discoveryClient.ServerGroupsAndResources()
if err != nil {
return nil, err
discErr, ok := err.(*discovery.ErrGroupDiscoveryFailed)
if !ok {
return nil, err
}
for gv, e := range discErr.Groups {
// Tolerate empty GroupVersions due to e.g. misconfigured custom metrics
if e.Error() != fmt.Sprintf("Got empty response for: %v", gv) {
return nil, err
}
}
}

result := map[string]*kuberesource{}
Expand Down
65 changes: 49 additions & 16 deletions cluster/kubernetes/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// dynamicfake "k8s.io/client-go/dynamic/fake"
// k8sclient "k8s.io/client-go/kubernetes"
"github.com/stretchr/testify/assert"
crdfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/client-go/discovery"
dynamicfake "k8s.io/client-go/dynamic/fake"
k8sclient "k8s.io/client-go/kubernetes"
Expand All @@ -38,7 +39,7 @@ const (
defaultTestNamespace = "unusual-default"
)

func fakeClients() ExtendedClient {
func fakeClients() (ExtendedClient, func()) {
scheme := runtime.NewScheme()

// Set this to `true` to output a trace of the API actions called
Expand Down Expand Up @@ -67,6 +68,9 @@ func fakeClients() ExtendedClient {
coreClient := corefake.NewSimpleClientset(&corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: defaultTestNamespace}})
fluxClient := fluxfake.NewSimpleClientset()
dynamicClient := dynamicfake.NewSimpleDynamicClient(scheme)
crdClient := crdfake.NewSimpleClientset()
shutdown := make(chan struct{})
discoveryClient := MakeCachedDiscovery(coreClient.Discovery(), crdClient, shutdown)

// Assigned here, since this is _also_ used by the (fake)
// discovery client therein, and ultimately by
Expand All @@ -84,12 +88,14 @@ func fakeClients() ExtendedClient {
}
}

return ExtendedClient{
ec := ExtendedClient{
coreClient: coreClient,
fluxHelmClient: fluxClient,
dynamicClient: dynamicClient,
discoveryClient: coreClient.Discovery(),
discoveryClient: discoveryClient,
}

return ec, func() { close(shutdown) }
}

// fakeApplier is an Applier that just forwards changeset operations
Expand Down Expand Up @@ -232,19 +238,20 @@ func findAPIResource(gvr schema.GroupVersionResource, disco discovery.DiscoveryI

// ---

func setup(t *testing.T) (*Cluster, *fakeApplier) {
clients := fakeClients()
func setup(t *testing.T) (*Cluster, *fakeApplier, func()) {
clients, cancel := fakeClients()
applier := &fakeApplier{dynamicClient: clients.dynamicClient, coreClient: clients.coreClient, defaultNS: defaultTestNamespace}
kube := &Cluster{
applier: applier,
client: clients,
logger: log.NewNopLogger(),
logger: log.NewLogfmtLogger(os.Stdout),
}
return kube, applier
return kube, applier, cancel
}

func TestSyncNop(t *testing.T) {
kube, mock := setup(t)
kube, mock, cancel := setup(t)
defer cancel()
if err := kube.Sync(cluster.SyncSet{}); err != nil {
t.Errorf("%#v", err)
}
Expand All @@ -253,6 +260,24 @@ func TestSyncNop(t *testing.T) {
}
}

func TestSyncTolerateEmptyGroupVersion(t *testing.T) {
kube, _, cancel := setup(t)
defer cancel()

// Add a GroupVersion without API Resources
fakeClient := kube.client.coreClient.(*corefake.Clientset)
fakeClient.Resources = append(fakeClient.Resources, &metav1.APIResourceList{GroupVersion: "custom.metrics.k8s.io/v1beta1"})

// We should tolerate the error caused in the cache due to the
// GroupVersion being empty
err := kube.Sync(cluster.SyncSet{})
assert.NoError(t, err)

// No errors the second time either
err = kube.Sync(cluster.SyncSet{})
assert.NoError(t, err)
}

func TestSync(t *testing.T) {
const ns1 = `---
apiVersion: v1
Expand Down Expand Up @@ -362,7 +387,8 @@ metadata:
}

t.Run("sync adds and GCs resources", func(t *testing.T) {
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()

// without GC on, resources persist if they are not mentioned in subsequent syncs.
test(t, kube, "", "", false)
Expand All @@ -379,7 +405,8 @@ metadata:
})

t.Run("sync won't incorrectly delete non-namespaced resources", func(t *testing.T) {
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()
kube.GC = true

const nsDef = `
Expand All @@ -398,7 +425,8 @@ metadata:
// fallback (this would come from kubeconfig usually); and,
// for things that _don't_ have a namespace to have it
// stripped out.
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()
kube.GC = true
const withoutNS = `
apiVersion: apps/v1
Expand All @@ -417,7 +445,8 @@ metadata:
})

t.Run("sync won't delete resources whose garbage collection mark was copied to", func(t *testing.T) {
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()
kube.GC = true

depName := "dep"
Expand Down Expand Up @@ -464,7 +493,8 @@ metadata:
})

t.Run("sync won't delete if apply failed", func(t *testing.T) {
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()
kube.GC = true

const defs1invalid = `---
Expand All @@ -481,7 +511,8 @@ metadata:
})

t.Run("sync doesn't apply or delete manifests marked with ignore", func(t *testing.T) {
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()
kube.GC = true

const dep1 = `---
Expand Down Expand Up @@ -535,7 +566,8 @@ spec:
labels:
app: original
`
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()
// This just checks the starting assumption: dep1 exists in the cluster
test(t, kube, ns1+dep1, ns1+dep1, false)

Expand Down Expand Up @@ -575,7 +607,8 @@ spec:
})

t.Run("sync doesn't update or delete a pre-existing resource marked with ignore", func(t *testing.T) {
kube, _ := setup(t)
kube, _, cancel := setup(t)
defer cancel()

const existing = `---
apiVersion: apps/v1
Expand Down
6 changes: 4 additions & 2 deletions cmd/fluxd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,10 @@ func main() {
logger.Log("version", version)

// Silence access errors logged internally by client-go
k8slog := log.With(logger, "type", "internal kubernetes error")
k8slog := log.With(logger,
"type", "internal kubernetes error",
"ts", log.DefaultTimestampUTC,
"caller", log.Caller(5)) // we want to log one level deeper than k8sruntime.HandleError
logErrorUnlessAccessRelated := func(err error) {
errLower := strings.ToLower(err.Error())
if k8serrors.IsForbidden(err) || k8serrors.IsNotFound(err) ||
Expand All @@ -203,7 +206,6 @@ func main() {
k8slog.Log("err", err)
}
k8sruntime.ErrorHandlers = []func(error){logErrorUnlessAccessRelated}

// Argument validation

// Sort out values for the git tag and notes ref. There are
Expand Down
2 changes: 1 addition & 1 deletion daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ func (d *Daemon) doSync(logger log.Logger, lastKnownSyncTagRev *string, warnedAb

var resourceErrors []event.ResourceError
if err := fluxsync.Sync(syncSetName, allResources, d.Cluster); err != nil {
logger.Log("err", err)
switch syncerr := err.(type) {
case cluster.SyncError:
logger.Log("err", err)
squaremo marked this conversation as resolved.
Show resolved Hide resolved
for _, e := range syncerr {
resourceErrors = append(resourceErrors, event.ResourceError{
ID: e.ResourceID,
Expand Down