Skip to content

Commit

Permalink
feat: add a CustomMetricsProvider to the metrics-adapter
Browse files Browse the repository at this point in the history
Signed-off-by: chaunceyjiang <chaunceyjiang@gmail.com>
  • Loading branch information
chaunceyjiang committed Jun 6, 2023
1 parent bfb9bc3 commit 26ebaa3
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 7 deletions.
14 changes: 14 additions & 0 deletions artifacts/deploy/karmada-metrics-adapter-apiservice.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@ spec:
version: v1beta1
versionPriority: 10
---
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta2.custom.metrics.k8s.io
spec:
service:
name: karmada-metrics-adapter
namespace: karmada-system
group: custom.metrics.k8s.io
version: v1beta2
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 200
---
apiVersion: v1
kind: Service
metadata:
Expand Down
12 changes: 10 additions & 2 deletions pkg/metricsadapter/adapter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package metricsadapter

import (
"context"

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd"
"sigs.k8s.io/custom-metrics-apiserver/pkg/cmd/options"

Expand All @@ -18,9 +22,13 @@ type MetricsAdapter struct {
func NewMetricsAdapter(controller *MetricsController, customMetricsAdapterServerOptions *options.CustomMetricsAdapterServerOptions) *MetricsAdapter {
adapter := &MetricsAdapter{}
adapter.CustomMetricsAdapterServerOptions = customMetricsAdapterServerOptions

kubeClient := kubernetes.NewForConfigOrDie(controller.restConfig)
kubeFactory := informers.NewSharedInformerFactory(kubeClient, 0)
kubeFactory.Core().V1().Secrets().Informer()
kubeFactory.Start(context.TODO().Done())
kubeFactory.WaitForCacheSync(context.TODO().Done())
adapter.ResourceMetricsProvider = provider.NewResourceMetricsProvider(controller.ClusterLister, controller.InformerManager)
customProvider := provider.MakeCustomMetricsProvider()
customProvider := provider.MakeCustomMetricsProvider(controller.ClusterLister, kubeFactory.Core().V1().Secrets().Lister())
externalProvider := provider.MakeExternalMetricsProvider()
adapter.WithCustomMetrics(customProvider)
adapter.WithExternalMetrics(externalProvider)
Expand Down
192 changes: 187 additions & 5 deletions pkg/metricsadapter/provider/custommetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,215 @@ package provider
import (
"context"
"fmt"
"net/url"
"strings"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
listcorev1 "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/custom_metrics"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
clusterlister "github.com/karmada-io/karmada/pkg/generated/listers/cluster/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
)

var (
supportedMetricsAPIVersions = []string{
"v1beta2",
}
)

// CustomMetricsProvider is a custom metrics provider
type CustomMetricsProvider struct {
// newDiscoveryClientFunc returns a discovery client for member cluster apiserver
newDiscoveryClientFunc func(string) (*discovery.DiscoveryClient, error)
clusterLister clusterlister.ClusterLister
}

// MakeCustomMetricsProvider creates a new custom metrics provider
func MakeCustomMetricsProvider() *CustomMetricsProvider {
return &CustomMetricsProvider{}
func MakeCustomMetricsProvider(clusterLister clusterlister.ClusterLister, secretLister listcorev1.SecretLister) *CustomMetricsProvider {
return &CustomMetricsProvider{
clusterLister: clusterLister,
newDiscoveryClientFunc: discoveryClientForClusterFunc(clusterLister, secretLister),
}
}

// GetMetricByName will query metrics by name from member clusters and return the result
func (c *CustomMetricsProvider) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
return nil, fmt.Errorf("karmada-metrics-adapter still not implement it")
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list clusters: %v", err)
return nil, err
}

rawUrl := url.URL{Path: "/apis/custom.metrics.k8s.io/v1beta2"}
// handle metrics describing namespaces
if info.GroupResource.Resource == "namespaces" && info.GroupResource.Group == "" {
// namespace-describing metrics have a path of /namespaces/$NS/metrics/$metric,
rawUrl.Path += fmt.Sprintf("/%s/%s/metrics/%s", info.GroupResource.String(), name.Name, info.Metric)
} else {
if len(name.Namespace) > 0 {
rawUrl.Path += fmt.Sprintf("/namespaces/%s", name.Namespace)
}
rawUrl.Path += fmt.Sprintf("/%s/%s/%s", info.GroupResource.String(), name.Name, info.Metric)
}

var metricValue *custom_metrics.MetricValue
for _, cluster := range clusters {
discoveryClient, err := c.newDiscoveryClientFunc(cluster.Name)
if err != nil {
klog.Errorf("Failed to build discovery client for cluster(%s): %+v", cluster.Name, err)
continue
}

tmpMetricValue := &custom_metrics.MetricValue{}
err = discoveryClient.RESTClient().Get().AbsPath(rawUrl.String()).
VersionedParams(&custom_metrics.MetricListOptions{MetricLabelSelector: metricSelector.String()}, metav1.ParameterCodec).
Do(ctx).Into(tmpMetricValue)
if err != nil {
klog.Warningf("Failed to get metric by name(%s) in cluster(%s)", name.String(), cluster.Name)
continue
}
if metricValue != nil {
return nil, errors.NewConflict(info.GroupResource, name.String(), fmt.Errorf("the metric(%s) found in more than one clusters", name))
}
metricValue = tmpMetricValue
}
if metricValue != nil {
return metricValue, nil
}
return nil, errors.NewNotFound(info.GroupResource, name.String())
}

// GetMetricBySelector will query metrics by selector from member clusters and return the result
func (c *CustomMetricsProvider) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
return nil, fmt.Errorf("karmada-metrics-adapter still not implement it")
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list clusters: %v", err)
return nil, err
}

rawUrl := url.URL{Path: "/apis/custom.metrics.k8s.io/v1beta2"}
// handle metrics describing namespaces
if info.GroupResource.Resource == "namespaces" && info.GroupResource.Group == "" {
// namespace-describing metrics have a path of /namespaces/$NS/metrics/$metric,
rawUrl.Path += fmt.Sprintf("/%s/*/metrics/%s", info.GroupResource.String(), info.Metric)
} else {
if len(namespace) > 0 {
rawUrl.Path += fmt.Sprintf("/namespaces/%s", namespace)
}
rawUrl.Path += fmt.Sprintf("/%s/*/%s", info.GroupResource.String(), info.Metric)
}

metricValueList := &custom_metrics.MetricValueList{
TypeMeta: metav1.TypeMeta{
Kind: "MetricValueList",
APIVersion: "custom.metrics.k8s.io/v1beta2",
},
ListMeta: metav1.ListMeta{},
Items: nil,
}
for _, cluster := range clusters {
discoveryClient, err := c.newDiscoveryClientFunc(cluster.Name)
if err != nil {
klog.Errorf("Failed to build discovery client for cluster(%s): %+v", cluster.Name, err)
continue
}
tmpMetricValue := &custom_metrics.MetricValueList{}
err = discoveryClient.RESTClient().Get().AbsPath(rawUrl.String()).
VersionedParams(&custom_metrics.MetricListOptions{LabelSelector: selector.String(), MetricLabelSelector: metricSelector.String()}, metav1.ParameterCodec).
Do(ctx).Into(tmpMetricValue)
if err != nil {
klog.Warningf("Failed to get metric by namespace(%s) in cluster(%s)", namespace, cluster.Name)
continue
}
metricValueList.Items = append(metricValueList.Items, tmpMetricValue.Items...)
}
return metricValueList, nil
}

// ListAllMetrics returns all metrics in all member clusters
func (c *CustomMetricsProvider) ListAllMetrics() []provider.CustomMetricInfo {
return []provider.CustomMetricInfo{}
clusters, err := c.clusterLister.List(labels.Everything())
if err != nil {
klog.Errorf("Failed to list clusters: %v", err)
return []provider.CustomMetricInfo{}
}
var customMetricInfos []provider.CustomMetricInfo
for _, cluster := range clusters {
discoveryClient, err := c.newDiscoveryClientFunc(cluster.Name)
if err != nil {
klog.Errorf("Failed to build discovery client for cluster(%s): %+v", cluster.Name, err)
continue
}
apiGroups, err := discoveryClient.ServerGroups()
if err != nil {
klog.Errorf("Failed to query resource in cluster(%s): %+v", cluster.Name, err)
continue
}
if !supportedMetricsAPIVersionAvailable(apiGroups) {
klog.Warningf("custom.metrics.k8s.io not found in cluster(%s)", cluster.Name)
return []provider.CustomMetricInfo{}
}
resources, err := discoveryClient.ServerResourcesForGroupVersion("custom.metrics.k8s.io/v1beta2")
if err != nil {
klog.Warningf("Failed to query custom.metrics.k8s.io/v1beta2 resource in cluster(%s): %+v", cluster.Name, err)
return []provider.CustomMetricInfo{}
}
for _, resource := range resources.APIResources {
groupResourceAndMetricName := strings.SplitN(resource.Name, "/", 1)
if len(groupResourceAndMetricName) != 2 {
klog.Warningf("Failed to query custom.metrics.k8s.io/v1beta2 resource in cluster(%s): %+v", cluster.Name, err)
continue
}
customMetricInfos = append(customMetricInfos, provider.CustomMetricInfo{
GroupResource: schema.ParseGroupResource(groupResourceAndMetricName[0]),
Namespaced: resource.Namespaced,
Metric: groupResourceAndMetricName[1],
})
}
}
return customMetricInfos
}

func discoveryClientForClusterFunc(clusterLister clusterlister.ClusterLister,
secretLister listcorev1.SecretLister) func(string) (*discovery.DiscoveryClient, error) {
clusterGetter := func(cluster string) (*clusterv1alpha1.Cluster, error) {
return clusterLister.Get(cluster)
}
secretGetter := func(namespace string, name string) (*corev1.Secret, error) {
return secretLister.Secrets(namespace).Get(name)
}

return func(clusterName string) (*discovery.DiscoveryClient, error) {
clusterConfig, err := util.BuildClusterConfig(clusterName, clusterGetter, secretGetter)
if err != nil {
return nil, err
}
return discovery.NewDiscoveryClientForConfigOrDie(clusterConfig), nil
}
}

func supportedMetricsAPIVersionAvailable(discoveredAPIGroups *metav1.APIGroupList) bool {
for _, discoveredAPIGroup := range discoveredAPIGroups.Groups {
if discoveredAPIGroup.Name != custom_metrics.GroupName {
continue
}
for _, version := range discoveredAPIGroup.Versions {
for _, supportedVersion := range supportedMetricsAPIVersions {
if version.Version == supportedVersion {
return true
}
}
}
}
return false
}

0 comments on commit 26ebaa3

Please sign in to comment.