Skip to content

Commit

Permalink
Merge pull request #1548 from liangyuanpeng/feature_support_apiserver…
Browse files Browse the repository at this point in the history
…cache

Add arg use-api-server-cache to set resourceVersion=0 for ListWatch
  • Loading branch information
k8s-ci-robot committed Aug 19, 2021
2 parents 16e8f54 + 78775b0 commit 583d8c9
Show file tree
Hide file tree
Showing 8 changed files with 81 additions and 60 deletions.
1 change: 1 addition & 0 deletions docs/cli-arguments.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Usage of ./kube-state-metrics:
--telemetry-port int Port to expose kube-state-metrics self metrics on. (default 8081)
--tls-config string Path to the TLS configuration file
--total-shards int The total number of shards. Sharding is disabled when total shards is set to 1. (default 1)
--use-apiserver-cache Sets resourceVersion=0 for ListWatch requests, using cached resources from the apiserver instead of an etcd quorum read.
-v, --v Level number for the log level verbosity
--version kube-state-metrics build version information
--vmodule moduleSpec comma-separated list of pattern=N settings for file-filtered logging
Expand Down
94 changes: 49 additions & 45 deletions internal/store/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,18 +57,19 @@ var _ ksmtypes.BuilderInterface = &Builder{}
// Builder helps to build store. It follows the builder pattern
// (https://en.wikipedia.org/wiki/Builder_pattern).
type Builder struct {
kubeClient clientset.Interface
vpaClient vpaclientset.Interface
namespaces options.NamespaceList
ctx context.Context
enabledResources []string
allowDenyList ksmtypes.AllowDenyLister
listWatchMetrics *watch.ListWatchMetrics
shardingMetrics *sharding.Metrics
shard int32
totalShards int
buildStoresFunc ksmtypes.BuildStoresFunc
allowLabelsList map[string][]string
kubeClient clientset.Interface
vpaClient vpaclientset.Interface
namespaces options.NamespaceList
ctx context.Context
enabledResources []string
allowDenyList ksmtypes.AllowDenyLister
listWatchMetrics *watch.ListWatchMetrics
shardingMetrics *sharding.Metrics
shard int32
totalShards int
buildStoresFunc ksmtypes.BuildStoresFunc
allowLabelsList map[string][]string
useAPIServerCache bool
}

// NewBuilder returns a new builder.
Expand Down Expand Up @@ -137,8 +138,9 @@ func (b *Builder) WithAllowDenyList(l ksmtypes.AllowDenyLister) {
}

// WithGenerateStoresFunc configures a custom generate store function
func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) {
func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc, u bool) {
b.buildStoresFunc = f
b.useAPIServerCache = u
}

// DefaultGenerateStoresFunc returns default buildStores function
Expand Down Expand Up @@ -228,125 +230,126 @@ func availableResources() []string {
}

func (b *Builder) buildConfigMapStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch)
return b.buildStoresFunc(configMapMetricFamilies, &v1.ConfigMap{}, createConfigMapListWatch, b.useAPIServerCache)
}

func (b *Builder) buildCronJobStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch)
return b.buildStoresFunc(cronJobMetricFamilies(b.allowLabelsList["cronjobs"]), &batchv1beta1.CronJob{}, createCronJobListWatch, b.useAPIServerCache)
}

func (b *Builder) buildDaemonSetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch)
return b.buildStoresFunc(daemonSetMetricFamilies(b.allowLabelsList["daemonsets"]), &appsv1.DaemonSet{}, createDaemonSetListWatch, b.useAPIServerCache)
}

func (b *Builder) buildDeploymentStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch)
return b.buildStoresFunc(deploymentMetricFamilies(b.allowLabelsList["deployments"]), &appsv1.Deployment{}, createDeploymentListWatch, b.useAPIServerCache)
}

func (b *Builder) buildEndpointsStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch)
return b.buildStoresFunc(endpointMetricFamilies(b.allowLabelsList["endpoints"]), &v1.Endpoints{}, createEndpointsListWatch, b.useAPIServerCache)
}

func (b *Builder) buildHPAStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch)
return b.buildStoresFunc(hpaMetricFamilies(b.allowLabelsList["horizontalpodautoscalers"]), &autoscaling.HorizontalPodAutoscaler{}, createHPAListWatch, b.useAPIServerCache)
}

func (b *Builder) buildIngressStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch)
return b.buildStoresFunc(ingressMetricFamilies(b.allowLabelsList["ingresses"]), &networkingv1.Ingress{}, createIngressListWatch, b.useAPIServerCache)
}

func (b *Builder) buildJobStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch)
return b.buildStoresFunc(jobMetricFamilies(b.allowLabelsList["jobs"]), &batchv1.Job{}, createJobListWatch, b.useAPIServerCache)
}

func (b *Builder) buildLimitRangeStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch)
return b.buildStoresFunc(limitRangeMetricFamilies, &v1.LimitRange{}, createLimitRangeListWatch, b.useAPIServerCache)
}

func (b *Builder) buildMutatingWebhookConfigurationStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch)
return b.buildStoresFunc(mutatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.MutatingWebhookConfiguration{}, createMutatingWebhookConfigurationListWatch, b.useAPIServerCache)
}

func (b *Builder) buildNamespaceStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch)
return b.buildStoresFunc(namespaceMetricFamilies(b.allowLabelsList["namespaces"]), &v1.Namespace{}, createNamespaceListWatch, b.useAPIServerCache)
}

func (b *Builder) buildNetworkPolicyStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch)
return b.buildStoresFunc(networkPolicyMetricFamilies(b.allowLabelsList["networkpolicies"]), &networkingv1.NetworkPolicy{}, createNetworkPolicyListWatch, b.useAPIServerCache)
}

func (b *Builder) buildNodeStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch)
return b.buildStoresFunc(nodeMetricFamilies(b.allowLabelsList["nodes"]), &v1.Node{}, createNodeListWatch, b.useAPIServerCache)
}

func (b *Builder) buildPersistentVolumeClaimStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch)
return b.buildStoresFunc(persistentVolumeClaimMetricFamilies(b.allowLabelsList["persistentvolumeclaims"]), &v1.PersistentVolumeClaim{}, createPersistentVolumeClaimListWatch, b.useAPIServerCache)
}

func (b *Builder) buildPersistentVolumeStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch)
return b.buildStoresFunc(persistentVolumeMetricFamilies(b.allowLabelsList["persistentvolumes"]), &v1.PersistentVolume{}, createPersistentVolumeListWatch, b.useAPIServerCache)
}

func (b *Builder) buildPodDisruptionBudgetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch)
return b.buildStoresFunc(podDisruptionBudgetMetricFamilies, &policy.PodDisruptionBudget{}, createPodDisruptionBudgetListWatch, b.useAPIServerCache)
}

func (b *Builder) buildReplicaSetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch)
return b.buildStoresFunc(replicaSetMetricFamilies(b.allowLabelsList["replicasets"]), &appsv1.ReplicaSet{}, createReplicaSetListWatch, b.useAPIServerCache)
}

func (b *Builder) buildReplicationControllerStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch)
return b.buildStoresFunc(replicationControllerMetricFamilies, &v1.ReplicationController{}, createReplicationControllerListWatch, b.useAPIServerCache)
}

func (b *Builder) buildResourceQuotaStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch)
return b.buildStoresFunc(resourceQuotaMetricFamilies, &v1.ResourceQuota{}, createResourceQuotaListWatch, b.useAPIServerCache)
}

func (b *Builder) buildSecretStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch)
return b.buildStoresFunc(secretMetricFamilies(b.allowLabelsList["secrets"]), &v1.Secret{}, createSecretListWatch, b.useAPIServerCache)
}

func (b *Builder) buildServiceStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch)
return b.buildStoresFunc(serviceMetricFamilies(b.allowLabelsList["services"]), &v1.Service{}, createServiceListWatch, b.useAPIServerCache)
}

func (b *Builder) buildStatefulSetStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch)
return b.buildStoresFunc(statefulSetMetricFamilies(b.allowLabelsList["statefulsets"]), &appsv1.StatefulSet{}, createStatefulSetListWatch, b.useAPIServerCache)
}

func (b *Builder) buildStorageClassStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch)
return b.buildStoresFunc(storageClassMetricFamilies(b.allowLabelsList["storageclasses"]), &storagev1.StorageClass{}, createStorageClassListWatch, b.useAPIServerCache)
}

func (b *Builder) buildPodStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch)
return b.buildStoresFunc(podMetricFamilies(b.allowLabelsList["pods"]), &v1.Pod{}, createPodListWatch, b.useAPIServerCache)
}

func (b *Builder) buildCsrStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch)
return b.buildStoresFunc(csrMetricFamilies(b.allowLabelsList["certificatesigningrequests"]), &certv1.CertificateSigningRequest{}, createCSRListWatch, b.useAPIServerCache)
}

func (b *Builder) buildValidatingWebhookConfigurationStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch)
return b.buildStoresFunc(validatingWebhookConfigurationMetricFamilies, &admissionregistrationv1.ValidatingWebhookConfiguration{}, createValidatingWebhookConfigurationListWatch, b.useAPIServerCache)
}

func (b *Builder) buildVolumeAttachmentStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch)
return b.buildStoresFunc(volumeAttachmentMetricFamilies, &storagev1.VolumeAttachment{}, createVolumeAttachmentListWatch, b.useAPIServerCache)
}

func (b *Builder) buildVPAStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient))
return b.buildStoresFunc(vpaMetricFamilies(b.allowLabelsList["verticalpodautoscalers"]), &vpaautoscaling.VerticalPodAutoscaler{}, createVPAListWatchFunc(b.vpaClient), b.useAPIServerCache)
}

func (b *Builder) buildLeasesStores() []*metricsstore.MetricsStore {
return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch)
return b.buildStoresFunc(leaseMetricFamilies, &coordinationv1.Lease{}, createLeaseListWatch, b.useAPIServerCache)
}

func (b *Builder) buildStores(
metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,
useAPIServerCache bool,
) []*metricsstore.MetricsStore {
metricFamilies = generator.FilterMetricFamilies(b.allowDenyList, metricFamilies)
composedMetricGenFuncs := generator.ComposeMetricGenFuncs(metricFamilies)
Expand All @@ -358,7 +361,7 @@ func (b *Builder) buildStores(
composedMetricGenFuncs,
)
listWatcher := listWatchFunc(b.kubeClient, v1.NamespaceAll)
b.startReflector(expectedType, store, listWatcher)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
return []*metricsstore.MetricsStore{store}
}

Expand All @@ -369,7 +372,7 @@ func (b *Builder) buildStores(
composedMetricGenFuncs,
)
listWatcher := listWatchFunc(b.kubeClient, ns)
b.startReflector(expectedType, store, listWatcher)
b.startReflector(expectedType, store, listWatcher, useAPIServerCache)
stores = append(stores, store)
}

Expand All @@ -382,8 +385,9 @@ func (b *Builder) startReflector(
expectedType interface{},
store cache.Store,
listWatcher cache.ListerWatcher,
useAPIServerCache bool,
) {
instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String())
instrumentedListWatch := watch.NewInstrumentedListerWatcher(listWatcher, b.listWatchMetrics, reflect.TypeOf(expectedType).String(), useAPIServerCache)
reflector := cache.NewReflector(sharding.NewShardedListWatch(b.shard, b.totalShards, instrumentedListWatch), expectedType, store, 0)
go reflector.Run(b.ctx.Done())
}
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func main() {

storeBuilder.WithAllowDenyList(allowDenyList)

storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc())
storeBuilder.WithGenerateStoresFunc(storeBuilder.DefaultGenerateStoresFunc(), opts.UseAPIServerCache)

proc.StartReaper()

Expand Down
10 changes: 5 additions & 5 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func BenchmarkKubeStateMetrics(b *testing.B) {
builder.WithSharding(0, 1)
builder.WithContext(ctx)
builder.WithNamespaces(options.DefaultNamespaces)
builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc())
builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc(), false)

