Skip to content

Commit

Permalink
Merge pull request #1928 from mrueg/hotreloading-crs
Browse files Browse the repository at this point in the history
Reload CustomResourceState Config File on Change
  • Loading branch information
k8s-ci-robot committed Jan 12, 2023
2 parents b7a7070 + 06268ab commit 63db483
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 48 deletions.
69 changes: 37 additions & 32 deletions internal/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"errors"
"os"
"path/filepath"
"strings"
"time"

"github.com/fsnotify/fsnotify"
Expand All @@ -30,67 +29,73 @@ import (
"k8s.io/klog/v2"

"k8s.io/kube-state-metrics/v2/pkg/app"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/customresourcestate"
"k8s.io/kube-state-metrics/v2/pkg/options"
)

// RunKubeStateMetricsWrapper is a wrapper around KSM, delegated to the root command.
func RunKubeStateMetricsWrapper(opts *options.Options) {
var factories []customresource.RegistryFactory
if config, set := resolveCustomResourceConfig(opts); set {
crf, err := customresourcestate.FromConfig(config)
if err != nil {
klog.ErrorS(err, "Parsing from Custom Resource State Metrics file failed")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
factories = append(factories, crf...)
}

KSMRunOrDie := func(ctx context.Context) {
if err := app.RunKubeStateMetricsWrapper(ctx, opts, factories...); err != nil {
if err := app.RunKubeStateMetricsWrapper(ctx, opts); err != nil {
klog.ErrorS(err, "Failed to run kube-state-metrics")
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
}

ctx, cancel := context.WithCancel(context.Background())
if file := options.GetConfigFile(*opts); file != "" {
viper.SetConfigType("yaml")
viper.SetConfigFile(file)
if err := viper.ReadInConfig(); err != nil {
cfgViper := viper.New()
cfgViper.SetConfigType("yaml")
cfgViper.SetConfigFile(file)
if err := cfgViper.ReadInConfig(); err != nil {
if errors.Is(err, viper.ConfigFileNotFoundError{}) {
klog.ErrorS(err, "Options configuration file not found", "file", file)
} else {
klog.ErrorS(err, "Error reading options configuration file", "file", file)
}
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
viper.OnConfigChange(func(e fsnotify.Event) {
cfgViper.OnConfigChange(func(e fsnotify.Event) {
klog.Infof("Changes detected: %s\n", e.Name)
cancel()
// Wait for the ports to be released.
<-time.After(3 * time.Second)
ctx, cancel = context.WithCancel(context.Background())
go KSMRunOrDie(ctx)
})
viper.WatchConfig()
}
klog.Infoln("Starting kube-state-metrics")
KSMRunOrDie(ctx)
select {}
}
cfgViper.WatchConfig()

func resolveCustomResourceConfig(opts *options.Options) (customresourcestate.ConfigDecoder, bool) {
if s := opts.CustomResourceConfig; s != "" {
return yaml.NewDecoder(strings.NewReader(s)), true
}
if file := opts.CustomResourceConfigFile; file != "" {
f, err := os.Open(filepath.Clean(file))
// Merge configFile values with opts so we get the CustomResourceConfigFile from config as well
configFile, err := os.ReadFile(filepath.Clean(file))
if err != nil {
klog.ErrorS(err, "Custom Resource State Metrics file could not be opened")
klog.ErrorS(err, "failed to read options configuration file", "file", file)
}

yaml.Unmarshal(configFile, opts)
}
if opts.CustomResourceConfigFile != "" {
crcViper := viper.New()
crcViper.SetConfigType("yaml")
crcViper.SetConfigFile(opts.CustomResourceConfigFile)
if err := crcViper.ReadInConfig(); err != nil {
if errors.Is(err, viper.ConfigFileNotFoundError{}) {
klog.ErrorS(err, "Custom resource configuration file not found", "file", opts.CustomResourceConfigFile)
} else {
klog.ErrorS(err, "Error reading Custom resource configuration file", "file", opts.CustomResourceConfigFile)
}
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
}
return yaml.NewDecoder(f), true
crcViper.OnConfigChange(func(e fsnotify.Event) {
klog.Infof("Changes detected: %s\n", e.Name)
cancel()
// Wait for the ports to be released.
<-time.After(3 * time.Second)
ctx, cancel = context.WithCancel(context.Background())
go KSMRunOrDie(ctx)
})
crcViper.WatchConfig()
}
return nil, false
klog.Infoln("Starting kube-state-metrics")
KSMRunOrDie(ctx)
select {}
}
59 changes: 44 additions & 15 deletions pkg/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
"time"

"gopkg.in/yaml.v3"
Expand All @@ -47,6 +48,7 @@ import (
"k8s.io/kube-state-metrics/v2/internal/store"
"k8s.io/kube-state-metrics/v2/pkg/allowdenylist"
"k8s.io/kube-state-metrics/v2/pkg/customresource"
"k8s.io/kube-state-metrics/v2/pkg/customresourcestate"
generator "k8s.io/kube-state-metrics/v2/pkg/metric_generator"
"k8s.io/kube-state-metrics/v2/pkg/metricshandler"
"k8s.io/kube-state-metrics/v2/pkg/optin"
Expand All @@ -73,8 +75,8 @@ func (pl promLogger) Log(v ...interface{}) error {
}

// RunKubeStateMetricsWrapper runs KSM with context cancellation.
func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error {
err := RunKubeStateMetrics(ctx, opts, factories...)
func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options) error {
err := RunKubeStateMetrics(ctx, opts)
if ctx.Err() == context.Canceled {
klog.Infoln("Restarting: kube-state-metrics, metrics will be reset")
return nil
Expand All @@ -85,11 +87,10 @@ func RunKubeStateMetricsWrapper(ctx context.Context, opts *options.Options, fact
// RunKubeStateMetrics will build and run the kube-state-metrics.
// Any out-of-tree custom resource metrics could be registered by newing a registry factory
// which implements customresource.RegistryFactory and pass all factories into this function.
func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories ...customresource.RegistryFactory) error {
func RunKubeStateMetrics(ctx context.Context, opts *options.Options) error {
promLogger := promLogger{}

storeBuilder := store.NewBuilder()
storeBuilder.WithCustomResourceStoreFactories(factories...)

ksmMetricsRegistry := prometheus.NewRegistry()
ksmMetricsRegistry.MustRegister(version.NewCollector("kube_state_metrics"))
Expand Down Expand Up @@ -144,6 +145,22 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories .
}
}

// Loading custom resource state configuration from cli argument or config file
config, err := resolveCustomResourceConfig(opts)
if err != nil {
return err
}

var factories []customresource.RegistryFactory

if config != nil {
factories, err = customresourcestate.FromConfig(config)
if err != nil {
return fmt.Errorf("Parsing from Custom Resource State Metrics file failed: %v", err)
}
}
storeBuilder.WithCustomResourceStoreFactories(factories...)

if opts.CustomResourceConfigFile != "" {
crcFile, err := os.ReadFile(filepath.Clean(opts.CustomResourceConfigFile))
if err != nil {
Expand All @@ -156,24 +173,22 @@ func RunKubeStateMetrics(ctx context.Context, opts *options.Options, factories .

}

var resources []string
resources := make([]string, len(factories))

for i, factory := range factories {
resources[i] = factory.Name()
}

switch {
case len(opts.Resources) == 0 && !opts.CustomResourcesOnly:
resources = append(resources, options.DefaultResources.AsSlice()...)
klog.InfoS("Used default resources")
resources = options.DefaultResources.AsSlice()
// enable custom resource
for _, factory := range factories {
resources = append(resources, factory.Name())
}
case opts.CustomResourcesOnly:
// enable custom resource only
for _, factory := range factories {
resources = append(resources, factory.Name())
}
klog.InfoS("Used CRD resources only", "resources", resources)
default:
klog.InfoS("Used resources", "resources", opts.Resources.String())
resources = opts.Resources.AsSlice()
resources = append(resources, opts.Resources.AsSlice()...)
klog.InfoS("Used resources", "resources", resources)
}

if err := storeBuilder.WithEnabledResources(resources); err != nil {
Expand Down Expand Up @@ -419,3 +434,17 @@ func md5HashAsMetricValue(data []byte) float64 {
copy(bytes, smallSum)
return float64(binary.LittleEndian.Uint64(bytes))
}

func resolveCustomResourceConfig(opts *options.Options) (customresourcestate.ConfigDecoder, error) {
if s := opts.CustomResourceConfig; s != "" {
return yaml.NewDecoder(strings.NewReader(s)), nil
}
if file := opts.CustomResourceConfigFile; file != "" {
f, err := os.Open(filepath.Clean(file))
if err != nil {
return nil, fmt.Errorf("Custom Resource State Metrics file could not be opened: %v", err)
}
return yaml.NewDecoder(f), nil
}
return nil, nil
}
2 changes: 1 addition & 1 deletion tests/e2e/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func TestDefaultCollectorMetricsAvailable(t *testing.T) {

files, err := os.ReadDir("../../internal/store/")
if err != nil {
t.Fatalf("failed to read dir to get all resouces name: %v", err)
t.Fatalf("failed to read dir to get all resources name: %v", err)
}

re := regexp.MustCompile(`^([a-z]+).go$`)
Expand Down

0 comments on commit 63db483

Please sign in to comment.