Skip to content

Commit

Permalink
Replace multiListerWatcher with independent listWatchers per namespace
Browse files Browse the repository at this point in the history
The multiListerWatcher is a composite object encapsulating multiple
ListerWatchers and implements the ListerWatcher interface.
With the current implementation, when an individual lister fails, the
entire List operation fails. This causes no metrics to be shown when KSM
has no permissions to a single namespace.

In addition to this, the multiListerWatcher takes advantage of internal
implementation details if the client-go library by modifiying and
relying on the ResourceVersion metadata field. This introduces a bug
where reconnecting to the API server will break the multiListerWatcher
completely.

This commit replaces the multiListerWatcher with individual
ListerWatchers per each configured namespace, resolving both issues.

Signed-off-by: fpetkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jun 21, 2021
1 parent cbfc906 commit e071c9f
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 283 deletions.
181 changes: 100 additions & 81 deletions internal/store/builder.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions pkg/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package builder
import (
"context"

metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"

"github.com/prometheus/client_golang/prometheus"
vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

internalstore "k8s.io/kube-state-metrics/v2/internal/store"
ksmtypes "k8s.io/kube-state-metrics/v2/pkg/builder/types"
Expand Down Expand Up @@ -95,6 +96,6 @@ func (b *Builder) DefaultGenerateStoreFunc() ksmtypes.BuildStoreFunc {
}

// Build initializes and registers all enabled stores.
func (b *Builder) Build() []cache.Store {
func (b *Builder) Build() []metricsstore.MetricsWriter {
return b.internal.Build()
}
6 changes: 4 additions & 2 deletions pkg/builder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package types
import (
"context"

metricsstore "k8s.io/kube-state-metrics/v2/pkg/metrics_store"

"github.com/prometheus/client_golang/prometheus"
vpaclientset "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/client/clientset/versioned"
clientset "k8s.io/client-go/kubernetes"
Expand All @@ -40,14 +42,14 @@ type BuilderInterface interface {
WithAllowDenyList(l AllowDenyLister)
WithGenerateStoreFunc(f BuildStoreFunc)
DefaultGenerateStoreFunc() BuildStoreFunc
Build() []cache.Store
Build() []metricsstore.MetricsWriter
}

// BuildStoreFunc function signature that is use to returns a cache.Store
type BuildStoreFunc func(metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,
) cache.Store
) []*metricsstore.MetricsStore

// AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names
type AllowDenyLister interface {
Expand Down
170 changes: 0 additions & 170 deletions pkg/listwatch/listwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,8 @@ package listwatch

import (
"context"
"fmt"
"strings"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -101,172 +97,6 @@ func NewFilteredUnprivilegedNamespaceListWatchFromClient(c cache.Getter, allowed
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

// MultiNamespaceListerWatcher takes allowed and denied namespaces and a
// cache.ListerWatcher generator func and returns a single cache.ListerWatcher
// capable of operating on multiple namespaces.
//
// Allowed namespaces and denied namespaces are mutually exclusive.
// If allowed namespaces contain multiple items, the given denied namespaces have no effect.
// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string),
// the given denied namespaces are applied.
func MultiNamespaceListerWatcher(allowedNamespaces, deniedNamespaces []string, f func(string) cache.ListerWatcher) cache.ListerWatcher {
// If there is only one namespace then there is no need to create a
// multi lister watcher proxy.
if IsAllNamespaces(allowedNamespaces) {
return newDenylistListerWatcher(deniedNamespaces, f(allowedNamespaces[0]))
}
if len(allowedNamespaces) == 1 {
return f(allowedNamespaces[0])
}

var lws []cache.ListerWatcher
for _, n := range allowedNamespaces {
lws = append(lws, f(n))
}
return multiListerWatcher(lws)
}

// multiListerWatcher abstracts several cache.ListerWatchers, allowing them
// to be treated as a single cache.ListerWatcher.
type multiListerWatcher []cache.ListerWatcher

// List implements the ListerWatcher interface.
// It combines the output of the List method of every ListerWatcher into
// a single result.
func (mlw multiListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
l := metav1.List{}
var resourceVersions []string
for _, lw := range mlw {
list, err := lw.List(options)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
for _, item := range items {
l.Items = append(l.Items, runtime.RawExtension{Object: item.DeepCopyObject()})
}
resourceVersions = append(resourceVersions, metaObj.GetResourceVersion())
}
// Combine the resource versions so that the composite Watch method can
// distribute appropriate versions to each underlying Watch func.
l.ListMeta.ResourceVersion = strings.Join(resourceVersions, "/")
return &l, nil
}

// Watch implements the ListerWatcher interface.
// It returns a watch.Interface that combines the output from the
// watch.Interface of every cache.ListerWatcher into a single result chan.
func (mlw multiListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
resourceVersions := make([]string, len(mlw))
// Allow resource versions to be "".
if options.ResourceVersion != "" {
rvs := make([]string, 0, len(mlw))
if strings.Contains(options.ResourceVersion, "/") {
rvs = strings.Split(options.ResourceVersion, "/")
if len(rvs) != len(mlw) {
return nil, fmt.Errorf("expected resource version to have %d parts to match the number of ListerWatchers, actual: %d", len(mlw), len(rvs))
}
} else {
// watch reconnected and resource version is the latest one from event.Object has no "/"
for i := 0; i < len(mlw); i++ {
rvs = append(rvs, options.ResourceVersion)
}
}
resourceVersions = rvs
}
return newMultiWatch(mlw, resourceVersions, options)
}

// multiWatch abstracts multiple watch.Interface's, allowing them
// to be treated as a single watch.Interface.
type multiWatch struct {
result chan watch.Event
stopped chan struct{}
stoppers []func()
}

// newMultiWatch returns a new multiWatch or an error if one of the underlying
// Watch funcs errored. The length of []cache.ListerWatcher and []string must
// match.
func newMultiWatch(lws []cache.ListerWatcher, resourceVersions []string, options metav1.ListOptions) (*multiWatch, error) {
var (
result = make(chan watch.Event)
stopped = make(chan struct{})
stoppers []func()
wg sync.WaitGroup
)

wg.Add(len(lws))

for i, lw := range lws {
o := options.DeepCopy()
o.ResourceVersion = resourceVersions[i]
w, err := lw.Watch(*o)
if err != nil {
return nil, err
}

go func() {
defer wg.Done()

for {
event, ok := <-w.ResultChan()
if !ok {
return
}

select {
case result <- event:
case <-stopped:
return
}
}
}()
stoppers = append(stoppers, w.Stop)
}

// result chan must be closed,
// once all event sender goroutines exited.
go func() {
wg.Wait()
close(result)
}()

return &multiWatch{
result: result,
stoppers: stoppers,
stopped: stopped,
}, nil
}

// ResultChan implements the watch.Interface interface.
func (mw *multiWatch) ResultChan() <-chan watch.Event {
return mw.result
}

// Stop implements the watch.Interface interface.
// It stops all of the underlying watch.Interfaces and closes the backing chan.
// Can safely be called more than once.
func (mw *multiWatch) Stop() {
select {
case <-mw.stopped:
// nothing to do, we are already stopped
default:
for _, stop := range mw.stoppers {
stop()
}
close(mw.stopped)
}
return
}

// IsAllNamespaces checks if the given slice of namespaces
// contains only v1.NamespaceAll.
func IsAllNamespaces(namespaces []string) bool {
Expand Down
Loading

0 comments on commit e071c9f

Please sign in to comment.