Skip to content

Commit

Permalink
Merge pull request #739 from wzshiming/feat/crd-metrics
Browse files (browse the repository at this point in the history)
Support CRD for Metric
  • Loading branch information
wzshiming authored Jul 24, 2023
2 parents 549e838 + 9a2bb93 commit 70c52c9
Show file tree
Hide file tree
Showing 7 changed files with 191 additions and 46 deletions.
32 changes: 29 additions & 3 deletions pkg/config/resources/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package resources

import (
"context"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -38,22 +37,28 @@ type ConvertFunc[O any, T runtime.Object, S ~[]T] func(objs S) O

// NewDynamicGetter builds a DynamicGetter on top of syncer.
//
// The returned value combines three views of one underlying dynamicGetter:
// its Getter is the dynamicGetter wrapped by withCache, while Starter and
// Synced are the dynamicGetter itself. convertFunc (a func from []T to O) is
// stored on the getter.
func NewDynamicGetter[O any, T runtime.Object, L runtime.Object](syncer Syncer[T, L], convertFunc ConvertFunc[O, T, []T]) DynamicGetter[O] {
	inner := &dynamicGetter[O, T, L]{
		syncCh:      make(chan struct{}, 1),
		syncer:      syncer,
		convertFunc: convertFunc,
	}
	// Queue one notification up front so the first receive from Sync()
	// succeeds without waiting for an event.
	inner.syncCh <- struct{}{}

	var bundle struct {
		Getter[O]
		Starter
		Synced
	}
	bundle.Getter = withCache[O](inner)
	bundle.Starter = inner
	bundle.Synced = inner
	return bundle
}

type dynamicGetter[O any, T runtime.Object, L runtime.Object] struct {
syncer Syncer[T, L]
syncCh chan struct{}

convertFunc ConvertFunc[O, T, []T]

Expand All @@ -73,8 +78,18 @@ func (c *dynamicGetter[O, T, L]) Start(ctx context.Context) error {
},
},
t,
10*time.Second,
cache.ResourceEventHandlerFuncs{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
c.sync()
},
UpdateFunc: func(oldObj, newObj interface{}) {
c.sync()
},
DeleteFunc: func(obj interface{}) {
c.sync()
},
},
)

