From d21e9a155c84b266e60dd588ee2ecf33406a9bb4 Mon Sep 17 00:00:00 2001 From: Pranshu Srivastava Date: Sun, 22 Jan 2023 03:50:30 +0530 Subject: [PATCH] Add support for variable VKs in CRS config Add support for variable VKs in CRS config, while maintaining a cache of discovered GVKs in the cluster, and updating it every 30s. Signed-off-by: Pranshu Srivastava --- docs/customresourcestate-metrics.md | 32 +++ internal/discovery/discovery.go | 136 +++++++++ internal/discovery/discovery_test.go | 110 ++++++++ internal/discovery/types.go | 50 ++++ internal/store/builder.go | 18 +- internal/wrapper.go | 4 +- main.go | 2 - pkg/app/server.go | 159 ++++++++++- pkg/app/server_test.go | 2 +- pkg/builder/builder.go | 5 - pkg/builder/types/interfaces.go | 1 - pkg/customresource/registry_factory.go | 2 +- pkg/customresourcestate/config.go | 72 ++++- .../custom_resource_metrics.go | 2 +- tests/e2e.sh | 11 +- tests/e2e/discovery_test.go | 259 ++++++++++++++++++ tests/e2e/hot-reload_test.go | 5 +- 17 files changed, 813 insertions(+), 57 deletions(-) create mode 100644 internal/discovery/discovery.go create mode 100644 internal/discovery/discovery_test.go create mode 100644 internal/discovery/types.go create mode 100644 tests/e2e/discovery_test.go diff --git a/docs/customresourcestate-metrics.md b/docs/customresourcestate-metrics.md index 0ed35f494b..9adc234978 100644 --- a/docs/customresourcestate-metrics.md +++ b/docs/customresourcestate-metrics.md @@ -481,3 +481,35 @@ Examples: # if the value to be matched is a number or boolean, the value is compared as a number or boolean [status, conditions, "[value=66]", name] # status.conditions[1].name = "b" ``` + +### [Variable VKs](https://github.com/kubernetes/kube-state-metrics/pull/1851) + +The CRS configuration allows you to monitor all versions and/or kinds that come under a group. 
+Taking the `Foo` object as reference, the configuration below allows you to monitor all objects under all versions and all kinds that come under the `myteam.io` group. + +```yaml +kind: CustomResourceStateMetrics +spec: + resources: + - groupVersionKind: + group: "myteam.io" + version: "*" # Set to `v1` to monitor all kinds under `myteam.io/v1`. Wildcard matches all versions based on the installed CRDs. + kind: "*" # TODO: Set to `Foo` to monitor all `Foo` objects under the `myteam.io` group (under all versions). Wildcard matches all installed kinds in the CR group. + metrics: + - name: "myobject_info" + help: "Foo Bar Baz" + each: + type: Info + info: + path: [metadata] + labelsFromPath: + object: [name] + namespace: [namespace] +``` + +The configuration above produces these metrics. + +```yaml +kube_customresource_myobject_info{customresource_group="myteam.io",customresource_kind="Foo",customresource_version="v1",namespace="ns",object="foo"} 1 +kube_customresource_myobject_info{customresource_group="myteam.io",customresource_kind="Bar",customresource_version="v1",namespace="ns",object="bar"} 1 +``` diff --git a/internal/discovery/discovery.go b/internal/discovery/discovery.go new file mode 100644 index 0000000000..39834e544a --- /dev/null +++ b/internal/discovery/discovery.go @@ -0,0 +1,136 @@ +/* +Copyright 2023 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package discovery provides a discovery and resolution logic for GVKs. 
+package discovery + +import ( + "fmt" + "sort" + "time" + + "k8s.io/klog/v2" +) + +// Interval is the interval at which the discovery client fetches the list of objects. +const Interval = 30 * time.Second + +// Multiplier is the multiplier for the discovery interval, and the upper bound till we check for resolved GVKs to be populated. +const Multiplier = 6 + +// Discover starts the discovery process, fetching all the objects that can be listed from the apiserver, every `DiscoveryInterval` seconds. +// ResolveGVK needs to be called after Discover to generate factories. +func (r *GVKMap) Discover() { + discoveryClient := r.DiscoveryClient + groupList, err := discoveryClient.ServerGroups() + if err != nil { + klog.Errorln("Failed to fetch server groups:", err) + } + arrayHasElement := func(array []string, element string) bool { + for _, v := range array { + if v == element { + return true + } + } + return false + } + gvkMap := make(map[string]map[string][]string) + for _, group := range groupList.Groups { + gvkMap[group.Name] = make(map[string][]string) + for _, version := range group.Versions { + gvkMap[group.Name][version.Version] = make([]string, 0) + resources, err := discoveryClient.ServerResourcesForGroupVersion(version.GroupVersion) + if err != nil { + klog.Errorln("Failed to fetch server resources for group version:", err) + } + for _, resource := range resources.APIResources { + gvkMap[group.Name][version.Version] = append(gvkMap[group.Name][version.Version], resource.Kind) + if r.OldMap != nil { + // A resource is "newly discovered" if the entire G-V-K tuple is new (after G** resolution). + isKindNew := !arrayHasElement(r.OldMap[group.Name][version.Version], resource.Kind) + if r.OldMap[group.Name][version.Version] == nil || isKindNew { + // Note: Reset this when the cache is invalidated. 
+ r.M.Lock() + r.UpdateStores = true + r.M.Unlock() + } + } + } + sort.Strings(gvkMap[group.Name][version.Version]) + } + } + r.OldMap = r.Map + r.Map = gvkMap +} + +// ResolveGVK resolves the variable VKs to a GVK list. +func (r *GVKMap) ResolveGVK(gvk GroupVersionKind) (resolvedGVKs []GroupVersionKind, err error) { + g := gvk.Group + v := gvk.Version + k := gvk.Kind + if g == "" || g == "*" { + err = fmt.Errorf("group is required in the defined GVK %v", gvk) + return nil, err + } + hasVersion := v != "" && v != "*" + hasKind := k != "" && k != "*" + // No need to resolve, return. + if hasVersion && hasKind { + return []GroupVersionKind{ + {Group: g, Version: v, Kind: k}, + }, nil + } + if hasVersion && !hasKind { + kinds := r.Map[g][v] + for _, kind := range kinds { + if kind == "Scale" || kind == "Status" { + continue + } + resolvedGVKs = append(resolvedGVKs, GroupVersionKind{Group: g, Version: v, Kind: kind}) + } + } + if !hasVersion && hasKind { + versions := r.Map[g] + for version, kinds := range versions { + for _, kind := range kinds { + if kind == "Scale" || kind == "Status" { + continue + } + if kind == k { + resolvedGVKs = append(resolvedGVKs, GroupVersionKind{Group: g, Version: version, Kind: k}) + } + } + } + } + if !hasVersion && !hasKind { + versions := r.Map[g] + for version, kinds := range versions { + for _, kind := range kinds { + if kind == "Scale" || kind == "Status" { + continue + } + resolvedGVKs = append(resolvedGVKs, GroupVersionKind{Group: g, Version: version, Kind: kind}) + } + } + } + // Remove any duplicates from the list. 
+ m := map[GroupVersionKind]struct{}{} + for _, resolvedGVK := range resolvedGVKs { + m[resolvedGVK] = struct{}{} + } + resolvedGVKs = []GroupVersionKind{} + for key := range m { + resolvedGVKs = append(resolvedGVKs, key) + } + return +} diff --git a/internal/discovery/discovery_test.go b/internal/discovery/discovery_test.go new file mode 100644 index 0000000000..d3fcbada6b --- /dev/null +++ b/internal/discovery/discovery_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2023 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package discovery + +import ( + "reflect" + "sort" + "testing" +) + +func TestGVKMapsResolveGVK(t *testing.T) { + type testcase struct { + desc string + gvkmaps *GVKMap + gvk GroupVersionKind + got []GroupVersionKind + want []GroupVersionKind + } + testcases := []testcase{ + { + desc: "variable version and kind", + gvkmaps: &GVKMap{ + Map: map[string]map[string][]string{ + "apps": { + "v1": {"Deployment", "StatefulSet"}, + }, + }, + }, + gvk: GroupVersionKind{Group: "apps", Version: "*", Kind: "*"}, + want: []GroupVersionKind{ + {Group: "apps", Version: "v1", Kind: "Deployment"}, + {Group: "apps", Version: "v1", Kind: "StatefulSet"}, + }, + }, + { + desc: "variable version", + gvkmaps: &GVKMap{ + Map: map[string]map[string][]string{ + "testgroup": { + "v1": {"TestObject1", "TestObject2"}, + "v1alpha1": {"TestObject1"}, + }, + }, + }, + gvk: GroupVersionKind{Group: "testgroup", Version: "*", Kind: "TestObject1"}, + want: []GroupVersionKind{ + {Group: "testgroup", Version: "v1alpha1", Kind: "TestObject1"}, + {Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + }, + }, + { + desc: "variable kind", + gvkmaps: &GVKMap{ + Map: map[string]map[string][]string{ + "testgroup": { + "v1": {"TestObject1", "TestObject2"}, + "v1alpha1": {"TestObject1"}, + }, + }, + }, + gvk: GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "*"}, + want: []GroupVersionKind{ + {Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + {Group: "testgroup", Version: "v1", Kind: "TestObject2"}, + }, + }, + { + desc: "fixed version and kind", + gvkmaps: &GVKMap{ + Map: map[string]map[string][]string{ + "testgroup": { + "v1": {"TestObject1", "TestObject2"}, + "v1alpha1": {"TestObject1"}, + }, + }, + }, + gvk: GroupVersionKind{Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + want: []GroupVersionKind{ + {Group: "testgroup", Version: "v1", Kind: "TestObject1"}, + }, + }, + } + for _, tc := range testcases { + got, err := tc.gvkmaps.ResolveGVK(tc.gvk) + if err != nil { + 
t.Errorf("testcase: %s: got error %v", tc.desc, err) + } + // Sort got and tc.want so that the comparison does not depend on the order of the elements. + sort.Slice(got, func(i, j int) bool { + return got[i].String() < got[j].String() + }) + sort.Slice(tc.want, func(i, j int) bool { + return tc.want[i].String() < tc.want[j].String() + }) + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("testcase: %s: got %v, want %v", tc.desc, got, tc.want) + } + } +} diff --git a/internal/discovery/types.go b/internal/discovery/types.go new file mode 100644 index 0000000000..429c279d8f --- /dev/null +++ b/internal/discovery/types.go @@ -0,0 +1,50 @@ +/* +Copyright 2023 The Kubernetes Authors All rights reserved. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package discovery + +import ( + "fmt" + "sync" + + cgodiscovery "k8s.io/client-go/discovery" + + "k8s.io/kube-state-metrics/v2/pkg/customresource" +) + +// GVKMap provides a cache of the collected GVKs, along with helper utilities. +type GVKMap struct { + // Map is a cache of the collected GVKs. + Map map[string]map[string][]string + // OldMap is a cache of the collected GVKs from the previous discovery. + OldMap map[string]map[string][]string + // UpdateStores is a flag that is set to true if the GVKMap has changed. + UpdateStores bool + // M is a mutex to prevent any race-conditions while writing to UpdateStores. + M sync.RWMutex + // DiscoveryClient is the client used to discover the GVKs. 
+ DiscoveryClient *cgodiscovery.DiscoveryClient + // NewGVKFactoriesFn is a function that returns a list of custom resource factories. + NewGVKFactoriesFn func() ([]customresource.RegistryFactory, error) +} + +// GroupVersionKind is the Kubernetes group, version, and kind of a resource. +type GroupVersionKind struct { + Group string `yaml:"group" json:"group"` + Version string `yaml:"version" json:"version"` + Kind string `yaml:"kind" json:"kind"` +} + +func (gvk GroupVersionKind) String() string { + return fmt.Sprintf("%s_%s_%s", gvk.Kind, gvk.Group, gvk.Version) +} diff --git a/internal/store/builder.go b/internal/store/builder.go index 25fa1015b6..68b063130a 100644 --- a/internal/store/builder.go +++ b/internal/store/builder.go @@ -90,18 +90,18 @@ func (b *Builder) WithMetrics(r prometheus.Registerer) { // WithEnabledResources sets the enabledResources property of a Builder. func (b *Builder) WithEnabledResources(r []string) error { - for _, col := range r { - if !resourceExists(col) { - return fmt.Errorf("resource %s does not exist. Available resources: %s", col, strings.Join(availableResources(), ",")) + for _, resource := range r { + if !resourceExists(resource) { + return fmt.Errorf("resource %s does not exist. Available resources: %s", resource, strings.Join(availableResources(), ",")) } } - var copy []string - copy = append(copy, r...) + var sortedResources []string + sortedResources = append(sortedResources, r...) 
- sort.Strings(copy) + sort.Strings(sortedResources) - b.enabledResources = copy + b.enabledResources = sortedResources return nil } @@ -181,12 +181,12 @@ func (b *Builder) WithCustomResourceStoreFactories(fs ...customresource.Registry for i := range fs { f := fs[i] if _, ok := availableStores[f.Name()]; ok { - klog.InfoS("The internal resource store already exists and is overridden by a custom resource store with the same name, please make sure it meets your expectation", "registryName", f.Name()) + klog.InfoS("Updating store", "resource", f.Name()) } availableStores[f.Name()] = func(b *Builder) []cache.Store { return b.buildCustomResourceStoresFunc( f.Name(), - f.MetricFamilyGenerators(b.allowAnnotationsList[f.Name()], b.allowLabelsList[f.Name()]), + f.MetricFamilyGenerators(), f.ExpectedType(), f.ListWatch, b.useAPIServerCache, diff --git a/internal/wrapper.go b/internal/wrapper.go index 0a530d503a..1529822f96 100644 --- a/internal/wrapper.go +++ b/internal/wrapper.go @@ -28,15 +28,17 @@ import ( "gopkg.in/yaml.v3" "k8s.io/klog/v2" + "k8s.io/kube-state-metrics/v2/internal/discovery" "k8s.io/kube-state-metrics/v2/pkg/app" "k8s.io/kube-state-metrics/v2/pkg/options" ) // RunKubeStateMetricsWrapper is a wrapper around KSM, delegated to the root command. 
func RunKubeStateMetricsWrapper(opts *options.Options) { + gvkMaps := &discovery.GVKMap{} KSMRunOrDie := func(ctx context.Context) { - if err := app.RunKubeStateMetricsWrapper(ctx, opts); err != nil { + if err := app.RunKubeStateMetricsWrapper(ctx, opts, gvkMaps); err != nil { klog.ErrorS(err, "Failed to run kube-state-metrics") klog.FlushAndExit(klog.ExitFlushTimeout, 1) } diff --git a/main.go b/main.go index 04fdad5a68..5bdfaecaf8 100644 --- a/main.go +++ b/main.go @@ -31,11 +31,9 @@ func main() { internal.RunKubeStateMetricsWrapper(opts) } opts.AddFlags(cmd) - if err := opts.Parse(); err != nil { klog.FlushAndExit(klog.ExitFlushTimeout, 1) } - if err := opts.Validate(); err != nil { klog.ErrorS(err, "Validating options error") klog.FlushAndExit(klog.ExitFlushTimeout, 1) diff --git a/pkg/app/server.go b/pkg/app/server.go index 51d3837fdb..3ad35b797e 100644 --- a/pkg/app/server.go +++ b/pkg/app/server.go @@ -29,10 +29,9 @@ import ( "runtime" "strconv" "strings" + "sync" "time" - "gopkg.in/yaml.v3" - "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" @@ -40,11 +39,14 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/prometheus/common/version" "github.com/prometheus/exporter-toolkit/web" + "gopkg.in/yaml.v3" + cgodiscovery "k8s.io/client-go/discovery" clientset "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" // Initialize common client auth plugins. "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" + "k8s.io/kube-state-metrics/v2/internal/discovery" "k8s.io/kube-state-metrics/v2/internal/store" "k8s.io/kube-state-metrics/v2/pkg/allowdenylist" "k8s.io/kube-state-metrics/v2/pkg/customresource" @@ -75,8 +77,8 @@ func (pl promLogger) Log(v ...interface{}) error { } // RunKubeStateMetricsWrapper runs KSM with context cancellation. 
-func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options) error { - err := RunKubeStateMetrics(ctx, opts) +func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, gvkMaps *discovery.GVKMap) error { + err := RunKubeStateMetrics(ctx, opts, gvkMaps) if ctx.Err() == context.Canceled { klog.Infoln("Restarting: kube-state-metrics, metrics will be reset") return nil @@ -87,11 +89,8 @@ func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options) erro // RunKubeStateMetrics will build and run the kube-state-metrics. // Any out-of-tree custom resource metrics could be registered by newing a registry factory // which implements customresource.RegistryFactory and pass all factories into this function. -func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { +func RunKubeStateMetrics(ctx context.Context, opts *options.Options, gvkMaps *discovery.GVKMap) error { promLogger := promLogger{} - - storeBuilder := store.NewBuilder() - ksmMetricsRegistry := prometheus.NewRegistry() ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics")) durationVec := promauto.With(ksmMetricsRegistry).NewHistogramVec( @@ -118,6 +117,7 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { Help: "Timestamp of the last successful configuration reload.", }, []string{"type", "filename"}) + storeBuilder := store.NewBuilder() storeBuilder.WithMetrics(ksmMetricsRegistry) got := options.GetConfigFile(*opts) @@ -145,6 +145,11 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { } } + kubeConfig, err := clientcmd.BuildConfigFromFlags(opts.Apiserver, opts.Kubeconfig) + if err != nil { + return fmt.Errorf("failed to build config from flags: %v", err) + } + // Loading custom resource state configuration from cli argument or config file config, err := resolveCustomResourceConfig(opts) if err != nil { @@ -153,13 +158,27 @@ func RunKubeStateMetrics(ctx context.Context, opts 
*options.Options) error { var factories []customresource.RegistryFactory + // A nil CRS config implies that we need to hold off on *ALL* CRS operations. if config != nil { - factories, err = customresourcestate.FromConfig(config) + gvkMaps.DiscoveryClient = cgodiscovery.NewDiscoveryClientForConfigOrDie(kubeConfig) + // Fill maps, which are required for the store cache invalidation checks to work (in discovery). + // We can only resolve GVKs once the discovery has taken place. Think of this as the expensive part of the 2-parter discovery-resolution cache mechanism. + gvkMaps.Discover() + // Create CR factories, containing the metric family and metadata. + gvkMaps.NewGVKFactoriesFn, err = customresourcestate.ResolveConfig(config, gvkMaps) if err != nil { - return fmt.Errorf("Parsing from Custom Resource State Metrics file failed: %v", err) + return fmt.Errorf("parsing from Custom Resource State Metrics file failed: %v", err) } + factories, err = gvkMaps.NewGVKFactoriesFn() + if err != nil { + return fmt.Errorf("creating Custom Resource State Metrics factories failed: %v", err) + } + + // Create stores for the factories and add them to the registry. + storeBuilder.WithCustomResourceStoreFactories(factories...) + // `availableStores` is injected with the CR stores and knows how to fetch them at this point. + // We need to do the same thing for any newly added CRs from this point on. } - storeBuilder.WithCustomResourceStoreFactories(factories...) 
if opts.CustomResourceConfigFile != "" { crcFile, err := os.ReadFile(filepath.Clean(opts.CustomResourceConfigFile)) @@ -233,8 +252,9 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { storeBuilder.WithUsingAPIServerCache(opts.UseAPIServerCache) storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc()) - storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc()) - + if config != nil { + storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc()) + } proc.StartReaper() kubeClient, customResourceClients, err := createKubeClient(opts.Apiserver, opts.Kubeconfig, factories...) @@ -274,6 +294,21 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { tlsConfig := opts.TLSConfig + // Ensure cache prerequisites are set up only once. + var discoverer sync.Once + // Don't start discovery if no CRS config was supplied. If one is supplied after starting KSM, it will be picked up after the reboot. 
+ if config != nil { + discoverer.Do(func() { + periodicDiscoveryForAppendCRStores( + ctx, + opts, + gvkMaps, + storeBuilder, + m, + ) + }) + } + telemetryMux := buildTelemetryServer(ksmMetricsRegistry) telemetryListenAddress := net.JoinHostPort(opts.TelemetryHost, strconv.Itoa(opts.TelemetryPort)) telemetryServer := http.Server{ @@ -289,8 +324,8 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { metricsServerListenAddress := net.JoinHostPort(opts.Host, strconv.Itoa(opts.Port)) metricsServer := http.Server{ Handler: metricsMux, - ReadHeaderTimeout: 5 * time.Second} - + ReadHeaderTimeout: 5 * time.Second, + } metricsFlags := web.FlagConfig{ WebListenAddresses: &[]string{metricsServerListenAddress}, WebSystemdSocket: new(bool), @@ -323,10 +358,104 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error { if err := g.Run(); err != nil { return fmt.Errorf("run server group error: %v", err) } + klog.InfoS("Exited") return nil } +func periodicDiscoveryForAppendCRStores( + ctx context.Context, + opts *options.Options, + gvkMaps *discovery.GVKMap, + storeBuilder *store.Builder, + m *metricshandler.MetricsHandler, +) func() { + // Start an independent goroutine to periodically update `gvkMaps`. + // The interval at which the discovery client will fetch the list of objects. + // TODO: Expose as an option? + t := time.NewTicker(discovery.Interval) + olderContext, olderCancel := context.WithCancel(ctx) + // Persist list of all discovered CRDs. + var enabledCustomResources []string + // Prevent context leak (kill the last metric handler instance). + defer olderCancel() + updateFn := func(prefix string) { + klog.Infof("%s: updating stores", prefix) + // Generate config for when none of the CRDs were present during callers' initialization. + // This is to ensure that the config does not throw an EOF error due to coming up empty. 
+ customConfig, err := resolveCustomResourceConfig(opts) + if err != nil { + klog.Errorf("%s: failed to update custom resource stores", prefix) + } + // Fetch the discovered factories. + gvkMaps.NewGVKFactoriesFn, err = customresourcestate.ResolveConfig(customConfig, gvkMaps) + if err != nil { + klog.Errorf("%s: failed to update custom resource stores: %v", prefix, err) + } + // Get families for discovered factories. + customFactories, err := gvkMaps.NewGVKFactoriesFn() + if err != nil { + klog.Errorf("%s: failed to update custom resource stores: %v", prefix, err) + } + // Update the list of enabled custom resources. + for _, factory := range customFactories { + enabledCustomResources = append(enabledCustomResources, factory.Name()) + } + // Create clients for discovered factories. + _, discoveredCustomResourceClients, err := createKubeClient(opts.Apiserver, opts.Kubeconfig, customFactories...) + if err != nil { + klog.Errorf("%s: failed to update custom resource stores: %v", prefix, err) + } + // Update the store builder with the new clients. + storeBuilder.WithCustomResourceClients(discoveredCustomResourceClients) + // Inject families' constructors to the existing set of stores. + storeBuilder.WithCustomResourceStoreFactories(customFactories...) + // Update the store builder with the new custom resources. + if err := storeBuilder.WithEnabledResources(enabledCustomResources); err != nil { + klog.Errorf("%s: failed to update custom resource stores: %v", prefix, err) + } + // Configure the generation function for the custom resource stores. + storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc()) + // Reset the flag, if there were no errors. Else, we'll try again on the next tick. + gvkMaps.M.Lock() + gvkMaps.UpdateStores = false + gvkMaps.M.Unlock() + // Run the metrics handler with updated configs. 
+ olderCancel() + olderContext, olderCancel = context.WithCancel(ctx) + err = m.Run(olderContext) + if err != nil { + klog.Errorf("%s: failed to run metrics handler: %v", prefix, err) + t.Stop() + return + } + } + go func() { + for range t.C { + select { + case <-ctx.Done(): + t.Stop() + return + default: + // Prefix to be used for all log messages. + prefix := "discovery ticker" + // Subsequent discoveries will only occur if a discovery was previously made. + klog.Infof("%s: discovering resources", prefix) + // TODO: This takes (at least) a good minute, because `ServerGroups` and `ServerResourcesForGroupVersion` are both synchronous. + gvkMaps.Discover() + gvkMaps.M.RLock() + shouldUpdate := gvkMaps.UpdateStores + gvkMaps.M.RUnlock() + if shouldUpdate { + klog.Infof("%s: updating stores", prefix) + updateFn(prefix) + } + } + } + }() + return olderCancel +} + func createKubeClient(apiserver string, kubeconfig string, factories ...customresource.RegistryFactory) (clientset.Interface, map[string]interface{}, error) { config, err := clientcmd.BuildConfigFromFlags(apiserver, kubeconfig) if err != nil { diff --git a/pkg/app/server_test.go b/pkg/app/server_test.go index 61fa598057..3611b925ed 100644 --- a/pkg/app/server_test.go +++ b/pkg/app/server_test.go @@ -891,7 +891,7 @@ func (f *fooFactory) CreateClient(cfg *rest.Config) (interface{}, error) { return fooClient, nil } -func (f *fooFactory) MetricFamilyGenerators(allowAnnotationsList, allowLabelsList []string) []generator.FamilyGenerator { +func (f *fooFactory) MetricFamilyGenerators() []generator.FamilyGenerator { return []generator.FamilyGenerator{ *generator.NewFamilyGeneratorWithStability( "kube_foo_spec_replicas", diff --git a/pkg/builder/builder.go b/pkg/builder/builder.go index 7952ff3f99..e60a6e19e1 100644 --- a/pkg/builder/builder.go +++ b/pkg/builder/builder.go @@ -115,11 +115,6 @@ func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) { b.internal.WithGenerateStoresFunc(f) } -// 
WithGenerateCustomResourceStoresFunc configures a custom generate custom resource store function -func (b *Builder) WithGenerateCustomResourceStoresFunc(f ksmtypes.BuildCustomResourceStoresFunc) { - b.internal.WithGenerateCustomResourceStoresFunc(f) -} - // DefaultGenerateStoresFunc returns default buildStore function func (b *Builder) DefaultGenerateStoresFunc() ksmtypes.BuildStoresFunc { return b.internal.DefaultGenerateStoresFunc() diff --git a/pkg/builder/types/interfaces.go b/pkg/builder/types/interfaces.go index 41c71abc28..314054823e 100644 --- a/pkg/builder/types/interfaces.go +++ b/pkg/builder/types/interfaces.go @@ -45,7 +45,6 @@ type BuilderInterface interface { WithAllowAnnotations(a map[string][]string) WithAllowLabels(l map[string][]string) error WithGenerateStoresFunc(f BuildStoresFunc) - WithGenerateCustomResourceStoresFunc(f BuildCustomResourceStoresFunc) DefaultGenerateStoresFunc() BuildStoresFunc DefaultGenerateCustomResourceStoresFunc() BuildCustomResourceStoresFunc WithCustomResourceStoreFactories(fs ...customresource.RegistryFactory) diff --git a/pkg/customresource/registry_factory.go b/pkg/customresource/registry_factory.go index f69620248b..a14d87dc1b 100644 --- a/pkg/customresource/registry_factory.go +++ b/pkg/customresource/registry_factory.go @@ -86,7 +86,7 @@ type RegistryFactory interface { // ), // } // } - MetricFamilyGenerators(allowAnnotationsList, allowLabelsList []string) []generator.FamilyGenerator + MetricFamilyGenerators() []generator.FamilyGenerator // ExpectedType returns a pointer to an empty custom resource object. 
// diff --git a/pkg/customresourcestate/config.go b/pkg/customresourcestate/config.go index 4ec21f0560..5ab718fed8 100644 --- a/pkg/customresourcestate/config.go +++ b/pkg/customresourcestate/config.go @@ -19,10 +19,12 @@ package customresourcestate import ( "fmt" "strings" + "time" "github.com/gobuffalo/flect" "k8s.io/klog/v2" + "k8s.io/kube-state-metrics/v2/internal/discovery" "k8s.io/kube-state-metrics/v2/pkg/customresource" ) @@ -89,6 +91,10 @@ type GroupVersionKind struct { Kind string `yaml:"kind" json:"kind"` } +func (gvk GroupVersionKind) String() string { + return fmt.Sprintf("%s_%s_%s", gvk.Group, gvk.Version, gvk.Kind) +} + // Labels is common configuration of labels to add to metrics. type Labels struct { // CommonLabels are added to all metrics. @@ -153,29 +159,67 @@ type Metric struct { Info *MetricInfo `yaml:"info" json:"info"` } -// ConfigDecoder is for use with FromConfig. +// ConfigDecoder is for use with ResolveConfig. type ConfigDecoder interface { Decode(v interface{}) (err error) } -// FromConfig decodes a configuration source into a slice of customresource.RegistryFactory that are ready to use. -func FromConfig(decoder ConfigDecoder) ([]customresource.RegistryFactory, error) { - var crconfig Metrics - var factories []customresource.RegistryFactory +// ResolveConfig decodes a configuration source into a slice of `customresource.RegistryFactory` that are ready to use. 
+func ResolveConfig(decoder ConfigDecoder, gvkMaps *discovery.GVKMap) (func() (factories []customresource.RegistryFactory, err error), error) { + var customResourceConfig Metrics factoriesIndex := map[string]bool{} - if err := decoder.Decode(&crconfig); err != nil { + if err := decoder.Decode(&customResourceConfig); err != nil { return nil, fmt.Errorf("failed to parse Custom Resource State metrics: %w", err) } - for _, resource := range crconfig.Spec.Resources { - factory, err := NewCustomResourceMetrics(resource) + // Cannot proceed ahead without an instantiated GVKMap object (only for the + // first time, otherwise the cache will come up empty on the first run (in the + // <`discovery.discoveryInterval` timespan). + // Set an upper bound same as 10x of the discovery interval for the first fetch check. + mapPopulationTimer := time.NewTimer(discovery.Multiplier * discovery.Interval) + for { + didInit := false + select { + case <-mapPopulationTimer.C: + return nil, fmt.Errorf("failed to populate GVKMap cache") + default: + if gvkMaps.Map != nil { + didInit = true + } + } + if didInit { + break + } + } + var resources []Resource + for _, resource := range customResourceConfig.Spec.Resources { + gotGVKs, err := gvkMaps.ResolveGVK(discovery.GroupVersionKind(resource.GroupVersionKind)) if err != nil { - return nil, fmt.Errorf("failed to create metrics factory for %s: %w", resource.GroupVersionKind, err) + return nil, fmt.Errorf("failed to resolve GroupVersionKind %s: %w", resource.GroupVersionKind, err) + } + if gotGVKs != nil { + var newResourceList []Resource + for _, gvk := range gotGVKs { + newResource := resource + newResource.GroupVersionKind = GroupVersionKind(gvk) + newResourceList = append(newResourceList, newResource) + } + resources = append(resources, newResourceList...) 
} - if _, ok := factoriesIndex[factory.Name()]; ok { - return nil, fmt.Errorf("found multiple custom resource configurations for the same resource %s", factory.Name()) + } + customResourceConfig.Spec.Resources = resources + fn := func() (factories []customresource.RegistryFactory, err error) { + for _, resource := range customResourceConfig.Spec.Resources { + factory, err := NewCustomResourceMetrics(resource) + if err != nil { + return nil, fmt.Errorf("failed to create metrics factory for %s: %w", resource.GroupVersionKind, err) + } + if _, ok := factoriesIndex[factory.Name()]; ok { + klog.ErrorS(fmt.Errorf("found multiple custom resource configurations for the same resource %s", factory.Name()), "using the latest custom resource configuration") + } + factoriesIndex[factory.Name()] = true + factories = append(factories, factory) } - factoriesIndex[factory.Name()] = true - factories = append(factories, factory) + return factories, nil } - return factories, nil + return fn, nil } diff --git a/pkg/customresourcestate/custom_resource_metrics.go b/pkg/customresourcestate/custom_resource_metrics.go index da45a7533a..9ca2700835 100644 --- a/pkg/customresourcestate/custom_resource_metrics.go +++ b/pkg/customresourcestate/custom_resource_metrics.go @@ -75,7 +75,7 @@ func (s customResourceMetrics) CreateClient(cfg *rest.Config) (interface{}, erro }), nil } -func (s customResourceMetrics) MetricFamilyGenerators(_, _ []string) (result []generator.FamilyGenerator) { +func (s customResourceMetrics) MetricFamilyGenerators() (result []generator.FamilyGenerator) { klog.InfoS("Custom resource state added metrics", "familyNames", s.names()) for _, f := range s.Families { result = append(result, famGen(f)) diff --git a/tests/e2e.sh b/tests/e2e.sh index 2adb31487e..9364340d8f 100755 --- a/tests/e2e.sh +++ b/tests/e2e.sh @@ -179,8 +179,6 @@ echo "make requests to kube-state-metrics" set +e - - kube_state_metrics_up set -e @@ -190,8 +188,6 @@ echo "start e2e test for kube-state-metrics" 
KSM_HTTP_METRICS_URL='http://localhost:8001/api/v1/namespaces/kube-system/services/kube-state-metrics:http-metrics/proxy' KSM_TELEMETRY_URL='http://localhost:8001/api/v1/namespaces/kube-system/services/kube-state-metrics:telemetry/proxy' go test -v ./tests/e2e/main_test.go --ksm-http-metrics-url=${KSM_HTTP_METRICS_URL} --ksm-telemetry-url=${KSM_TELEMETRY_URL} -go test -v ./tests/e2e/hot-reload_test.go - # TODO: re-implement the following test cases in Go with the goal of removing this file. echo "access kube-state-metrics metrics endpoint" @@ -206,6 +202,13 @@ fi sleep 33 klog_err=E$(date +%m%d) echo "check for errors in logs" + +echo "running discovery tests..." +go test -race -v ./tests/e2e/discovery_test.go + +echo "running hot-reload tests..." +go test -v ./tests/e2e/hot-reload_test.go + output_logs=$(kubectl --namespace=kube-system logs deployment/kube-state-metrics kube-state-metrics) if echo "${output_logs}" | grep "^${klog_err}"; then echo "" diff --git a/tests/e2e/discovery_test.go b/tests/e2e/discovery_test.go new file mode 100644 index 0000000000..867e28edd1 --- /dev/null +++ b/tests/e2e/discovery_test.go @@ -0,0 +1,259 @@ +/* +Copyright 2022 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package e2e + +import ( + "net" + "os" + "os/exec" + "strings" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/klog/v2" + + "k8s.io/kube-state-metrics/v2/internal" + "k8s.io/kube-state-metrics/v2/internal/discovery" + "k8s.io/kube-state-metrics/v2/pkg/options" +) + +func TestVariableVKsDiscoveryAndResolution(t *testing.T) { + + // Initialise options. + opts := options.NewOptions() + cmd := options.InitCommand + opts.AddFlags(cmd) + klog.Infoln("initialised options") + + // Create testdata. + crConfigFile, err := os.CreateTemp("", "cr-config.yaml") + if err != nil { + t.Fatal(err) + } + crdFile, err := os.CreateTemp("", "crd.yaml") + if err != nil { + t.Fatal(err) + } + crFile, err := os.CreateTemp("", "cr.yaml") + if err != nil { + t.Fatal(err) + } + klog.Infoln("created testdata") + + // Delete artefacts. + defer func() { + err := os.Remove(opts.CustomResourceConfigFile) + if err != nil { + t.Fatalf("failed to remove crConfig file: %v", err) + } + err = os.Remove(crdFile.Name()) + if err != nil { + t.Fatalf("failed to remove crConfig file: %v", err) + } + klog.Infoln("deleted artefacts") + }() + + // Populate options, and parse them. + opts.CustomResourceConfigFile = crConfigFile.Name() + opts.Kubeconfig = os.Getenv("HOME") + "/.kube/config" + if err := opts.Parse(); err != nil { + t.Fatalf("failed to parse options: %v", err) + } + klog.Infoln("parsed options") + + // Write to the config file. + crConfig := getCRConfig() + err = os.WriteFile(opts.CustomResourceConfigFile, []byte(crConfig), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to config file: %v", err) + } + klog.Infoln("populated cr config file") + + // Make the process asynchronous. + go internal.RunKubeStateMetricsWrapper(opts) + klog.Infoln("started KSM") + + // Wait for port 8080 to come up. 
+ err = wait.PollImmediate(1*time.Second, 20*time.Second, func() (bool, error) { + conn, err := net.Dial("tcp", "localhost:8080") + if err != nil { + return false, nil + } + err = conn.Close() + if err != nil { + return false, err + } + return true, nil + }) + if err != nil { + t.Fatalf("failed while waiting for port 8080 to come up: %v", err) + } + klog.Infoln("port 8080 up") + + // Create CRD and CR files. + crd := getCRD() + cr := getCR() + err = os.WriteFile(crdFile.Name(), []byte(crd), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to crd file: %v", err) + } + err = os.WriteFile(crFile.Name(), []byte(cr), 0600 /* rw------- */) + if err != nil { + t.Fatalf("cannot write to cr file: %v", err) + } + klog.Infoln("created CR and CRD manifests") + + // Apply CRD and CR to the cluster. + err = exec.Command("kubectl", "apply", "-f", crdFile.Name()).Run() //nolint:gosec + if err != nil { + t.Fatalf("failed to apply crd: %v", err) + } + err = exec.Command("kubectl", "apply", "-f", crFile.Name()).Run() //nolint:gosec + if err != nil { + t.Fatalf("failed to apply cr: %v", err) + } + klog.Infoln("applied CR and CRD manifests") + + // Wait for the metric to be available. + ch := make(chan bool, 1) + klog.Infoln("waiting for metrics to be available") + err = wait.PollImmediate(1*time.Second, discovery.Multiplier*discovery.Interval, func() (bool, error) { + out, err := exec.Command("curl", "localhost:8080/metrics").Output() + if err != nil { + return false, err + } + if string(out) == "" { + return false, nil + } + // Note the "{" below. This is to ensure that the metric is not a comment. + if strings.Contains(string(out), "kube_customresource_test_metric{") { + klog.Infoln(string(out)) + // Signal the process to exit, since we know the metrics are being generated as expected. 
+ ch <- true + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatalf("failed while waiting for metrics to be available: %v", err) + } + + // Wait for process to exit. + select { + case <-ch: + t.Log("test passed successfully") + case <-time.After(discovery.Multiplier * discovery.Interval): + t.Fatal("timed out waiting for test to pass, check the logs for more info") + } +} + +func getCRD() string { + return ` +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: myplatforms.contoso.com +spec: + group: contoso.com + names: + plural: myplatforms + singular: myplatform + kind: MyPlatform + shortNames: + - myp + scope: Namespaced + versions: + - name: v1alpha1 + served: true + storage: true + schema: + openAPIV3Schema: + type: object + properties: + spec: + type: object + properties: + appId: + type: string + language: + type: string + enum: + - csharp + - python + - go + os: + type: string + enum: + - windows + - linux + instanceSize: + type: string + enum: + - small + - medium + - large + environmentType: + type: string + enum: + - dev + - test + - prod + replicas: + type: integer + minimum: 1 + required: ["appId", "language", "environmentType"] + required: ["spec"] +` +} + +func getCRConfig() string { + return ` +kind: CustomResourceStateMetrics +spec: + resources: + - groupVersionKind: + group: "contoso.com" + version: "*" + kind: "*" + metrics: + - name: "test_metric" + help: "foo baz" + each: + type: Info + info: + path: [metadata] + labelsFromPath: + name: [name] +` +} + +func getCR() string { + return ` +apiVersion: contoso.com/v1alpha1 +kind: MyPlatform +metadata: + name: test-dotnet-app +spec: + appId: testdotnetapp + language: csharp + os: linux + instanceSize: small + environmentType: dev + replicas: 3 +` +} diff --git a/tests/e2e/hot-reload_test.go b/tests/e2e/hot-reload_test.go index c5d099b58d..aa0a7535de 100644 --- a/tests/e2e/hot-reload_test.go +++ b/tests/e2e/hot-reload_test.go @@ -97,15 +97,14 @@ 
func TestConfigHotReload(t *testing.T) { if err != nil { return false, err } + // Indicate that the test has passed. + ch <- true return true, nil }) if err != nil { t.Fatalf("failed to wait for port 8080 to come up after restarting the process: %v", err) } - // Indicate that the test has passed. - ch <- true - // Wait for process to exit. select { case <-ch: