Skip to content

Commit

Permalink
feat: add a CustomMetricsProvider to the metrics-adapter
Browse files Browse the repository at this point in the history
Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
  • Loading branch information
chaunceyjiang committed Jun 8, 2023
1 parent bfb9bc3 commit 405434d
Show file tree
Hide file tree
Showing 6 changed files with 431 additions and 22 deletions.
28 changes: 28 additions & 0 deletions artifacts/deploy/karmada-metrics-adapter-apiservice.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,34 @@ spec:
version: v1beta1
versionPriority: 10
---
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta2.custom.metrics.k8s.io
spec:
service:
name: karmada-metrics-adapter
namespace: karmada-system
group: custom.metrics.k8s.io
version: v1beta2
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 200
---
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta1.custom.metrics.k8s.io
spec:
service:
name: karmada-metrics-adapter
namespace: karmada-system
group: custom.metrics.k8s.io
version: v1beta1
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 200
---
apiVersion: v1
kind: Service
metadata:
Expand Down
10 changes: 8 additions & 2 deletions cmd/metrics-adapter/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package options

import (
"context"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"

"github.com/spf13/pflag"
openapinamer "k8s.io/apiserver/pkg/endpoints/openapi"
Expand Down Expand Up @@ -56,8 +58,9 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) {

karmadaClient := karmadaclientset.NewForConfigOrDie(restConfig)
factory := informerfactory.NewSharedInformerFactory(karmadaClient, 0)

metricsController := metricsadapter.NewMetricsController(restConfig, factory)
kubeClient := kubernetes.NewForConfigOrDie(restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
metricsController := metricsadapter.NewMetricsController(restConfig, factory, kubeFactory)
metricsAdapter := metricsadapter.NewMetricsAdapter(metricsController, o.CustomMetricsAdapterServerOptions)
metricsAdapter.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(api.Scheme))
metricsAdapter.OpenAPIConfig.Info.Title = "karmada-metrics-adapter"
Expand All @@ -70,6 +73,9 @@ func (o *Options) Config() (*metricsadapter.MetricsServer, error) {
}

err = server.GenericAPIServer.AddPostStartHook("start-karmada-informers", func(context genericapiserver.PostStartHookContext) error {
kubeFactory.Core().V1().Secrets().Informer()
kubeFactory.Start(context.StopCh)
kubeFactory.WaitForCacheSync(context.StopCh)
factory.Start(context.StopCh)
return nil
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/metricsadapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ type MetricsAdapter struct {
func NewMetricsAdapter(controller *MetricsController, customMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions) *MetricsAdapter {
adapter := &MetricsAdapter{}
adapter.CustomMetricsAdapterServerOptions = customMetricsAdapterServerOptions

adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.InformerManager)
customProvider := provider.MakeCustomMetricsProvider()
customProvider := provider.MakeCustomMetricsProvider(controller.ClusterLister, controller.MultiClusterDiscovery)
externalProvider := provider.MakeExternalMetricsProvider()
adapter.WithCustomMetrics(customProvider)
adapter.WithExternalMetrics(externalProvider)
Expand Down
36 changes: 23 additions & 13 deletions pkg/metricsadapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
Expand All @@ -16,6 +17,7 @@ import (
clusterV1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
informerfactory "github.com/karmada-io/karmada/pkg/generated/informers/externalversions"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/metricsadapter/multiclient"
"github.com/karmada-io/karmada/pkg/metricsadapter/provider"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
Expand All @@ -29,23 +31,24 @@ var (

// MetricsController is a controller for metrics, control the lifecycle of multi-clusters informer
type MetricsController struct {
InformerFactory informerfactory.SharedInformerFactory
ClusterLister clusterlister.ClusterLister
InformerManager genericmanager.MultiClusterInformerManager

queue workqueue.RateLimitingInterface
restConfig *rest.Config
InformerFactory informerfactory.SharedInformerFactory
ClusterLister clusterlister.ClusterLister
InformerManager genericmanager.MultiClusterInformerManager
MultiClusterDiscovery multiclient.MultiClusterDiscoveryInterface
queue workqueue.RateLimitingInterface
restConfig *rest.Config
}

// NewMetricsController creates a new metrics controller
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory) *MetricsController {
func NewMetricsController(restConfig *rest.Config, factory informerfactory.SharedInformerFactory, kubeFactory informers.SharedInformerFactory) *MetricsController {
clusterLister := factory.Cluster().V1alpha1().Clusters().Lister()
controller := &MetricsController{
InformerFactory: factory,
ClusterLister: clusterLister,
InformerManager: genericmanager.GetInstance(),
restConfig: restConfig,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "metrics-adapter"),
InformerFactory: factory,
ClusterLister: clusterLister,
MultiClusterDiscovery: multiclient.NewMultiClusterDiscoveryClient(clusterLister, kubeFactory),
InformerManager: genericmanager.GetInstance(),
restConfig: restConfig,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "metrics-adapter"),
}
controller.addEventHandler()

Expand Down Expand Up @@ -127,6 +130,7 @@ func (m *MetricsController) handleClusters() bool {
if apierrors.IsNotFound(err) {
klog.Infof("try to stop cluster informer %s", clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
return true
}
return false
Expand All @@ -135,12 +139,14 @@ func (m *MetricsController) handleClusters() bool {
if !cls.DeletionTimestamp.IsZero() {
klog.Infof("try to stop cluster informer %s", clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
return true
}

if !util.IsClusterReady(&cls.Status) {
klog.Warningf("cluster %s is notReady try to stop this cluster informer", clusterName)
m.InformerManager.Stop(clusterName)
m.MultiClusterDiscovery.Remove(clusterName)
return false
}

Expand All @@ -159,8 +165,12 @@ func (m *MetricsController) handleClusters() bool {
}
_ = m.InformerManager.ForCluster(clusterName, clusterDynamicClient.DynamicClientSet, 0)
}
err = m.MultiClusterDiscovery.Set(clusterName)
if err != nil {
klog.Warningf("failed to build discoveryClient for cluster(%s), Error: %+v", clusterName, err)
return true
}
sci := m.InformerManager.GetSingleClusterManager(clusterName)

// Just trigger the informer to work
_ = sci.Lister(provider.PodsGVR)
_ = sci.Lister(provider.NodesGVR)
Expand Down
70 changes: 70 additions & 0 deletions pkg/metricsadapter/multiclient/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package multiclient

import (
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/informers"
listcorev1 "k8s.io/client-go/listers/core/v1"

clusterV1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)

// MultiClusterDiscoveryInterface DiscoveryInterface for multiple clusters.
type MultiClusterDiscoveryInterface interface {
Get(clusterName string) *discovery.DiscoveryClient
Set(clusterName string) error
Remove(clusterName string)
}

// MultiClusterDiscovery provide DiscoveryClient for multiple clusters.
type MultiClusterDiscovery struct {
sync.RWMutex
clients map[string]*discovery.DiscoveryClient
secretLister listcorev1.SecretLister
clusterLister clusterlister.ClusterLister
}

// NewMultiClusterDiscoveryClient return a new MultiClusterDiscovery
func NewMultiClusterDiscoveryClient(clusterLister clusterlister.ClusterLister, KubeFactory informers.SharedInformerFactory) MultiClusterDiscoveryInterface {
return &MultiClusterDiscovery{
clusterLister: clusterLister,
secretLister: KubeFactory.Core().V1().Secrets().Lister(),
clients: map[string]*discovery.DiscoveryClient{},
}
}

// Get returns a DiscoveryClient for the provided clusterName.
func (m *MultiClusterDiscovery) Get(clusterName string) *discovery.DiscoveryClient {
m.RLock()
defer m.RUnlock()
return m.clients[clusterName]
}

// Set a DiscoveryClient for the provided clusterName.
func (m *MultiClusterDiscovery) Set(clusterName string) error {
clusterGetter := func(cluster string) (*clusterV1alpha1.Cluster, error) {
return m.clusterLister.Get(cluster)
}
secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
return m.secretLister.Secrets(namespace).Get(name)
}
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
if err != nil {
return err
}
m.Lock()
defer m.Unlock()
m.clients[clusterName] = discovery.NewDiscoveryClientForConfigOrDie(clusterConfig)
return nil
}

// Remove a DiscoveryClient for the provided clusterName.
func (m *MultiClusterDiscovery) Remove(clusterName string) {
m.Lock()
defer m.Unlock()
delete(m.clients, clusterName)
}
Loading

0 comments on commit 405434d

Please sign in to comment.