c.store = store
Expand All @@ -97,3 +112,14 @@ func (c *dynamicGetter[O, T, L]) Get() O {
// Version returns the resource version reported by the controller's
// LastSyncResourceVersion.
func (c *dynamicGetter[O, T, L]) Version() string {
	lastSynced := c.controller.LastSyncResourceVersion()
	return lastSynced
}

// Sync exposes the getter's notification channel as receive-only. A value
// arrives on it after sync() has queued a notification.
func (c *dynamicGetter[O, T, L]) Sync() <-chan struct{} {
	var notifications <-chan struct{} = c.syncCh
	return notifications
}

// sync queues a notification on syncCh without ever blocking the caller.
func (c *dynamicGetter[O, T, L]) sync() {
	select {
	default:
		// The send cannot proceed right now (e.g. the buffer is full);
		// drop this signal rather than block.
	case c.syncCh <- struct{}{}:
	}
}
11 changes: 11 additions & 0 deletions pkg/config/resources/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,20 @@ type Getter[O any] interface {
type DynamicGetter[O any] interface {
// Getter supplies the methods for reading the resource.
Getter[O]
// Starter supplies Start(ctx).
Starter
// Synced supplies Sync(), a receive-only notification channel.
Synced
}

// Starter is an interface for starting resources.
type Starter interface {
// Start starts the resource using ctx and returns an error on failure.
Start(ctx context.Context) error
}

// CacheGetter is an interface for getting resources that are cached.
// It declares no methods of its own beyond those of Getter.
type CacheGetter[O any] interface {
Getter[O]
}

// Synced is an interface for resources that can signal when they have been
// synced.
type Synced interface {
// Sync returns a receive-only channel on which sync notifications are
// delivered; each received value carries no data.
Sync() <-chan struct{}
}
18 changes: 12 additions & 6 deletions pkg/kwok/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func runE(ctx context.Context, flags *flagpole) error {
TypedClient: typedClient,
TypedKwokClient: typedKwokClient,
EnableCNI: flags.Options.EnableCNI,
EnableMetrics: len(metrics) != 0,
EnableMetrics: len(metrics) != 0 || slices.Contains(flags.Options.EnableCRDs, v1alpha1.MetricKind),
ManageAllNodes: flags.Options.ManageAllNodes,
ManageNodesWithAnnotationSelector: flags.Options.ManageNodesWithAnnotationSelector,
ManageNodesWithLabelSelector: flags.Options.ManageNodesWithLabelSelector,
Expand Down Expand Up @@ -309,11 +309,6 @@ func runE(ctx context.Context, flags *flagpole) error {
if err != nil {
return fmt.Errorf("failed to create server: %w", err)
}

err = svc.InstallMetrics()
if err != nil {
return fmt.Errorf("failed to install metrics: %w", err)
}
svc.InstallHealthz()

if flags.Options.EnableDebuggingHandlers {
Expand All @@ -322,6 +317,17 @@ func runE(ctx context.Context, flags *flagpole) error {
} else {
svc.InstallDebuggingDisabledHandlers()
}

err = svc.InstallCRD(ctx)
if err != nil {
return fmt.Errorf("failed to install crd: %w", err)
}

err = svc.InstallMetrics(ctx)
if err != nil {
return fmt.Errorf("failed to install metrics: %w", err)
}

go func() {
err := svc.Run(ctx, serverAddress, flags.Options.TLSCertFile, flags.Options.TLSPrivateKeyFile)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions pkg/kwok/metrics/cel/evaluate.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package cel
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -78,6 +79,7 @@ type Environment struct {
env *easycel.Environment
conf NodeEvaluatorConfig
cacheEvaluator map[string]*Evaluator
cacheMut sync.Mutex
resultCacheVer *int64
}

Expand Down Expand Up @@ -202,6 +204,9 @@ func (e *Environment) init() error {
// Compile is responsible for compiling a cel program
func (e *Environment) Compile(src string) (*Evaluator, error) {
if e.cacheEvaluator != nil {
e.cacheMut.Lock()
defer e.cacheMut.Unlock()

if evaluator, ok := e.cacheEvaluator[src]; ok {
return evaluator, nil
}
Expand Down Expand Up @@ -234,8 +239,9 @@ type Evaluator struct {
latestCacheVer *int64
cacheVer int64

cache map[string]ref.Val
program cel.Program
cache map[string]ref.Val
cacheMut sync.Mutex
program cel.Program
}

func resultUniqueKey(node *corev1.Node, pod *corev1.Pod, container *corev1.Container) string {
Expand All @@ -255,6 +261,9 @@ func resultUniqueKey(node *corev1.Node, pod *corev1.Pod, container *corev1.Conta
func (e *Evaluator) evaluate(data Data) (ref.Val, error) {
var key string
if e.latestCacheVer != nil {
e.cacheMut.Lock()
defer e.cacheMut.Unlock()

if e.cache == nil || *e.latestCacheVer != e.cacheVer {
e.cache = map[string]ref.Val{}
e.cacheVer = *e.latestCacheVer
Expand Down
14 changes: 2 additions & 12 deletions pkg/kwok/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (

// UpdateHandler handles updating metrics on request
type UpdateHandler struct {
name string
metric *internalversion.Metric
controller *controllers.Controller
environment *cel.Environment

Expand All @@ -50,8 +48,6 @@ type UpdateHandler struct {

// UpdateHandlerConfig is configuration for a single node
type UpdateHandlerConfig struct {
NodeName string
Metrics *internalversion.Metric
Controller *controllers.Controller
Environment *cel.Environment
}
Expand All @@ -64,8 +60,6 @@ func NewMetricsUpdateHandler(conf UpdateHandlerConfig) *UpdateHandler {
)

h := &UpdateHandler{
name: conf.NodeName,
metric: conf.Metrics,
controller: conf.Controller,
environment: conf.Environment,
registry: registry,
Expand Down Expand Up @@ -497,13 +491,12 @@ func uniqueKey(name string, kind internalversion.Kind, labels map[string]string)
return builder.String()
}

func (h *UpdateHandler) update(ctx context.Context) {
// Update updates metrics for a node
func (h *UpdateHandler) Update(ctx context.Context, nodeName string, metrics []internalversion.MetricConfig) {
logger := log.FromContext(ctx)
has := map[string]struct{}{}
// Update metrics
h.environment.ClearResultCache()
metrics := h.metric.Spec.Metrics
nodeName := h.metric.Name
for _, metric := range metrics {
metric := metric
metricName := metric.Name
Expand Down Expand Up @@ -548,9 +541,6 @@ func (h *UpdateHandler) update(ctx context.Context) {
}

func (h *UpdateHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
h.update(ctx)

// Serve metrics
h.handler.ServeHTTP(w, r)
}
89 changes: 79 additions & 10 deletions pkg/kwok/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,27 @@ limitations under the License.
package server

import (
"context"
"fmt"
"strings"

"github.com/emicklei/go-restful/v3"
"github.com/prometheus/client_golang/prometheus/promhttp"

"sigs.k8s.io/kwok/pkg/apis/internalversion"
"sigs.k8s.io/kwok/pkg/config/resources"
"sigs.k8s.io/kwok/pkg/kwok/metrics"
"sigs.k8s.io/kwok/pkg/kwok/metrics/cel"
"sigs.k8s.io/kwok/pkg/log"
)

// InstallMetrics registers the metrics handler on the given mux.
func (s *Server) InstallMetrics() error {
func (s *Server) InstallMetrics(ctx context.Context) error {
promHandler := promhttp.Handler()
s.restfulCont.Handle("/metrics", promHandler)

selfMetric := func(req *restful.Request, resp *restful.Response) {
promHandler.ServeHTTP(resp.ResponseWriter, req.Request)
}

controller := s.controller
env, err := cel.NewEnvironment(cel.NodeEvaluatorConfig{
Expand All @@ -45,14 +54,74 @@ func (s *Server) InstallMetrics() error {
if err != nil {
return fmt.Errorf("failed to create CEL environment: %w", err)
}
for _, m := range s.metrics {
handler := metrics.NewMetricsUpdateHandler(metrics.UpdateHandlerConfig{
NodeName: m.Name,
Metrics: m,
Controller: controller,
Environment: env,
})
s.restfulCont.Handle(m.Spec.Path, handler)

const rootPath = "/metrics"
ws := new(restful.WebService)
ws.Path(rootPath)
ws.Route(ws.GET("/").To(selfMetric))

for _, m := range s.metrics.Get() {
if !strings.HasPrefix(m.Spec.Path, rootPath) {
return fmt.Errorf("metric path %q does not start with %q", m.Spec.Path, rootPath)
}

ws.Route(ws.GET(strings.TrimPrefix(m.Spec.Path, rootPath)).
To(s.getMetrics(m, env)))
}

s.restfulCont.Add(ws)
s.metricsWebService = ws

logger := log.FromContext(ctx)
syncd, ok := s.metrics.(resources.Synced)
if ok {
logger.Info("Starting metrics syncer")
go func() {
for range syncd.Sync() {
logger.Info("Metrics synced, updating metrics web service")
ws := new(restful.WebService)
ws.Path(rootPath)
ws.Route(ws.GET("/").To(selfMetric))

for _, m := range s.metrics.Get() {
if !strings.HasPrefix(m.Spec.Path, rootPath) {
logger.Warn("metric path does not start with "+rootPath, "path", m.Spec.Path)
continue
}
ws.Route(ws.GET(strings.TrimPrefix(m.Spec.Path, rootPath)).
To(s.getMetrics(m, env)))
}

err := s.restfulCont.Remove(s.metricsWebService)
if err != nil {
logger.Error("failed to remove metrics web service", err)
}
s.restfulCont.Add(ws)
s.metricsWebService = ws
}
}()
}

return nil
}

// getMetrics returns a go-restful route function that serves the given metric.
//
// On each request it determines the node name from the "nodeName" path
// parameter, falling back to metric.Name when the parameter is empty. It then
// looks up the update handler stored for that node in s.metricsUpdateHandler,
// creating and storing one if none exists, refreshes it with
// metric.Spec.Metrics, and lets the handler write the response.
func (s *Server) getMetrics(metric *internalversion.Metric, env *cel.Environment) func(req *restful.Request, resp *restful.Response) {
	serve := func(req *restful.Request, resp *restful.Response) {
		node := req.PathParameter("nodeName")
		if len(node) == 0 {
			node = metric.Name
		}

		updater, found := s.metricsUpdateHandler.Load(node)
		if !found {
			conf := metrics.UpdateHandlerConfig{
				Controller:  s.controller,
				Environment: env,
			}
			updater = metrics.NewMetricsUpdateHandler(conf)
			s.metricsUpdateHandler.Store(node, updater)
		}

		// Refresh the metric values first, then serve them.
		httpReq := req.Request
		updater.Update(httpReq.Context(), node, metric.Spec.Metrics)
		updater.ServeHTTP(resp.ResponseWriter, httpReq)
	}
	return serve
}
Loading

0 comments on commit 70c52c9

Please sign in to comment.