Skip to content

Commit

Permalink
Add support for variable VKs in CRS config
Browse files Browse the repository at this point in the history
Add support for variable VKs in CRS config, while maintaining a cache
of discovered GVKs in the cluster, and updating it every 30s.

Signed-off-by: Pranshu Srivastava <rexagod@gmail.com>
  • Loading branch information
rexagod committed Apr 25, 2023
1 parent 168254d commit d21e9a1
Show file tree
Hide file tree
Showing 17 changed files with 813 additions and 57 deletions.
32 changes: 32 additions & 0 deletions docs/customresourcestate-metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,35 @@ Examples:
# if the value to be matched is a number or boolean, the value is compared as a number or boolean
[status, conditions, "[value=66]", name] # status.conditions[1].name = "b"
```

### [Variable VKs](https://github.com/kubernetes/kube-state-metrics/pull/1851)

The CRS configuration allows you to monitor all versions and/or kinds that come under a group.
Taking the `Foo` object as reference the configuration below allows you to monitor all objects under all versions and all kinds that come under the `myteam.io` group.

```yaml
kind: CustomResourceStateMetrics
spec:
resources:
- groupVersionKind:
group: "myteam.io"
version: "*" # Set to `v1 to monitor all kinds under `myteam.io/v1`. Wildcard matches all versions based on the installed CRDs.
kind: "*" # TODO: Set to `Foo` to monitor all `Foo` objects under the `myteam.io` group (under all versions). Wildcard matches all installed kinds in the CR group.
metrics:
- name: "myobject_info"
help: "Foo Bar Baz"
each:
type: Info
info:
path: [metadata]
labelsFromPath:
object: [name]
namespace: [namespace]
```

The configuration above produces these metrics.

```yaml
kube_customresource_myobject_info{customresource_group="myteam.io",customresource_kind="Foo",customresource_version="v1",namespace="ns",object="foo"} 1
kube_customresource_myobject_info{customresource_group="myteam.io",customresource_kind="Bar",customresource_version="v1",namespace="ns",object="bar"} 1
```
136 changes: 136 additions & 0 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
Copyright 2023 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package discovery provides a discovery and resolution logic for GVKs.
package discovery

import (
"fmt"
"sort"
"time"

"k8s.io/klog/v2"
)

// Interval is the interval at which the discovery client fetches the list of objects.
const Interval = 30 * time.Second

// Multiplier is the multiplier for the discovery interval, and the upper bound till we check for resolved GVKs to be populated.
const Multiplier = 6

// Discover starts the discovery process, fetching all the objects that can be listed from the apiserver, every `DiscoveryInterval` seconds.
// ResolveGVK needs to be called after Discover to generate factories.
func (r *GVKMap) Discover() {
discoveryClient := r.DiscoveryClient
groupList, err := discoveryClient.ServerGroups()
if err != nil {
klog.Errorln("Failed to fetch server groups:", err)
}
arrayHasElement := func(array []string, element string) bool {
for _, v := range array {
if v == element {
return true
}
}
return false
}
gvkMap := make(map[string]map[string][]string)
for _, group := range groupList.Groups {
gvkMap[group.Name] = make(map[string][]string)
for _, version := range group.Versions {
gvkMap[group.Name][version.Version] = make([]string, 0)
resources, err := discoveryClient.ServerResourcesForGroupVersion(version.GroupVersion)
if err != nil {
klog.Errorln("Failed to fetch server resources for group version:", err)
}
for _, resource := range resources.APIResources {
gvkMap[group.Name][version.Version] = append(gvkMap[group.Name][version.Version], resource.Kind)
if r.OldMap != nil {
// A resource is "newly discovered" if the entire G-V-K tuple is new (after G** resolution).
isKindNew := !arrayHasElement(r.OldMap[group.Name][version.Version], resource.Kind)
if r.OldMap[group.Name][version.Version] == nil || isKindNew {
// Note: Reset this when the cache is invalidated.
r.M.Lock()
r.UpdateStores = true
r.M.Unlock()
}
}
}
sort.Strings(gvkMap[group.Name][version.Version])
}
}
r.OldMap = r.Map
r.Map = gvkMap
}

// ResolveGVK resolves the variable VKs to a GVK list.
func (r *GVKMap) ResolveGVK(gvk GroupVersionKind) (resolvedGVKs []GroupVersionKind, err error) {
g := gvk.Group
v := gvk.Version
k := gvk.Kind
if g == "" || g == "*" {
err = fmt.Errorf("group is required in the defined GVK %v", gvk)
return nil, err
}
hasVersion := v != "" && v != "*"
hasKind := k != "" && k != "*"
// No need to resolve, return.
if hasVersion && hasKind {
return []GroupVersionKind{
{Group: g, Version: v, Kind: k},
}, nil
}
if hasVersion && !hasKind {
kinds := r.Map[g][v]
for _, kind := range kinds {
if kind == "Scale" || kind == "Status" {
continue
}
resolvedGVKs = append(resolvedGVKs, GroupVersionKind{Group: g, Version: v, Kind: kind})
}
}
if !hasVersion && hasKind {
versions := r.Map[g]
for version, kinds := range versions {
for _, kind := range kinds {
if kind == "Scale" || kind == "Status" {
continue
}
if kind == k {
resolvedGVKs = append(resolvedGVKs, GroupVersionKind{Group: g, Version: version, Kind: k})
}
}
}
}
if !hasVersion && !hasKind {
versions := r.Map[g]
for version, kinds := range versions {
for _, kind := range kinds {
if kind == "Scale" || kind == "Status" {
continue
}
resolvedGVKs = append(resolvedGVKs, GroupVersionKind{Group: g, Version: version, Kind: kind})
}
}
}
// Remove any duplicates from the list.
m := map[GroupVersionKind]struct{}{}
for _, resolvedGVK := range resolvedGVKs {
m[resolvedGVK] = struct{}{}
}
resolvedGVKs = []GroupVersionKind{}
for key := range m {
resolvedGVKs = append(resolvedGVKs, key)
}
return
}
110 changes: 110 additions & 0 deletions internal/discovery/discovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
Copyright 2023 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"reflect"
"sort"
"testing"
)

func TestGVKMapsResolveGVK(t *testing.T) {
type testcase struct {
desc string
gvkmaps *GVKMap
gvk GroupVersionKind
got []GroupVersionKind
want []GroupVersionKind
}
testcases := []testcase{
{
desc: "variable version and kind",
gvkmaps: &GVKMap{
Map: map[string]map[string][]string{
"apps": {
"v1": {"Deployment", "StatefulSet"},
},
},
},
gvk: GroupVersionKind{Group: "apps", Version: "*", Kind: "*"},
want: []GroupVersionKind{
{Group: "apps", Version: "v1", Kind: "Deployment"},
{Group: "apps", Version: "v1", Kind: "StatefulSet"},
},
},
{
desc: "variable version",
gvkmaps: &GVKMap{
Map: map[string]map[string][]string{
"testgroup": {
"v1": {"TestObject1", "TestObject2"},
"v1alpha1": {"TestObject1"},
},
},
},
gvk: GroupVersionKind{Group: "testgroup", Version: "*", Kind: "TestObject1"},
want: []GroupVersionKind{
{Group: "testgroup", Version: "v1alpha1", Kind: "TestObject1"},
{Group: "testgroup", Version: "v1", Kind: "TestObject1"},
},
},
{
desc: "variable kind",
gvkmaps: &GVKMap{
Map: map[string]map[string][]string{
"testgroup": {
"v1": {"TestObject1", "TestObject2"},
"v1alpha1": {"TestObject1"},
},
},
},
gvk: GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "*"},
want: []GroupVersionKind{
{Group: "testgroup", Version: "v1", Kind: "TestObject1"},
{Group: "testgroup", Version: "v1", Kind: "TestObject2"},
},
},
{
desc: "fixed version and kind",
gvkmaps: &GVKMap{
Map: map[string]map[string][]string{
"testgroup": {
"v1": {"TestObject1", "TestObject2"},
"v1alpha1": {"TestObject1"},
},
},
},
gvk: GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"},
want: []GroupVersionKind{
{Group: "testgroup", Version: "v1", Kind: "TestObject1"},
},
},
}
for _, tc := range testcases {
got, err := tc.gvkmaps.ResolveGVK(tc.gvk)
if err != nil {
t.Errorf("testcase: %s: got error %v", tc.desc, err)
}
// Sort got and tc.want to ensure that the order of the elements.
sort.Slice(got, func(i, j int) bool {
return got[i].String() < got[j].String()
})
sort.Slice(tc.want, func(i, j int) bool {
return tc.want[i].String() < tc.want[j].String()
})
if !reflect.DeepEqual(got, tc.want) {
t.Errorf("testcase: %s: got %v, want %v", tc.desc, got, tc.want)
}
}
}
50 changes: 50 additions & 0 deletions internal/discovery/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2023 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"fmt"
"sync"

cgodiscovery "k8s.io/client-go/discovery"

"k8s.io/kube-state-metrics/v2/pkg/customresource"
)

// GVKMap provides a cache of the collected GVKs, along with helper utilities.
type GVKMap struct {
// Map is a cache of the collected GVKs.
Map map[string]map[string][]string
// OldMap is a cache of the collected GVKs from the previous discovery.
OldMap map[string]map[string][]string
// UpdateStores is a flag that is set to true if the GVKMap has changed.
UpdateStores bool
// M is a mutex to prevent any race-conditions while writing to UpdateStores.
M sync.RWMutex
// DiscoveryClient is the client used to discover the GVKs.
DiscoveryClient *cgodiscovery.DiscoveryClient
// NewGVKFactoriesFn is a function that returns a list of custom resource factories.
NewGVKFactoriesFn func() ([]customresource.RegistryFactory, error)
}

// GroupVersionKind is the Kubernetes group, version, and kind of a resource.
type GroupVersionKind struct {
Group string `yaml:"group" json:"group"`
Version string `yaml:"version" json:"version"`
Kind string `yaml:"kind" json:"kind"`
}

func (gvk GroupVersionKind) String() string {
return fmt.Sprintf("%s_%s_%s", gvk.Kind, gvk.Group, gvk.Version)
}
18 changes: 9 additions & 9 deletions internal/store/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,18 +90,18 @@ func (b *Builder) WithMetrics(r prometheus.Registerer) {

// WithEnabledResources sets the enabledResources property of a Builder.
func (b *Builder) WithEnabledResources(r []string) error {
for _, col := range r {
if !resourceExists(col) {
return fmt.Errorf("resource %s does not exist. Available resources: %s", col, strings.Join(availableResources(), ","))
for _, resource := range r {
if !resourceExists(resource) {
return fmt.Errorf("resource %s does not exist. Available resources: %s", resource, strings.Join(availableResources(), ","))
}
}

var copy []string
copy = append(copy, r...)
var sortedResources []string
sortedResources = append(sortedResources, r...)

sort.Strings(copy)
sort.Strings(sortedResources)

b.enabledResources = copy
b.enabledResources = sortedResources
return nil
}

Expand Down Expand Up @@ -181,12 +181,12 @@ func (b *Builder) WithCustomResourceStoreFactories(fs ...customresource.Registry
for i := range fs {
f := fs[i]
if _, ok := availableStores[f.Name()]; ok {
klog.InfoS("The internal resource store already exists and is overridden by a custom resource store with the same name, please make sure it meets your expectation", "registryName", f.Name())
klog.InfoS("Updating store", "resource", f.Name())
}
availableStores[f.Name()] = func(b *Builder) []cache.Store {
return b.buildCustomResourceStoresFunc(
f.Name(),
f.MetricFamilyGenerators(b.allowAnnotationsList[f.Name()], b.allowLabelsList[f.Name()]),
f.MetricFamilyGenerators(),
f.ExpectedType(),
f.ListWatch,
b.useAPIServerCache,
Expand Down
4 changes: 3 additions & 1 deletion internal/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ import (
"gopkg.in/yaml.v3"
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/internal/discovery"
"k8s.io/kube-state-metrics/v2/pkg/app"
"k8s.io/kube-state-metrics/v2/pkg/options"
)

// RunKubeStateMetricsWrapper is a wrapper around KSM, delegated to the root command.
func RunKubeStateMetricsWrapper(opts *options.Options) {
gvkMaps := &discovery.GVKMap{}

KSMRunOrDie := func(ctx context.Context) {
if err := app.RunKubeStateMetricsWrapper(ctx, opts); err != nil {
if err := app.RunKubeStateMetricsWrapper(ctx, opts, gvkMaps); err != nil {
klog.ErrorS(err, "Failed to run kube-state-metrics")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
Expand Down
Loading

0 comments on commit d21e9a1

Please sign in to comment.