l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{})
if err != nil {
Expand Down Expand Up @@ -132,7 +132,7 @@ func TestFullScrapeCycle(t *testing.T) {
builder.WithEnabledResources(options.DefaultResources.AsSlice())
builder.WithKubeClient(kubeClient)
builder.WithNamespaces(options.DefaultNamespaces)
builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc())
builder.WithGenerateStoresFunc(builder.DefaultGenerateStoresFunc(), false)

l, err := allowdenylist.New(map[string]struct{}{}, map[string]struct{}{})
if err != nil {
Expand Down Expand Up @@ -412,7 +412,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) {
unshardedBuilder.WithNamespaces(options.DefaultNamespaces)
unshardedBuilder.WithAllowDenyList(l)
unshardedBuilder.WithAllowLabels(map[string][]string{})
unshardedBuilder.WithGenerateStoresFunc(unshardedBuilder.DefaultGenerateStoresFunc())
unshardedBuilder.WithGenerateStoresFunc(unshardedBuilder.DefaultGenerateStoresFunc(), false)

unshardedHandler := metricshandler.New(&options.Options{}, kubeClient, unshardedBuilder, false)
unshardedHandler.ConfigureSharding(ctx, 0, 1)
Expand All @@ -425,7 +425,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) {
shardedBuilder1.WithNamespaces(options.DefaultNamespaces)
shardedBuilder1.WithAllowDenyList(l)
shardedBuilder1.WithAllowLabels(map[string][]string{})
shardedBuilder1.WithGenerateStoresFunc(shardedBuilder1.DefaultGenerateStoresFunc())
shardedBuilder1.WithGenerateStoresFunc(shardedBuilder1.DefaultGenerateStoresFunc(), false)

shardedHandler1 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder1, false)
shardedHandler1.ConfigureSharding(ctx, 0, 2)
Expand All @@ -438,7 +438,7 @@ func TestShardingEquivalenceScrapeCycle(t *testing.T) {
shardedBuilder2.WithNamespaces(options.DefaultNamespaces)
shardedBuilder2.WithAllowDenyList(l)
shardedBuilder2.WithAllowLabels(map[string][]string{})
shardedBuilder2.WithGenerateStoresFunc(shardedBuilder2.DefaultGenerateStoresFunc())
shardedBuilder2.WithGenerateStoresFunc(shardedBuilder2.DefaultGenerateStoresFunc(), false)

shardedHandler2 := metricshandler.New(&options.Options{}, kubeClient, shardedBuilder2, false)
shardedHandler2.ConfigureSharding(ctx, 1, 2)
Expand Down
2 changes: 1 addition & 1 deletion pkg/builder/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (b *Builder) WithAllowLabels(l map[string][]string) {

// WithGenerateStoresFunc configures a custom generate store function
func (b *Builder) WithGenerateStoresFunc(f ksmtypes.BuildStoresFunc) {
b.internal.WithGenerateStoresFunc(f)
b.internal.WithGenerateStoresFunc(f, false)
}

// DefaultGenerateStoresFunc returns default buildStore function
Expand Down
3 changes: 2 additions & 1 deletion pkg/builder/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type BuilderInterface interface {
WithVPAClient(c vpaclientset.Interface)
WithAllowDenyList(l AllowDenyLister)
WithAllowLabels(l map[string][]string)
WithGenerateStoresFunc(f BuildStoresFunc)
WithGenerateStoresFunc(f BuildStoresFunc, useAPIServerCache bool)
DefaultGenerateStoresFunc() BuildStoresFunc
Build() []metricsstore.MetricsWriter
}
Expand All @@ -50,6 +50,7 @@ type BuilderInterface interface {
type BuildStoresFunc func(metricFamilies []generator.FamilyGenerator,
expectedType interface{},
listWatchFunc func(kubeClient clientset.Interface, ns string) cache.ListerWatcher,
useAPIServerCache bool,
) []*metricsstore.MetricsStore

// AllowDenyLister interface for AllowDeny lister that can allow or exclude metrics by there names
Expand Down
3 changes: 3 additions & 0 deletions pkg/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Options struct {

EnableGZIPEncoding bool

UseAPIServerCache bool

flags *pflag.FlagSet
}

Expand Down Expand Up @@ -78,6 +80,7 @@ func (o *Options) AddFlags() {
o.flags.PrintDefaults()
}

o.flags.BoolVarP(&o.UseAPIServerCache, "use-apiserver-cache", "", false, "Sets resourceVersion=0 for ListWatch requests, using cached resources from the apiserver instead of an etcd quorum read.")
o.flags.StringVar(&o.Apiserver, "apiserver", "", `The URL of the apiserver to use as a master`)
o.flags.StringVar(&o.Kubeconfig, "kubeconfig", "", "Absolute path to the kubeconfig file")
o.flags.StringVar(&o.TLSConfig, "tls-config", "", "Path to the TLS configuration file")
Expand Down
Loading

0 comments on commit 583d8c9

Please sign in to comment.