diff --git a/pkg/apis/internalversion/metric_type.go b/pkg/apis/internalversion/metric_type.go index fe29434cbb..03bd1521b6 100644 --- a/pkg/apis/internalversion/metric_type.go +++ b/pkg/apis/internalversion/metric_type.go @@ -23,50 +23,50 @@ import ( // Metric provides metrics configuration. type Metric struct { //+k8s:conversion-gen=false - metav1.TypeMeta + metav1.TypeMeta `json:",inline"` // Standard list metadata. // More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata - metav1.ObjectMeta + metav1.ObjectMeta `json:"metadata"` // Spec holds spec for metrics. - Spec MetricSpec + Spec MetricSpec `json:"spec"` } // MetricSpec holds spec for metrics. type MetricSpec struct { // Path is a restful service path. - Path string + Path string `json:"path"` // Metrics is a list of metric configurations. - Metrics []MetricConfig + Metrics []MetricConfig `json:"metrics"` } // MetricConfig provides metric configuration to a single metric type MetricConfig struct { // Name is the fully-qualified name of the metric. - Name string + Name string `json:"name"` // Help provides information about this metric. - Help string + Help string `json:"help"` // Kind is kind of metric (ex. counter, gauge, histogram). - Kind string + Kind string `json:"kind"` // Labels are metric labels. - Labels []MetricLabel + Labels []MetricLabel `json:"labels"` // Value is a CEL expression. - Value string + Value string `json:"value"` // Buckets is a list of buckets for a histogram metric. - Buckets []MetricBucket + Buckets []MetricBucket `json:"buckets"` } // MetricLabel holds label name and the value of the label. type MetricLabel struct { // Name is a label name. - Name string + Name string `json:"name"` // Value is a CEL expression. - Value string + Value string `json:"value"` } // MetricBucket is a single bucket for a metric. type MetricBucket struct { // Le is less-than or equal. - Le string + Le string `json:"le"` // Value is a CEL expression. 
- Value string + Value string `json:"value"` } diff --git a/pkg/config/config.go b/pkg/config/config.go index 1561cf58d5..746bef9581 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -322,12 +322,12 @@ func Load(ctx context.Context, src ...string) ([]InternalObject, error) { // Save saves the given objects to the given path. func Save(ctx context.Context, path string, objs []InternalObject) error { - err := os.MkdirAll(filepath.Dir(path), 0o750) + err := os.MkdirAll(filepath.Dir(path), 0750) if err != nil { return err } - file, err := os.OpenFile(filepath.Clean(path), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o640) + file, err := os.OpenFile(filepath.Clean(path), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0640) if err != nil { return err } @@ -414,10 +414,6 @@ func FilterWithTypeFromContext[T metav1.Object](ctx context.Context) (out []T) { if len(objs) == 0 { return nil } - fmt.Println("Filter with type from context") - for _, obj := range objs { - fmt.Printf("[] %T\n", obj) - } return FilterWithType[T](objs) } diff --git a/pkg/kwok/cmd/root.go b/pkg/kwok/cmd/root.go index aabdef3932..5926be717c 100644 --- a/pkg/kwok/cmd/root.go +++ b/pkg/kwok/cmd/root.go @@ -19,7 +19,6 @@ package cmd import ( "context" - "fmt" "os" "time" @@ -62,7 +61,6 @@ func NewCommand(ctx context.Context) *cobra.Command { SilenceErrors: true, Version: version.DisplayVersion(), RunE: func(cmd *cobra.Command, args []string) error { - fmt.Printf("ARGS=%v\ncmd=%+v\n", args, cmd) return runE(cmd.Context(), flags) }, } diff --git a/pkg/kwok/controllers/node_controller.go b/pkg/kwok/controllers/node_controller.go index caccc1caea..ef8cd66f93 100644 --- a/pkg/kwok/controllers/node_controller.go +++ b/pkg/kwok/controllers/node_controller.go @@ -26,7 +26,6 @@ import ( "text/template" "time" - "github.com/prometheus/client_golang/prometheus" "github.com/wzshiming/cron" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -270,6 +269,7 @@ func (c *NodeController) 
watchResources(ctx context.Context, opt metav1.ListOpti node := event.Object.(*corev1.Node) if c.need(node) { c.putNodeInfo(node) + c.applyMetrics(ctx, node) c.preprocessChan <- node if c.onNodeManagedFunc != nil { err = c.onNodeManagedFunc(ctx, node.Name) @@ -284,6 +284,7 @@ func (c *NodeController) watchResources(ctx context.Context, opt metav1.ListOpti node := event.Object.(*corev1.Node) if c.need(node) { c.putNodeInfo(node) + c.applyMetrics(ctx, node) c.preprocessChan <- node } case watch.Deleted: @@ -672,21 +673,20 @@ func (c *NodeController) applyMetrics(ctx context.Context, node *corev1.Node) { logger.Warn("Failed to instantiate evaluator for a node", "error", err.Error()) } - reg := kwokmetrics.DefaultMetrics() + reg := kwokmetrics.CustomMetric(node.Name) for _, metric := range metrics { - for _, m := range metric.Spec.Metrics { - var labels prometheus.Labels - if m.Labels != nil { - labels = make(prometheus.Labels, len(m.Labels)) - } - for _, l := range m.Labels { - labels[l.Name] = l.Value - } - + for i := range metric.Spec.Metrics { + m := &metric.Spec.Metrics[i] switch m.Kind { case kwokmetrics.KindGauge: - g, ok := reg.Gauge(m.Name) - if !ok { + g, err := reg.Gauge(m, node) + if err != nil { + logger.Warn( + "Failed to get/create gauge metric", + "metricConfig", metric.Name, + "metricName", m.Name, + "error", err.Error(), + ) continue } val, err := eval.Evaluate(m.Value) @@ -695,18 +695,31 @@ func (c *NodeController) applyMetrics(ctx context.Context, node *corev1.Node) { } g.Set(val) case kwokmetrics.KindCounter: - c, ok := reg.Counter(m.Name) - if !ok { + c, err := reg.Counter(m, node) + if err != nil { + logger.Warn( + "Failed to get/create counter metric", + "metricConfig", metric.Name, + "metricName", m.Name, + "error", err.Error(), + ) continue } val, err := eval.Evaluate(m.Value) if err != nil { logger.Warn("Failed to evaluate expression", "expression", m.Value) + continue } c.Add(val) case kwokmetrics.KindHistogram: - h, ok := 
reg.Histogram(m.Name) - if !ok { + h, err := reg.Histogram(m, node) + if err != nil { + logger.Warn( + "Failed to get/create histogram metric", + "metricConfig", metric.Name, + "metricName", m.Name, + "error", err.Error(), + ) continue } for _, b := range m.Buckets { diff --git a/pkg/kwok/metrics/cel/evaluate.go b/pkg/kwok/metrics/cel/evaluate.go index 0f3134610e..e091552a8a 100644 --- a/pkg/kwok/metrics/cel/evaluate.go +++ b/pkg/kwok/metrics/cel/evaluate.go @@ -26,14 +26,14 @@ import ( corev1 "k8s.io/api/core/v1" ) -// MetricsEvaluator is a common interface for all types +// Evaluator is a common interface for all types // that evaluate metrics -type MetricsEvaluator interface { +type Evaluator interface { Evaluate(exp string) (float64, error) } // NewNodeEvaluator returns a MetricEvaluator that is able to evaluate node metrics -func NewNodeEvaluator(node *corev1.Node) (MetricsEvaluator, error) { +func NewNodeEvaluator(node *corev1.Node) (Evaluator, error) { b, err := json.Marshal(node) if err != nil { return nil, err diff --git a/pkg/kwok/metrics/metrics.go b/pkg/kwok/metrics/metrics.go index fdce53a775..2004cbee2c 100644 --- a/pkg/kwok/metrics/metrics.go +++ b/pkg/kwok/metrics/metrics.go @@ -18,12 +18,19 @@ limitations under the License. 
package metrics import ( + "bytes" + "encoding/json" + "fmt" "net/http" + "sort" + "strconv" + "text/template" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/collectors" "github.com/prometheus/client_golang/prometheus/promhttp" + "sigs.k8s.io/kwok/pkg/apis/internalversion" "sigs.k8s.io/kwok/pkg/utils/maps" ) @@ -34,14 +41,17 @@ const ( KindCounter = "counter" ) +func init() { + defaultMetrics.reg = prometheus.NewRegistry() + defaultMetrics.reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) + defaultMetrics.reg.MustRegister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll))) +} + // Registry provides prometheus and in-memory metrics for dynamic acces type Registry interface { - RegisterGauge(name string, g prometheus.Gauge) - RegisterCounter(name string, c prometheus.Counter) - RegisterHistogram(name string, h prometheus.Histogram) - Gauge(name string) (prometheus.Gauge, bool) - Counter(name string) (prometheus.Counter, bool) - Histogram(name string) (prometheus.Histogram, bool) + Gauge(m *internalversion.MetricConfig, target any) (prometheus.Gauge, error) + Counter(m *internalversion.MetricConfig, target any) (prometheus.Counter, error) + Histogram(m *internalversion.MetricConfig, target any) (prometheus.Histogram, error) Handler() http.Handler } @@ -52,35 +62,106 @@ type registry struct { histograms maps.SyncMap[string, prometheus.Histogram] } -func (m *registry) RegisterGauge(name string, g prometheus.Gauge) { - m.gauges.Store(name, g) - m.reg.MustRegister(g) -} - -func (m *registry) RegisterCounter(name string, c prometheus.Counter) { - m.counters.Store(name, c) - m.reg.MustRegister(c) -} - -func (m *registry) RegisterHistogram(name string, h prometheus.Histogram) { - m.histograms.Store(name, h) - m.reg.MustRegister(h) -} - -func (m *registry) Gauge(name string) (prometheus.Gauge, bool) { - return m.gauges.Load(name) +func (r *registry) 
Gauge(m *internalversion.MetricConfig, target any) (prometheus.Gauge, error) { + if m.Kind != KindGauge { + return nil, fmt.Errorf("expected metric type(%q), got(%q)", KindGauge, m.Kind) + } + + val, ok := r.gauges.Load(m.Name) + if ok { + return val, nil + } + + labels, err := metricLabels(m, target) + if err != nil { + return nil, err + } + + val = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: m.Name, + Help: m.Help, + ConstLabels: labels, + }, + ) + + r.gauges.Store(m.Name, val) + r.reg.Register(val) + return val, nil } -func (m *registry) Counter(name string) (prometheus.Counter, bool) { - return m.counters.Load(name) +func (r *registry) Counter(m *internalversion.MetricConfig, target any) (prometheus.Counter, error) { + if m.Kind != KindCounter { + return nil, fmt.Errorf("expected metric type(%q), got(%q)", KindCounter, m.Kind) + } + + val, ok := r.counters.Load(m.Name) + if ok { + return val, nil + } + + labels, err := metricLabels(m, target) + if err != nil { + return nil, err + } + + val = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: m.Name, + Help: m.Help, + ConstLabels: labels, + }, + ) + + r.counters.Store(m.Name, val) + r.reg.Register(val) + return val, nil } -func (m *registry) Histogram(name string) (prometheus.Histogram, bool) { - return m.histograms.Load(name) +func (r *registry) Histogram(m *internalversion.MetricConfig, target any) (prometheus.Histogram, error) { + if m.Kind != KindHistogram { + return nil, fmt.Errorf("expected metric type(%q), got(%q)", KindHistogram, m.Kind) + } + + val, ok := r.histograms.Load(m.Name) + if ok { + return val, nil + } + + labels, err := metricLabels(m, target) + if err != nil { + return nil, err + } + var bucketValues []float64 + for _, b := range m.Buckets { + if b.Le == "+Inf" { + continue + } + v, err := strconv.ParseFloat(b.Le, 64) + if err != nil { + return nil, err + } + bucketValues = append(bucketValues, v) + } + buckets := sort.Float64Slice(bucketValues) + buckets.Sort() + + val = 
prometheus.NewHistogram( + prometheus.HistogramOpts{ + Name: m.Name, + Help: m.Help, + Buckets: buckets, + ConstLabels: labels, + }, + ) + + r.histograms.Store(m.Name, val) + r.reg.Register(val) + return val, nil } -func (m *registry) Handler() http.Handler { - return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(m.reg, promhttp.HandlerOpts{Registry: m.reg})) +func (r *registry) Handler() http.Handler { + return promhttp.InstrumentMetricHandler(prometheus.DefaultRegisterer, promhttp.HandlerFor(r.reg, promhttp.HandlerOpts{Registry: r.reg})) } var defaultMetrics registry @@ -99,8 +180,41 @@ func CustomMetric(name string) Registry { return v } -func init() { - defaultMetrics.reg = prometheus.NewRegistry() - defaultMetrics.reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})) - defaultMetrics.reg.MustRegister(collectors.NewGoCollector(collectors.WithGoCollectorRuntimeMetrics(collectors.MetricsAll))) +func metricLabels(m *internalversion.MetricConfig, data any) (prometheus.Labels, error) { + if len(m.Labels) == 0 { + return nil, nil + } + + labels := make(prometheus.Labels, len(m.Labels)) + for _, l := range m.Labels { + var err error + labels[l.Name], err = executeValueTemplate(l.Value, data) + if err != nil { + return nil, err + } + } + return labels, nil +} + +func executeValueTemplate(tmpl string, data any) (string, error) { + pathTmpl, err := template.New("path").Parse(tmpl) + if err != nil { + return "", fmt.Errorf("failed to parse path template: %v", err) + } + + b, err := json.Marshal(data) + if err != nil { + return "", fmt.Errorf("failed to marshal metric: %v", err) + } + + d := make(map[string]any) + if err := json.Unmarshal(b, &d); err != nil { + return "", fmt.Errorf("failed to unmarshal data: %v", err) + } + + var buf bytes.Buffer + if err := pathTmpl.Execute(&buf, d); err != nil { + return "", fmt.Errorf("failed to execute path template: %v", err) + } + return buf.String(), nil } diff 
--git a/pkg/kwok/server/metrics.go b/pkg/kwok/server/metrics.go index 3795f357b8..f613f6121b 100644 --- a/pkg/kwok/server/metrics.go +++ b/pkg/kwok/server/metrics.go @@ -20,12 +20,8 @@ import ( "bytes" "encoding/json" "fmt" - "sort" - "strconv" "text/template" - "github.com/prometheus/client_golang/prometheus" - "sigs.k8s.io/kwok/pkg/apis/internalversion" "sigs.k8s.io/kwok/pkg/kwok/metrics" ) @@ -39,13 +35,17 @@ func (s *Server) InstallMetrics() { if err != nil { panic(fmt.Sprintf("Failed to execute path template: %v", err)) } - reg := metrics.CustomMetric(path) - installMetrics(reg, m.Spec.Metrics) + reg := metrics.CustomMetric(m.Name) s.restfulCont.Handle(path, reg.Handler()) } } func executePathTemplate(path string, m *internalversion.Metric) (string, error) { + pathTmpl, err := template.New("path").Parse(path) + if err != nil { + return "", fmt.Errorf("failed to parse path template: %v", err) + } + b, err := json.Marshal(m) if err != nil { return "", fmt.Errorf("failed to marshal metric: %v", err) @@ -55,75 +55,10 @@ func executePathTemplate(path string, m *internalversion.Metric) (string, error) if err := json.Unmarshal(b, &data); err != nil { return "", fmt.Errorf("failed to unmarshal data: %v", err) } - pathTmpl, err := template.New("path").Parse(path) - if err != nil { - return "", fmt.Errorf("failed to parse path template: %v", err) - } + var buf bytes.Buffer if err := pathTmpl.Execute(&buf, data); err != nil { return "", fmt.Errorf("failed to execute path template: %v", err) } return buf.String(), nil } - -func installMetrics(reg metrics.Registry, ms []internalversion.MetricConfig) { - for _, m := range ms { - var labels prometheus.Labels - if m.Labels != nil { - labels = make(prometheus.Labels, len(m.Labels)) - for _, l := range m.Labels { - labels[l.Name] = l.Value - } - } - switch m.Kind { - case metrics.KindGauge: - reg.RegisterGauge( - m.Name, - prometheus.NewGauge( - prometheus.GaugeOpts{ - Name: m.Name, - Help: m.Help, - ConstLabels: labels, - }, - 
), - ) - case metrics.KindCounter: - reg.RegisterCounter( - m.Name, - prometheus.NewCounter( - prometheus.CounterOpts{ - Name: m.Name, - Help: m.Help, - ConstLabels: labels, - }, - ), - ) - case metrics.KindHistogram: - var bucketValues []float64 - for _, b := range m.Buckets { - if b.Le == "+Inf" { - continue - } - v, err := strconv.ParseFloat(b.Le, 64) - if err != nil { - panic(fmt.Sprintf("float parsing failed on value: %v", b)) - } - bucketValues = append(bucketValues, v) - } - buckets := sort.Float64Slice(bucketValues) - buckets.Sort() - - reg.RegisterHistogram( - m.Name, - prometheus.NewHistogram( - prometheus.HistogramOpts{ - Name: m.Name, - Help: m.Help, - Buckets: buckets, - ConstLabels: labels, - }, - ), - ) - } - } -} diff --git a/test/kwokctl/metric.yaml b/test/kwokctl/metric.yaml index 247e3f48c4..dd60b90af5 100644 --- a/test/kwokctl/metric.yaml +++ b/test/kwokctl/metric.yaml @@ -11,7 +11,7 @@ spec: kind: gauge labels: - name: node - value: 'node.metadata.name' + value: '{{ .metadata.name }}' value: '1.0' - name: kubelet_started_containers_total # For special cases, we can also consider providing special functions to provide data,