Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! Add support for variable VKs in CRS config
Browse files Browse the repository at this point in the history
  • Loading branch information
rexagod committed May 4, 2023
1 parent 0fd97f5 commit 9602762
Show file tree
Hide file tree
Showing 6 changed files with 216 additions and 153 deletions.
113 changes: 108 additions & 5 deletions internal/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ import (
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/internal/store"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/metricshandler"
"k8s.io/kube-state-metrics/v2/pkg/options"
"k8s.io/kube-state-metrics/v2/pkg/util"
)

// PopulateTimeout is the timeout on populating the cache for the first time.
Expand All @@ -47,18 +53,33 @@ func (r *CRDiscoverer) StartDiscovery(ctx context.Context, config *rest.Config)
stopper := make(chan struct{})
_, err := informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
for _, version := range obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})["versions"].([]interface{}) {
objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})
for _, version := range objSpec["versions"].([]interface{}) {
gotGVK := schema.GroupVersionKind{
Group: obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})["group"].(string),
Group: objSpec["group"].(string),
Version: version.(map[string]interface{})["name"].(string),
Kind: obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})["names"].(map[string]interface{})["kind"].(string),
Kind: objSpec["names"].(map[string]interface{})["kind"].(string),
}
r.AppendToMap(gotGVK)
r.SafeWrite(func() {
r.WasUpdated = true
})
}
},
DeleteFunc: func(obj interface{}) {
objSpec := obj.(*unstructured.Unstructured).Object["spec"].(map[string]interface{})
for _, version := range objSpec["versions"].([]interface{}) {
gotGVK := schema.GroupVersionKind{
Group: objSpec["group"].(string),
Version: version.(map[string]interface{})["name"].(string),
Kind: objSpec["names"].(map[string]interface{})["kind"].(string),
}
r.RemoveFromMap(gotGVK)
r.SafeWrite(func() {
r.WasUpdated = true
})
}
},
})
if err != nil {
return err
Expand All @@ -81,8 +102,7 @@ func (r *CRDiscoverer) ResolveGVK(gvk schema.GroupVersionKind) (resolvedGVKs []s
v := gvk.Version
k := gvk.Kind
if g == "" || g == "*" {
err = fmt.Errorf("group is required in the defined GVK %v", gvk)
return nil, err
return nil, fmt.Errorf("group is required in the defined GVK %v", gvk)
}
hasVersion := v != "" && v != "*"
hasKind := k != "" && k != "*"
Expand Down Expand Up @@ -118,3 +138,86 @@ func (r *CRDiscoverer) ResolveGVK(gvk schema.GroupVersionKind) (resolvedGVKs []s
}
return
}

// PollForCacheUpdates polls the cache for updates and updates the stores accordingly.
func (r *CRDiscoverer) PollForCacheUpdates(
ctx context.Context,
opts *options.Options,
storeBuilder *store.Builder,
m *metricshandler.MetricsHandler,
factoryGenerator func() ([]customresource.RegistryFactory, error),
) {
// The interval at which we will check the cache for updates.
t := time.NewTicker(Interval)
// Track previous context to allow refreshing cache.
olderContext, olderCancel := context.WithCancel(ctx)
// Prevent context leak (kill the last metric handler instance).
defer olderCancel()
generateMetrics := func() {
// Get families for discovered factories.
customFactories, err := factoryGenerator()
if err != nil {
klog.Errorf("failed to update custom resource stores: %v", err)
}
// Update the list of enabled custom resources.
var enabledCustomResources []string
for _, factory := range customFactories {
enabledCustomResources = append(enabledCustomResources, factory.Name())
}
// Create clients for discovered factories.
_, discoveredCustomResourceClients, err := util.CreateKubeClient(opts.Apiserver, opts.Kubeconfig, customFactories...)
if err != nil {
klog.Errorf("failed to update custom resource stores: %v", err)
}
// Update the store builder with the new clients.
storeBuilder.WithCustomResourceClients(discoveredCustomResourceClients)
// Inject families' constructors to the existing set of stores.
storeBuilder.WithCustomResourceStoreFactories(customFactories...)
// Update the store builder with the new custom resources.
if err := storeBuilder.WithEnabledResources(enabledCustomResources); err != nil {
klog.Errorf("failed to update custom resource stores: %v", err)
}
// Configure the generation function for the custom resource stores.
storeBuilder.WithGenerateCustomResourceStoresFunc(storeBuilder.DefaultGenerateCustomResourceStoresFunc())
// Reset the flag, if there were no errors. Else, we'll try again on the next tick.
// Keep retrying if there were errors.
r.SafeWrite(func() {
r.WasUpdated = false
})
// Run the metrics handler with updated configs.
olderContext, olderCancel = context.WithCancel(ctx)
go func() {
// Blocks indefinitely until the unbuffered context is cancelled to serve metrics for that duration.
err = m.Run(olderContext)
if err != nil {
// Check if context was cancelled.
select {
case <-olderContext.Done():
// Context cancelled, don't really need to log this though.
default:
klog.Errorf("%s: failed to run metrics handler: %v", err)
}
}
}()
}
go func() {
for range t.C {
select {
case <-ctx.Done():
klog.InfoS("context cancelled")
t.Stop()
return
default:
// Check if cache has been updated.
shouldGenerateMetrics := false
r.SafeRead(func() {
shouldGenerateMetrics = r.WasUpdated
})
if shouldGenerateMetrics {
olderCancel()
generateMetrics()
}
}
}
}()
}
29 changes: 25 additions & 4 deletions internal/discovery/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,10 @@ import (
"sync"

"k8s.io/apimachinery/pkg/runtime/schema"

"k8s.io/kube-state-metrics/v2/pkg/customresource"
)

// CRDiscoverer provides a cache of the collected GVKs, along with helper utilities.
type CRDiscoverer struct {
// FactoryGenerator is a function that returns a list of custom resource factories.
FactoryGenerator func() ([]customresource.RegistryFactory, error)
// m is a mutex to protect the cache.
m sync.RWMutex
// Map is a cache of the collected GVKs.
Expand Down Expand Up @@ -62,3 +58,28 @@ func (r *CRDiscoverer) AppendToMap(gvks ...schema.GroupVersionKind) {
r.Map[gvk.Group][gvk.Version] = append(r.Map[gvk.Group][gvk.Version], gvk.Kind)
}
}

// RemoveFromMap removes the given GVKs from the cache.
func (r *CRDiscoverer) RemoveFromMap(gvks ...schema.GroupVersionKind) {
for _, gvk := range gvks {
if _, ok := r.Map[gvk.Group]; !ok {
continue
}
if _, ok := r.Map[gvk.Group][gvk.Version]; !ok {
continue
}
for i, kind := range r.Map[gvk.Group][gvk.Version] {
if kind == gvk.Kind {
if len(r.Map[gvk.Group][gvk.Version]) == 1 {
delete(r.Map[gvk.Group], gvk.Version)
if len(r.Map[gvk.Group]) == 0 {
delete(r.Map, gvk.Group)
}
break
}
r.Map[gvk.Group][gvk.Version] = append(r.Map[gvk.Group][gvk.Version][:i], r.Map[gvk.Group][gvk.Version][i+1:]...)
break
}
}
}
}
4 changes: 1 addition & 3 deletions internal/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ import (
"gopkg.in/yaml.v3"
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/internal/discovery"
"k8s.io/kube-state-metrics/v2/pkg/app"
"k8s.io/kube-state-metrics/v2/pkg/options"
)

// RunKubeStateMetricsWrapper is a wrapper around KSM, delegated to the root command.
func RunKubeStateMetricsWrapper(opts *options.Options) {
discovererInstance := &discovery.CRDiscoverer{}

KSMRunOrDie := func(ctx context.Context) {
if err := app.RunKubeStateMetricsWrapper(ctx, opts, discovererInstance); err != nil {
if err := app.RunKubeStateMetricsWrapper(ctx, opts); err != nil {
klog.ErrorS(err, "Failed to run kube-state-metrics")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
Expand Down
Loading

0 comments on commit 9602762

Please sign in to comment.