From d5e8b9d897700d1fd747485b50fbedf40fdd34a0 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Thu, 4 Jul 2024 15:03:46 -0400 Subject: [PATCH 1/7] feat: Move the broker filter statsReporter to metrics package --- cmd/broker/filter/main.go | 3 +- pkg/broker/filter/filter_handler.go | 54 ++++++++-------- pkg/broker/filter/filter_handler_test.go | 39 ++++++------ pkg/broker/filter/stats_reporter_test.go | 30 ++++----- .../filter => metrics}/stats_reporter.go | 63 +++++++++---------- 5 files changed, 96 insertions(+), 93 deletions(-) rename pkg/{broker/filter => metrics}/stats_reporter.go (79%) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 167a0e888d6..b8d422ebf06 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -24,6 +24,7 @@ import ( "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" + eventingmetrics "knative.dev/eventing/pkg/metrics" kubeclient "knative.dev/pkg/client/injection/kube/client" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" @@ -146,7 +147,7 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } - reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) + reporter := eventingmetrics.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 5276b6f0a5e..23c926b6067 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,6 +26,8 @@ import ( "net/http" "time" + "knative.dev/eventing/pkg/metrics" + opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -75,7 +77,7 @@ const ( // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Handler struct { // reporter reports stats of status code and dispatch time - reporter StatsReporter + reporter metrics.StatsReporter eventDispatcher *kncloudevents.Dispatcher @@ -89,7 +91,7 @@ type Handler struct { } // NewHandler creates a new Handler and its associated EventReceiver. -func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) { +func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter metrics.StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) { kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost, @@ -245,17 +247,17 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve target := broker.Status.Address - reportArgs := &ReportArgs{ - ns: trigger.Namespace, - trigger: trigger.Name, - broker: brokerRef, - requestType: "reply_forward", + reportArgs := &metrics.ReportArgs{ + Ns: trigger.Namespace, + Trigger: trigger.Name, + Broker: brokerRef, + RequestType: "reply_forward", } if request.TLS != nil { - reportArgs.requestScheme = "https" + reportArgs.RequestScheme = "https" } else { - reportArgs.requestScheme = "http" + reportArgs.RequestScheme = "http" } h.logger.Info("sending to reply", zap.Any("target", target)) @@ -296,17 +298,17 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } } - reportArgs := &ReportArgs{ - ns: trigger.Namespace, - trigger: trigger.Name, - broker: trigger.Spec.Broker, - requestType: "dls_forward", + reportArgs := &metrics.ReportArgs{ + Ns: trigger.Namespace, + Trigger: trigger.Name, + Broker: trigger.Spec.Broker, + RequestType: "dls_forward", } if request.TLS != nil { - reportArgs.requestScheme = "https" + reportArgs.RequestScheme = "https" } else { - reportArgs.requestScheme = "http" + reportArgs.RequestScheme = "http" } h.logger.Info("sending to dls", zap.Any("target", target)) @@ -343,18 +345,18 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger h.logger.Warn("Failed to delete TTL.", zap.Error(err)) } - reportArgs := &ReportArgs{ - ns: trigger.Namespace, - trigger: trigger.Name, - broker: brokerRef, - filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), - requestType: "filter", + reportArgs := &metrics.ReportArgs{ + Ns: trigger.Namespace, + Trigger: trigger.Name, + Broker: brokerRef, + FilterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), + RequestType: "filter", } if request.TLS != nil { - reportArgs.requestScheme = "https" + reportArgs.RequestScheme = "https" } else { - reportArgs.requestScheme = "http" + reportArgs.RequestScheme = "http" } subscriberURI := trigger.Status.SubscriberURI @@ -386,7 +388,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *metrics.ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { additionalHeaders := headers.Clone() additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) @@ -519,7 +521,7 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, return dispatchInfo.ResponseCode, nil } -func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs) { +func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *metrics.ReportArgs) { // Record the event processing time. This might be off if the receiver and the filter pods are running in // different nodes with different clocks. var arrivalTimeStr string diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index e6bf900ec65..7b73f1c6b6d 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -37,6 +37,7 @@ import ( "go.uber.org/zap/zaptest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "knative.dev/eventing/pkg/metrics" "knative.dev/pkg/apis" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" "knative.dev/pkg/logging" @@ -515,14 +516,14 @@ func TestReceiver(t *testing.T) { if tc.expectedDispatch != fh.requestReceived { t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived) } - if tc.expectedEventCount != reporter.eventCountReported { - t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported) + if tc.expectedEventCount != reporter.EventCountReported { + t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.EventCountReported) } - if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported { - t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported) + if tc.expectedEventDispatchTime != reporter.EventDispatchTimeReported { + t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.EventDispatchTimeReported) } - if tc.expectedEventProcessingTime != reporter.eventProcessingTimeReported { - t.Errorf("Incorrect event processing time reported metric. Expected %v, Actual %v", tc.expectedEventProcessingTime, reporter.eventProcessingTimeReported) + if tc.expectedEventProcessingTime != reporter.EventProcessingTimeReported { + t.Errorf("Incorrect event processing time reported metric. Expected %v, Actual %v", tc.expectedEventProcessingTime, reporter.EventProcessingTimeReported) } if tc.expectedResponseEvent != nil { if tc.expectedResponseEvent.SpecVersion() != event.CloudEventsVersionV1 { @@ -705,11 +706,11 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { if tc.expectedDispatch != fh.requestReceived { t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived) } - if tc.expectedEventCount != reporter.eventCountReported { - t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported) + if tc.expectedEventCount != reporter.EventCountReported { + t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.EventCountReported) } - if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported { - t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported) + if tc.expectedEventDispatchTime != reporter.EventDispatchTimeReported { + t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.EventDispatchTimeReported) } // Compare the returned event. message := cehttp.NewMessageFromHttpResponse(response) @@ -743,23 +744,23 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) { } type mockReporter struct { - eventCountReported bool - eventDispatchTimeReported bool - eventProcessingTimeReported bool + EventCountReported bool + EventDispatchTimeReported bool + EventProcessingTimeReported bool } -func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { - r.eventCountReported = true +func (r *mockReporter) ReportEventCount(args *metrics.ReportArgs, responseCode int) error { + r.EventCountReported = true return nil } -func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { - r.eventDispatchTimeReported = true +func (r *mockReporter) ReportEventDispatchTime(args *metrics.ReportArgs, responseCode int, d time.Duration) error { + r.EventDispatchTimeReported = true return nil } -func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error { - r.eventProcessingTimeReported = true +func (r *mockReporter) ReportEventProcessingTime(args *metrics.ReportArgs, d time.Duration) error { + r.EventProcessingTimeReported = true return nil } diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index da1635dfe0f..296bfdcaac3 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -30,14 +30,14 @@ import ( func TestStatsReporter(t *testing.T) { setup() - args := &ReportArgs{ - ns: "testns", - trigger: "testtrigger", - broker: "testbroker", - filterType: "testeventtype", + args := &metrics.ReportArgs{ + Ns: "testns", + Trigger: "testtrigger", + Broker: "testbroker", + FilterType: "testeventtype", } - r := NewStatsReporter("testcontainer", "testpod") + r := metrics.NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ metrics.LabelFilterType: "testeventtype", @@ -95,18 +95,18 @@ func TestStatsReporter(t *testing.T) { func TestReporterEmptySourceAndTypeFilter(t *testing.T) { setup() - args := &ReportArgs{ - ns: "testns", - trigger: "testtrigger", - broker: "testbroker", - filterType: "", - requestScheme: "http", + args := &metrics.ReportArgs{ + Ns: "testns", + Trigger: "testtrigger", + Broker: "testbroker", + FilterType: "", + RequestScheme: "http", } - r := NewStatsReporter("testcontainer", "testpod") + r := metrics.NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ - metrics.LabelFilterType: anyValue, + metrics.LabelFilterType: metrics.AnyValue, metrics.LabelResponseCode: "202", metrics.LabelResponseCodeClass: "2xx", broker.LabelContainerName: "testcontainer", @@ -156,5 +156,5 @@ func resetMetrics() { "event_count", "event_dispatch_latencies", "event_processing_latencies") - register() + metrics.Register() } diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/metrics/stats_reporter.go similarity index 79% rename from pkg/broker/filter/stats_reporter.go rename to pkg/metrics/stats_reporter.go index c2df9b3c9b5..34fe8fc39c3 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/metrics/stats_reporter.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Knative Authors +Copyright 2024 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package filter +package metrics import ( "context" @@ -22,19 +22,18 @@ import ( "strconv" "time" + "knative.dev/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" + "go.opencensus.io/resource" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - broker "knative.dev/eventing/pkg/broker" - eventingmetrics "knative.dev/eventing/pkg/metrics" - "knative.dev/pkg/metrics" - "knative.dev/pkg/metrics/metricskey" ) const ( // anyValue is the default value if the trigger filter attributes are empty. - anyValue = "any" + AnyValue = "any" ) var ( @@ -67,24 +66,24 @@ var ( // go.opencensus.io/tag/validate.go. Currently those restrictions are: // - length between 1 and 255 inclusive // - characters are printable US-ASCII - triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType) + triggerFilterTypeKey = tag.MustNewKey(LabelFilterType) triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") - triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme) - responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) - responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) + triggerFilterRequestSchemeKey = tag.MustNewKey(LabelEventScheme) + responseCodeKey = tag.MustNewKey(LabelResponseCode) + responseCodeClassKey = tag.MustNewKey(LabelResponseCodeClass) ) type ReportArgs struct { - ns string - trigger string - broker string - filterType string - requestType string - requestScheme string + Ns string + Trigger string + Broker string + FilterType string + RequestType string + RequestScheme string } func init() { - register() + Register() } // StatsReporter defines the interface for sending filter metrics. @@ -111,26 +110,26 @@ func NewStatsReporter(container, uniqueName string) StatsReporter { } } -func register() { +func Register() { // Create view to see our measurements. err := metrics.RegisterResourceView( &view.View{ Description: eventCountM.Description(), Measure: eventCountM, Aggregation: view.Count(), - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}, }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}, }, &view.View{ Description: processingTimeInMsecM.Description(), Measure: processingTimeInMsecM, Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}, }, ) if err != nil { @@ -177,22 +176,22 @@ func (r *reporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.Context, error) { ctx := metricskey.WithResource(emptyContext, resource.Resource{ - Type: eventingmetrics.ResourceTypeKnativeTrigger, + Type: ResourceTypeKnativeTrigger, Labels: map[string]string{ - eventingmetrics.LabelNamespaceName: args.ns, - eventingmetrics.LabelBrokerName: args.broker, - eventingmetrics.LabelTriggerName: args.trigger, + LabelNamespaceName: args.Ns, + LabelBrokerName: args.Broker, + LabelTriggerName: args.Trigger, }, }) // Note that filterType and filterSource can be empty strings, so they need a special treatment. ctx, err := tag.New( ctx, append(tags, - tag.Insert(broker.ContainerTagKey, r.container), - tag.Insert(broker.UniqueTagKey, r.uniqueName), - tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)), - tag.Insert(triggerFilterRequestTypeKey, args.requestType), - tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), + tag.Insert(tag.MustNewKey("container_name"), r.container), + tag.Insert(tag.MustNewKey("unique_name"), r.uniqueName), + tag.Insert(triggerFilterTypeKey, valueOrAny(args.FilterType)), + tag.Insert(triggerFilterRequestTypeKey, args.RequestType), + tag.Insert(triggerFilterRequestSchemeKey, args.RequestScheme), )...) return ctx, err } @@ -201,5 +200,5 @@ func valueOrAny(v string) string { if v != "" { return v } - return anyValue + return AnyValue } From c473e79bfa14a9395a9d95f35c1fdccd5a1130e0 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Mon, 8 Jul 2024 18:26:21 -0400 Subject: [PATCH 2/7] feat: Make the metric/statsReporter generic, and add the customized variables and methods for broker filter's stat_reporter - 1 --- pkg/broker/filter/filter_handler.go | 96 +++++++++++++++++------- pkg/broker/filter/filter_handler_test.go | 6 +- pkg/broker/filter/stats_reporter_test.go | 62 ++++++++++++--- pkg/metrics/stats_reporter.go | 64 +++++----------- 4 files changed, 143 insertions(+), 85 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 23c926b6067..ab214664f15 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,7 +26,11 @@ import ( "net/http" "time" - "knative.dev/eventing/pkg/metrics" + "go.opencensus.io/resource" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics/metricskey" + + eventingmetrics "knative.dev/eventing/pkg/metrics" opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -77,7 +81,7 @@ const ( // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Handler struct { // reporter reports stats of status code and dispatch time - reporter metrics.StatsReporter + reporter eventingmetrics.StatsReporter eventDispatcher *kncloudevents.Dispatcher @@ -90,8 +94,45 @@ type Handler struct { EventTypeCreator *eventtype.EventTypeAutoHandler } +type BrokerArgs struct { + ns string + trigger string + broker string + filterType string + requestType string + requestScheme string +} + +var ( + triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType) + triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") + triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme) + responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) + responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) +) + +func (args *BrokerArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { + ctx := metricskey.WithResource(eventingmetrics.EmptyContext, resource.Resource{ + Type: eventingmetrics.ResourceTypeKnativeTrigger, + Labels: map[string]string{ + eventingmetrics.LabelNamespaceName: args.ns, + eventingmetrics.LabelBrokerName: args.broker, + eventingmetrics.LabelTriggerName: args.trigger, + }, + }) + // Note that filterType and filterSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + ctx, + append(tags, + tag.Insert(triggerFilterTypeKey, eventingmetrics.ValueOrAny(args.filterType)), + tag.Insert(triggerFilterRequestTypeKey, args.requestType), + tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), + )...) + return ctx, err +} + // NewHandler creates a new Handler and its associated EventReceiver. -func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter metrics.StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) { +func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter eventingmetrics.StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) { kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost, @@ -247,17 +288,18 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve target := broker.Status.Address - reportArgs := &metrics.ReportArgs{ - Ns: trigger.Namespace, - Trigger: trigger.Name, - Broker: brokerRef, - RequestType: "reply_forward", + reportArgs := &BrokerArgs{ + ns: trigger.Namespace, + trigger: trigger.Name, + broker: brokerRef, + filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), + requestType: "reply_forward", } if request.TLS != nil { - reportArgs.RequestScheme = "https" + reportArgs.requestScheme = "https" } else { - reportArgs.RequestScheme = "http" + reportArgs.requestScheme = "http" } h.logger.Info("sending to reply", zap.Any("target", target)) @@ -298,17 +340,17 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } } - reportArgs := &metrics.ReportArgs{ - Ns: trigger.Namespace, - Trigger: trigger.Name, - Broker: trigger.Spec.Broker, - RequestType: "dls_forward", + reportArgs := &BrokerArgs{ + ns: trigger.Namespace, + trigger: trigger.Name, + broker: brokerRef, + requestType: "dls_forward", } if request.TLS != nil { - reportArgs.RequestScheme = "https" + reportArgs.requestScheme = "https" } else { - reportArgs.RequestScheme = "http" + reportArgs.requestScheme = "http" } h.logger.Info("sending to dls", zap.Any("target", target)) @@ -345,18 +387,18 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger h.logger.Warn("Failed to delete TTL.", zap.Error(err)) } - reportArgs := &metrics.ReportArgs{ - Ns: trigger.Namespace, - Trigger: trigger.Name, - Broker: brokerRef, - FilterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), - RequestType: "filter", + reportArgs := &BrokerArgs{ + ns: trigger.Namespace, + trigger: trigger.Name, + broker: brokerRef, + filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), + requestType: "filter", } if request.TLS != nil { - reportArgs.RequestScheme = "https" + reportArgs.requestScheme = "https" } else { - reportArgs.RequestScheme = "http" + reportArgs.requestScheme = "http" } subscriberURI := trigger.Status.SubscriberURI @@ -388,7 +430,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *metrics.ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *BrokerArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { additionalHeaders := headers.Clone() additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) @@ -521,7 +563,7 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, return dispatchInfo.ResponseCode, nil } -func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *metrics.ReportArgs) { +func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *BrokerArgs) { // Record the event processing time. This might be off if the receiver and the filter pods are running in // different nodes with different clocks. var arrivalTimeStr string diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 7b73f1c6b6d..d78c51bd139 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -749,17 +749,17 @@ type mockReporter struct { EventProcessingTimeReported bool } -func (r *mockReporter) ReportEventCount(args *metrics.ReportArgs, responseCode int) error { +func (r *mockReporter) ReportEventCount(args metrics.MetricArgs, responseCode int) error { r.EventCountReported = true return nil } -func (r *mockReporter) ReportEventDispatchTime(args *metrics.ReportArgs, responseCode int, d time.Duration) error { +func (r *mockReporter) ReportEventDispatchTime(args metrics.MetricArgs, responseCode int, d time.Duration) error { r.EventDispatchTimeReported = true return nil } -func (r *mockReporter) ReportEventProcessingTime(args *metrics.ReportArgs, d time.Duration) error { +func (r *mockReporter) ReportEventProcessingTime(args metrics.MetricArgs, d time.Duration) error { r.EventProcessingTimeReported = true return nil } diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 296bfdcaac3..90b9c23a58b 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -17,10 +17,14 @@ limitations under the License. package filter import ( + "context" "net/http" "testing" "time" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics/metricskey" + "go.opencensus.io/resource" broker "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/metrics" @@ -28,13 +32,47 @@ import ( _ "knative.dev/pkg/metrics/testing" ) +type TestArgs struct { + ns string + trigger string + broker string + filterType string + requestType string + requestScheme string + container string + uniqueName string +} + +func (args *TestArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { + ctx := metricskey.WithResource(metrics.EmptyContext, resource.Resource{ + Type: metrics.ResourceTypeKnativeTrigger, + Labels: map[string]string{ + metrics.LabelNamespaceName: args.ns, + metrics.LabelBrokerName: args.broker, + metrics.LabelTriggerName: args.trigger, + }, + }) + // Note that filterType and filterSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + ctx, + append(tags, + tag.Insert(broker.ContainerTagKey, args.container), + tag.Insert(broker.UniqueTagKey, args.uniqueName), + tag.Insert(triggerFilterTypeKey, metrics.ValueOrAny(args.filterType)), + tag.Insert(triggerFilterRequestTypeKey, args.requestType), + tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), + )...) + return ctx, err +} func TestStatsReporter(t *testing.T) { setup() - args := &metrics.ReportArgs{ - Ns: "testns", - Trigger: "testtrigger", - Broker: "testbroker", - FilterType: "testeventtype", + args := &TestArgs{ + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + filterType: "testeventtype", + container: "testcontainer", + uniqueName: "testpod", } r := metrics.NewStatsReporter("testcontainer", "testpod") @@ -95,12 +133,14 @@ func TestStatsReporter(t *testing.T) { func TestReporterEmptySourceAndTypeFilter(t *testing.T) { setup() - args := &metrics.ReportArgs{ - Ns: "testns", - Trigger: "testtrigger", - Broker: "testbroker", - FilterType: "", - RequestScheme: "http", + args := &TestArgs{ + ns: "testns", + trigger: "testtrigger", + broker: "testbroker", + filterType: "", + requestScheme: "http", + container: "testcontainer", + uniqueName: "testpod", } r := metrics.NewStatsReporter("testcontainer", "testpod") diff --git a/pkg/metrics/stats_reporter.go b/pkg/metrics/stats_reporter.go index 34fe8fc39c3..00d0ba46980 100644 --- a/pkg/metrics/stats_reporter.go +++ b/pkg/metrics/stats_reporter.go @@ -22,13 +22,10 @@ import ( "strconv" "time" - "knative.dev/pkg/metrics" - "knative.dev/pkg/metrics/metricskey" - - "go.opencensus.io/resource" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "knative.dev/pkg/metrics" ) const ( @@ -73,13 +70,14 @@ var ( responseCodeClassKey = tag.MustNewKey(LabelResponseCodeClass) ) -type ReportArgs struct { - Ns string - Trigger string - Broker string - FilterType string - RequestType string - RequestScheme string +type MetricArgs interface { + //Ns string + //Trigger string + //Broker string + //FilterType string + //RequestType string + //RequestScheme string + GenerateTag(tags ...tag.Mutator) (context.Context, error) } func init() { @@ -88,13 +86,13 @@ func init() { // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { - ReportEventCount(args *ReportArgs, responseCode int) error - ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error - ReportEventProcessingTime(args *ReportArgs, d time.Duration) error + ReportEventCount(args MetricArgs, responseCode int) error + ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error + ReportEventProcessingTime(args MetricArgs, d time.Duration) error } var _ StatsReporter = (*reporter)(nil) -var emptyContext = context.Background() +var EmptyContext = context.Background() // reporter holds cached metric objects to report filter metrics. type reporter struct { @@ -138,8 +136,8 @@ func Register() { } // ReportEventCount captures the event count. -func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { - ctx, err := r.generateTag(args, +func (r *reporter) ReportEventCount(args MetricArgs, responseCode int) error { + ctx, err := args.GenerateTag( tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) if err != nil { @@ -150,8 +148,8 @@ func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { } // ReportEventDispatchTime captures dispatch times. -func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { - ctx, err := r.generateTag(args, +func (r *reporter) ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error { + ctx, err := args.GenerateTag( tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) if err != nil { @@ -163,8 +161,8 @@ func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d } // ReportEventProcessingTime captures event processing times. -func (r *reporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error { - ctx, err := r.generateTag(args) +func (r *reporter) ReportEventProcessingTime(args MetricArgs, d time.Duration) error { + ctx, err := args.GenerateTag() if err != nil { return err } @@ -174,29 +172,7 @@ func (r *reporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) return nil } -func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.Context, error) { - ctx := metricskey.WithResource(emptyContext, resource.Resource{ - Type: ResourceTypeKnativeTrigger, - Labels: map[string]string{ - LabelNamespaceName: args.Ns, - LabelBrokerName: args.Broker, - LabelTriggerName: args.Trigger, - }, - }) - // Note that filterType and filterSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( - ctx, - append(tags, - tag.Insert(tag.MustNewKey("container_name"), r.container), - tag.Insert(tag.MustNewKey("unique_name"), r.uniqueName), - tag.Insert(triggerFilterTypeKey, valueOrAny(args.FilterType)), - tag.Insert(triggerFilterRequestTypeKey, args.RequestType), - tag.Insert(triggerFilterRequestSchemeKey, args.RequestScheme), - )...) - return ctx, err -} - -func valueOrAny(v string) string { +func ValueOrAny(v string) string { if v != "" { return v } From 9d1fb3aaa7ea67860f174d214916e537a14ee1ae Mon Sep 17 00:00:00 2001 From: Leo Li Date: Wed, 24 Jul 2024 02:41:10 -0400 Subject: [PATCH 3/7] fix: fix the review comments --- pkg/metrics/stats_reporter.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/pkg/metrics/stats_reporter.go b/pkg/metrics/stats_reporter.go index 186ee54c77b..7bb7b7e414e 100644 --- a/pkg/metrics/stats_reporter.go +++ b/pkg/metrics/stats_reporter.go @@ -1,5 +1,5 @@ /* -Copyright 2024 The Knative Authors +Copyright 2020 The Knative Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -71,12 +71,6 @@ var ( ) type MetricArgs interface { - //Ns string - //Trigger string - //Broker string - //FilterType string - //RequestType string - //RequestScheme string GenerateTag(tags ...tag.Mutator) (context.Context, error) } From a9bcb7482230f4614e3aa39ef80f6dfc0126832c Mon Sep 17 00:00:00 2001 From: Leo HC Li <36619969+Leo6Leo@users.noreply.github.com> Date: Wed, 14 Aug 2024 05:06:05 -0400 Subject: [PATCH 4/7] feat: change the generic stats_reporter and add tests --- pkg/metrics/stats_reporter.go | 90 +++++++++--------- pkg/metrics/stats_repoter_test.go | 146 ++++++++++++++++++++++++++++++ 2 files changed, 190 insertions(+), 46 deletions(-) create mode 100644 pkg/metrics/stats_repoter_test.go diff --git a/pkg/metrics/stats_reporter.go b/pkg/metrics/stats_reporter.go index 7bb7b7e414e..11a91dc200e 100644 --- a/pkg/metrics/stats_reporter.go +++ b/pkg/metrics/stats_reporter.go @@ -50,24 +50,13 @@ var ( stats.UnitMilliseconds, ) - // processingTimeInMsecM records the time spent between arrival at the Broker - // and the delivery to the Trigger subscriber. - processingTimeInMsecM = stats.Float64( - "event_processing_latencies", - "The time spent processing an event before it is dispatched to a Trigger subscriber", - stats.UnitMilliseconds, - ) - // Create the tag keys that will be used to add tags to our measurements. // Tag keys must conform to the restrictions described in // go.opencensus.io/tag/validate.go. Currently those restrictions are: // - length between 1 and 255 inclusive // - characters are printable US-ASCII - triggerFilterTypeKey = tag.MustNewKey(LabelFilterType) - triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") - triggerFilterRequestSchemeKey = tag.MustNewKey(LabelEventScheme) - responseCodeKey = tag.MustNewKey(LabelResponseCode) - responseCodeClassKey = tag.MustNewKey(LabelResponseCodeClass) + responseCodeKey = tag.MustNewKey(LabelResponseCode) + responseCodeClassKey = tag.MustNewKey(LabelResponseCodeClass) ) type MetricArgs interface { @@ -75,14 +64,13 @@ type MetricArgs interface { } func init() { - Register() + Register([]stats.Measure{}, nil) } // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { ReportEventCount(args MetricArgs, responseCode int) error ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error - ReportEventProcessingTime(args MetricArgs, d time.Duration) error } var _ StatsReporter = (*reporter)(nil) @@ -102,67 +90,77 @@ func NewStatsReporter(container, uniqueName string) StatsReporter { } } -func Register() { - // Create view to see our measurements. - err := metrics.RegisterResourceView( - &view.View{ +func Register(customMetrics []stats.Measure, customViews []*view.View, customTagKeys ...tag.Key) { + commonTagKeys := []tag.Key{responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")} + allTagKeys := append(commonTagKeys, customTagKeys...) + + defaultViews := []*view.View{ + { Description: eventCountM.Description(), Measure: eventCountM, Aggregation: view.Count(), - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}, + TagKeys: allTagKeys, }, - &view.View{ + { Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, - Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}, - }, - &view.View{ - Description: processingTimeInMsecM.Description(), - Measure: processingTimeInMsecM, - Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 - TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}, + Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), + TagKeys: allTagKeys, }, - ) - if err != nil { - log.Printf("failed to register opencensus views, %s", err) + } + + // Add custom views for custom metrics + for _, metric := range customMetrics { + defaultViews = append(defaultViews, &view.View{ + Description: metric.Description(), + Measure: metric, + Aggregation: view.LastValue(), // You can change this aggregation as needed + TagKeys: allTagKeys, + }) + } + + // Append custom views + allViews := append(defaultViews, customViews...) + + if err := metrics.RegisterResourceView(allViews...); err != nil { + log.Printf("Failed to register opencensus views: %v", err) } } // ReportEventCount captures the event count. func (r *reporter) ReportEventCount(args MetricArgs, responseCode int) error { - ctx, err := args.GenerateTag( + // Create base tags + baseTags := []tag.Mutator{ tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), - tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) + tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)), + } + + // Generate context with all tags, including any custom ones from args.GenerateTag + ctx, err := args.GenerateTag(baseTags...) if err != nil { return err } + metrics.Record(ctx, eventCountM.M(1)) return nil } // ReportEventDispatchTime captures dispatch times. func (r *reporter) ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error { - ctx, err := args.GenerateTag( + // Create base tags + baseTags := []tag.Mutator{ tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), - tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) - if err != nil { - return err + tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)), } - // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) - return nil -} -// ReportEventProcessingTime captures event processing times. -func (r *reporter) ReportEventProcessingTime(args MetricArgs, d time.Duration) error { - ctx, err := args.GenerateTag() + // Generate context with all tags, including any custom ones from args.GenerateTag + ctx, err := args.GenerateTag(baseTags...) if err != nil { return err } // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, processingTimeInMsecM.M(float64(d/time.Millisecond))) + metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) return nil } diff --git a/pkg/metrics/stats_repoter_test.go b/pkg/metrics/stats_repoter_test.go new file mode 100644 index 00000000000..71187c0abf6 --- /dev/null +++ b/pkg/metrics/stats_repoter_test.go @@ -0,0 +1,146 @@ +package metrics + +import ( + "context" + "net/http" + "testing" + "time" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics/metricstest" + _ "knative.dev/pkg/metrics/testing" +) + +type testMetricArgs struct{} + +func (t *testMetricArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { + return tag.New(EmptyContext, append(tags, + tag.Insert(tag.MustNewKey("unique_name"), "testpod"), + tag.Insert(tag.MustNewKey("container_name"), "testcontainer"), + )...) +} + +// Custom metric for testing +var customMetricM = stats.Int64( + "custom_metric", + "A custom metric for testing", + stats.UnitDimensionless, +) + +// Custom tag for testing +var customTagKey = tag.MustNewKey("custom_tag") + +// add ReportEventCountRetry to StatsReporter +type statsReporterTester interface { + StatsReporter + ReportCustomMetric(args MetricArgs, value int64) error +} + +// ReportCustomMetric records a custom metric value. +func (r *reporter) ReportCustomMetric(args MetricArgs, value int64) error { + // Create base tags + baseTags := []tag.Mutator{ + tag.Insert(tag.MustNewKey("unique_name"), r.uniqueName), + tag.Insert(tag.MustNewKey("container_name"), r.container), + } + + // Generate context with all tags, including any custom ones from args.GenerateTag + ctx, err := args.GenerateTag(baseTags...) + if err != nil { + return err + } + + // Record the custom metric + stats.Record(ctx, customMetricM.M(value)) + return nil +} +func NewStatsReporterTester(container, uniqueName string) statsReporterTester { + return &reporter{ + container: container, + uniqueName: uniqueName, + } +} + +func TestStatsReporter(t *testing.T) { + setup() + + args := &testMetricArgs{} + r := NewStatsReporterTester("testcontainer", "testpod") + + wantTags := map[string]string{ + "response_code": "202", + "response_code_class": "2xx", + "unique_name": "testpod", + "container_name": "testcontainer", + } + + // Test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusAccepted) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusAccepted) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) + + // Test ReportEventDispatchTime + expectSuccess(t, func() error { + return r.ReportEventDispatchTime(args, http.StatusAccepted, 1100*time.Millisecond) + }) + expectSuccess(t, func() error { + return r.ReportEventDispatchTime(args, http.StatusAccepted, 9100*time.Millisecond) + }) + metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0) + +} + +func TestRegisterCustomMetrics(t *testing.T) { + setup() + + customMetrics := []stats.Measure{customMetricM} + customViews := []*view.View{ + { + Description: customMetricM.Description(), + Measure: customMetricM, + Aggregation: view.LastValue(), + TagKeys: []tag.Key{customTagKey}, + }, + } + customTagKeys := []tag.Key{customTagKey} + + Register(customMetrics, customViews, customTagKeys...) + + // Verify that the custom view is registered + if v := view.Find("custom_metric"); v == nil { + t.Error("Custom view was not registered") + } + + // Record a value for the custom metric + ctx, _ := tag.New(context.Background(), tag.Insert(customTagKey, "test_value")) + stats.Record(ctx, customMetricM.M(100)) + + // Check if the value was recorded correctly + metricstest.CheckLastValueData(t, "custom_metric", map[string]string{"custom_tag": "test_value"}, 100) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} + +func setup() { + resetMetrics() +} + +func resetMetrics() { + metricstest.Unregister( + "event_count", + "event_dispatch_latencies", + "event_processing_latencies", + "custom_metric") + Register([]stats.Measure{}, nil) +} From 4e9654b993fe614aefe879213210e23bd63c5909 Mon Sep 17 00:00:00 2001 From: Leo HC Li <36619969+Leo6Leo@users.noreply.github.com> Date: Wed, 14 Aug 2024 05:10:59 -0400 Subject: [PATCH 5/7] fix: revert all the changes before --- cmd/broker/filter/main.go | 3 +- pkg/broker/filter/filter_handler.go | 58 +------ pkg/broker/filter/filter_handler_test.go | 39 +++-- pkg/broker/filter/stats_reporter.go | 205 +++++++++++++++++++++++ pkg/broker/filter/stats_reporter_test.go | 52 +----- 5 files changed, 238 insertions(+), 119 deletions(-) create mode 100644 pkg/broker/filter/stats_reporter.go diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index eb95e834219..61e22e9ae74 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -24,7 +24,6 @@ import ( "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - eventingmetrics "knative.dev/eventing/pkg/metrics" kubeclient "knative.dev/pkg/client/injection/kube/client" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered" filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" @@ -148,7 +147,7 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } - reporter := eventingmetrics.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) + reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 3d045c2c1a3..c0b7b4c4617 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,12 +26,6 @@ import ( "net/http" "time" - "go.opencensus.io/resource" - "go.opencensus.io/tag" - "knative.dev/pkg/metrics/metricskey" - - eventingmetrics "knative.dev/eventing/pkg/metrics" - messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" messaginginformers "knative.dev/eventing/pkg/client/informers/externalversions/messaging/v1" "knative.dev/eventing/pkg/reconciler/broker/resources" @@ -86,7 +80,7 @@ const ( // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Handler struct { // reporter reports stats of status code and dispatch time - reporter eventingmetrics.StatsReporter + reporter StatsReporter eventDispatcher *kncloudevents.Dispatcher @@ -100,45 +94,8 @@ type Handler struct { EventTypeCreator *eventtype.EventTypeAutoHandler } -type BrokerArgs struct { - ns string - trigger string - broker string - filterType string - requestType string - requestScheme string -} - -var ( - triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType) - triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") - triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme) - responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) - responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) -) - -func (args *BrokerArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { - ctx := metricskey.WithResource(eventingmetrics.EmptyContext, resource.Resource{ - Type: eventingmetrics.ResourceTypeKnativeTrigger, - Labels: map[string]string{ - eventingmetrics.LabelNamespaceName: args.ns, - eventingmetrics.LabelBrokerName: args.broker, - eventingmetrics.LabelTriggerName: args.trigger, - }, - }) - // Note that filterType and filterSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( - ctx, - append(tags, - tag.Insert(triggerFilterTypeKey, eventingmetrics.ValueOrAny(args.filterType)), - tag.Insert(triggerFilterRequestTypeKey, args.requestType), - tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), - )...) - return ctx, err -} - // NewHandler creates a new Handler and its associated EventReceiver. -func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, subscriptionInformer messaginginformers.SubscriptionInformer, reporter eventingmetrics.StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) { +func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, subscriptionInformer messaginginformers.SubscriptionInformer, reporter StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) { kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost, @@ -317,11 +274,10 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve target := broker.Status.Address - reportArgs := &BrokerArgs{ + reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, broker: brokerName, - filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), requestType: "reply_forward", } @@ -376,7 +332,7 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } } - reportArgs := &BrokerArgs{ + reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, broker: brokerName, @@ -427,7 +383,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger h.logger.Warn("Failed to delete TTL.", zap.Error(err)) } - reportArgs := &BrokerArgs{ + reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, broker: brokerName, @@ -470,7 +426,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl) } -func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *BrokerArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { +func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) { additionalHeaders := headers.Clone() additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace()) @@ -603,7 +559,7 @@ func (h *Handler) writeResponse(ctx context.Context, writer http.ResponseWriter, return dispatchInfo.ResponseCode, nil } -func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *BrokerArgs) { +func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs) { // Record the event processing time. This might be off if the receiver and the filter pods are running in // different nodes with different clocks. var arrivalTimeStr string diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 31697019fca..5d67ddb5cdd 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -40,7 +40,6 @@ import ( "go.uber.org/zap/zaptest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "knative.dev/eventing/pkg/metrics" "knative.dev/pkg/apis" configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake" "knative.dev/pkg/logging" @@ -530,14 +529,14 @@ func TestReceiver(t *testing.T) { if tc.expectedDispatch != fh.requestReceived { t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived) } - if tc.expectedEventCount != reporter.EventCountReported { - t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.EventCountReported) + if tc.expectedEventCount != reporter.eventCountReported { + t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported) } - if tc.expectedEventDispatchTime != reporter.EventDispatchTimeReported { - t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.EventDispatchTimeReported) + if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported { + t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported) } - if tc.expectedEventProcessingTime != reporter.EventProcessingTimeReported { - t.Errorf("Incorrect event processing time reported metric. Expected %v, Actual %v", tc.expectedEventProcessingTime, reporter.EventProcessingTimeReported) + if tc.expectedEventProcessingTime != reporter.eventProcessingTimeReported { + t.Errorf("Incorrect event processing time reported metric. Expected %v, Actual %v", tc.expectedEventProcessingTime, reporter.eventProcessingTimeReported) } if tc.expectedResponseEvent != nil { if tc.expectedResponseEvent.SpecVersion() != event.CloudEventsVersionV1 { @@ -728,11 +727,11 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { if tc.expectedDispatch != fh.requestReceived { t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived) } - if tc.expectedEventCount != reporter.EventCountReported { - t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.EventCountReported) + if tc.expectedEventCount != reporter.eventCountReported { + t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported) } - if tc.expectedEventDispatchTime != reporter.EventDispatchTimeReported { - t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.EventDispatchTimeReported) + if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported { + t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported) } // Compare the returned event. message := cehttp.NewMessageFromHttpResponse(response) @@ -766,23 +765,23 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) { } type mockReporter struct { - EventCountReported bool - EventDispatchTimeReported bool - EventProcessingTimeReported bool + eventCountReported bool + eventDispatchTimeReported bool + eventProcessingTimeReported bool } -func (r *mockReporter) ReportEventCount(args metrics.MetricArgs, responseCode int) error { - r.EventCountReported = true +func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { + r.eventCountReported = true return nil } -func (r *mockReporter) ReportEventDispatchTime(args metrics.MetricArgs, responseCode int, d time.Duration) error { - r.EventDispatchTimeReported = true +func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { + r.eventDispatchTimeReported = true return nil } -func (r *mockReporter) ReportEventProcessingTime(args metrics.MetricArgs, d time.Duration) error { - r.EventProcessingTimeReported = true +func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error { + r.eventProcessingTimeReported = true return nil } diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go new file mode 100644 index 00000000000..264e1d6b107 --- /dev/null +++ b/pkg/broker/filter/stats_reporter.go @@ -0,0 +1,205 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filter + +import ( + "context" + "log" + "strconv" + "time" + + "go.opencensus.io/resource" + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + broker "knative.dev/eventing/pkg/broker" + eventingmetrics "knative.dev/eventing/pkg/metrics" + "knative.dev/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" +) + +const ( + // anyValue is the default value if the trigger filter attributes are empty. + anyValue = "any" +) + +var ( + // eventCountM is a counter which records the number of events received + // by a Trigger. + eventCountM = stats.Int64( + "event_count", + "Number of events received by a Trigger", + stats.UnitDimensionless, + ) + + // dispatchTimeInMsecM records the time spent dispatching an event to + // a Trigger subscriber, in milliseconds. + dispatchTimeInMsecM = stats.Float64( + "event_dispatch_latencies", + "The time spent dispatching an event to a Trigger subscriber", + stats.UnitMilliseconds, + ) + + // processingTimeInMsecM records the time spent between arrival at the Broker + // and the delivery to the Trigger subscriber. + processingTimeInMsecM = stats.Float64( + "event_processing_latencies", + "The time spent processing an event before it is dispatched to a Trigger subscriber", + stats.UnitMilliseconds, + ) + + // Create the tag keys that will be used to add tags to our measurements. + // Tag keys must conform to the restrictions described in + // go.opencensus.io/tag/validate.go. Currently those restrictions are: + // - length between 1 and 255 inclusive + // - characters are printable US-ASCII + triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType) + triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type") + triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme) + responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode) + responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass) +) + +type ReportArgs struct { + ns string + trigger string + broker string + filterType string + requestType string + requestScheme string +} + +func init() { + register() +} + +// StatsReporter defines the interface for sending filter metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, responseCode int) error + ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error + ReportEventProcessingTime(args *ReportArgs, d time.Duration) error +} + +var _ StatsReporter = (*reporter)(nil) +var emptyContext = context.Background() + +// reporter holds cached metric objects to report filter metrics. +type reporter struct { + container string + uniqueName string +} + +// NewStatsReporter creates a reporter that collects and reports filter metrics. +func NewStatsReporter(container, uniqueName string) StatsReporter { + return &reporter{ + container: container, + uniqueName: uniqueName, + } +} + +func register() { + // Create view to see our measurements. + err := metrics.RegisterResourceView( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + Aggregation: view.Count(), + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, + &view.View{ + Description: dispatchTimeInMsecM.Description(), + Measure: dispatchTimeInMsecM, + Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, + &view.View{ + Description: processingTimeInMsecM.Description(), + Measure: processingTimeInMsecM, + Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), // 1, 2, 5, 10, 20, 50, 100, 1000, 5000, 10000 + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, + ) + if err != nil { + log.Printf("failed to register opencensus views, %s", err) + } +} + +// ReportEventCount captures the event count. +func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { + ctx, err := r.generateTag(args, + tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), + tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) + if err != nil { + return err + } + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +// ReportEventDispatchTime captures dispatch times. +func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { + ctx, err := r.generateTag(args, + tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), + tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode))) + if err != nil { + return err + } + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} + +// ReportEventProcessingTime captures event processing times. +func (r *reporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error { + ctx, err := r.generateTag(args) + if err != nil { + return err + } + + // convert time.Duration in nanoseconds to milliseconds. + metrics.Record(ctx, processingTimeInMsecM.M(float64(d/time.Millisecond))) + return nil +} + +func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.Context, error) { + ctx := metricskey.WithResource(emptyContext, resource.Resource{ + Type: eventingmetrics.ResourceTypeKnativeTrigger, + Labels: map[string]string{ + eventingmetrics.LabelNamespaceName: args.ns, + eventingmetrics.LabelBrokerName: args.broker, + eventingmetrics.LabelTriggerName: args.trigger, + }, + }) + // Note that filterType and filterSource can be empty strings, so they need a special treatment. + ctx, err := tag.New( + ctx, + append(tags, + tag.Insert(broker.ContainerTagKey, r.container), + tag.Insert(broker.UniqueTagKey, r.uniqueName), + tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)), + tag.Insert(triggerFilterRequestTypeKey, args.requestType), + tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), + )...) + return ctx, err +} + +func valueOrAny(v string) string { + if v != "" { + return v + } + return anyValue +} diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 4045107e5f3..1033a7efe48 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -17,14 +17,10 @@ limitations under the License. package filter import ( - "context" "net/http" "testing" "time" - "go.opencensus.io/tag" - "knative.dev/pkg/metrics/metricskey" - "go.opencensus.io/resource" broker "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/metrics" @@ -32,50 +28,16 @@ import ( _ "knative.dev/pkg/metrics/testing" ) -type TestArgs struct { - ns string - trigger string - broker string - filterType string - requestType string - requestScheme string - container string - uniqueName string -} - -func (args *TestArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { - ctx := metricskey.WithResource(metrics.EmptyContext, resource.Resource{ - Type: metrics.ResourceTypeKnativeTrigger, - Labels: map[string]string{ - metrics.LabelNamespaceName: args.ns, - metrics.LabelBrokerName: args.broker, - metrics.LabelTriggerName: args.trigger, - }, - }) - // Note that filterType and filterSource can be empty strings, so they need a special treatment. - ctx, err := tag.New( - ctx, - append(tags, - tag.Insert(broker.ContainerTagKey, args.container), - tag.Insert(broker.UniqueTagKey, args.uniqueName), - tag.Insert(triggerFilterTypeKey, metrics.ValueOrAny(args.filterType)), - tag.Insert(triggerFilterRequestTypeKey, args.requestType), - tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme), - )...) - return ctx, err -} func TestStatsReporter(t *testing.T) { setup() - args := &TestArgs{ + args := &ReportArgs{ ns: "testns", trigger: "testtrigger", broker: "testbroker", filterType: "testeventtype", - container: "testcontainer", - uniqueName: "testpod", } - r := metrics.NewStatsReporter("testcontainer", "testpod") + r := NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ metrics.LabelFilterType: "testeventtype", @@ -133,20 +95,18 @@ func TestStatsReporter(t *testing.T) { func TestReporterEmptySourceAndTypeFilter(t *testing.T) { setup() - args := &TestArgs{ + args := &ReportArgs{ ns: "testns", trigger: "testtrigger", broker: "testbroker", filterType: "", requestScheme: "http", - container: "testcontainer", - uniqueName: "testpod", } - r := metrics.NewStatsReporter("testcontainer", "testpod") + r := NewStatsReporter("testcontainer", "testpod") wantTags := map[string]string{ - metrics.LabelFilterType: metrics.AnyValue, + metrics.LabelFilterType: anyValue, metrics.LabelResponseCode: "202", metrics.LabelResponseCodeClass: "2xx", broker.LabelContainerName: "testcontainer", @@ -196,5 +156,5 @@ func resetMetrics() { "event_count", "event_dispatch_latencies", "event_processing_latencies") - metrics.Register() + register() } From ea78fc904fed4507e7c2c92496f1b1630ea3c39b Mon Sep 17 00:00:00 2001 From: Leo HC Li <36619969+Leo6Leo@users.noreply.github.com> Date: Wed, 14 Aug 2024 05:13:51 -0400 Subject: [PATCH 6/7] fix: add boilerplate --- pkg/metrics/stats_repoter_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/metrics/stats_repoter_test.go b/pkg/metrics/stats_repoter_test.go index 71187c0abf6..ce2f717cf08 100644 --- a/pkg/metrics/stats_repoter_test.go +++ b/pkg/metrics/stats_repoter_test.go @@ -1,3 +1,18 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package metrics import ( From e1aa68ae3fed11edb934c51d68a036c935b9aef0 Mon Sep 17 00:00:00 2001 From: Leo HC Li <36619969+Leo6Leo@users.noreply.github.com> Date: Thu, 15 Aug 2024 01:24:57 -0400 Subject: [PATCH 7/7] fix: update the statsreporter --- pkg/metrics/stats_reporter.go | 122 ++------------------------ pkg/metrics/stats_repoter_test.go | 139 +++++++++++++----------------- 2 files changed, 67 insertions(+), 194 deletions(-) diff --git a/pkg/metrics/stats_reporter.go b/pkg/metrics/stats_reporter.go index 11a91dc200e..987387a6dd1 100644 --- a/pkg/metrics/stats_reporter.go +++ b/pkg/metrics/stats_reporter.go @@ -19,8 +19,6 @@ package metrics import ( "context" "log" - "strconv" - "time" "go.opencensus.io/stats" "go.opencensus.io/stats/view" @@ -28,37 +26,6 @@ import ( "knative.dev/pkg/metrics" ) -const ( - // anyValue is the default value if the trigger filter attributes are empty. - AnyValue = "any" -) - -var ( - // eventCountM is a counter which records the number of events received - // by a Trigger. - eventCountM = stats.Int64( - "event_count", - "Number of events received by a Trigger", - stats.UnitDimensionless, - ) - - // dispatchTimeInMsecM records the time spent dispatching an event to - // a Trigger subscriber, in milliseconds. - dispatchTimeInMsecM = stats.Float64( - "event_dispatch_latencies", - "The time spent dispatching an event to a Trigger subscriber", - stats.UnitMilliseconds, - ) - - // Create the tag keys that will be used to add tags to our measurements. - // Tag keys must conform to the restrictions described in - // go.opencensus.io/tag/validate.go. Currently those restrictions are: - // - length between 1 and 255 inclusive - // - characters are printable US-ASCII - responseCodeKey = tag.MustNewKey(LabelResponseCode) - responseCodeClassKey = tag.MustNewKey(LabelResponseCodeClass) -) - type MetricArgs interface { GenerateTag(tags ...tag.Mutator) (context.Context, error) } @@ -69,104 +36,25 @@ func init() { // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { - ReportEventCount(args MetricArgs, responseCode int) error - ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error -} - -var _ StatsReporter = (*reporter)(nil) -var EmptyContext = context.Background() - -// reporter holds cached metric objects to report filter metrics. -type reporter struct { - container string - uniqueName string -} - -// NewStatsReporter creates a reporter that collects and reports filter metrics. -func NewStatsReporter(container, uniqueName string) StatsReporter { - return &reporter{ - container: container, - uniqueName: uniqueName, - } } func Register(customMetrics []stats.Measure, customViews []*view.View, customTagKeys ...tag.Key) { - commonTagKeys := []tag.Key{responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")} - allTagKeys := append(commonTagKeys, customTagKeys...) - - defaultViews := []*view.View{ - { - Description: eventCountM.Description(), - Measure: eventCountM, - Aggregation: view.Count(), - TagKeys: allTagKeys, - }, - { - Description: dispatchTimeInMsecM.Description(), - Measure: dispatchTimeInMsecM, - Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...), - TagKeys: allTagKeys, - }, - } + allTagKeys := append(customTagKeys) + allViews := append(customViews) // Add custom views for custom metrics for _, metric := range customMetrics { - defaultViews = append(defaultViews, &view.View{ + allViews = append(allViews, &view.View{ Description: metric.Description(), + Name: metric.Name(), Measure: metric, - Aggregation: view.LastValue(), // You can change this aggregation as needed + Aggregation: view.LastValue(), TagKeys: allTagKeys, }) } // Append custom views - allViews := append(defaultViews, customViews...) - if err := metrics.RegisterResourceView(allViews...); err != nil { log.Printf("Failed to register opencensus views: %v", err) } } - -// ReportEventCount captures the event count. -func (r *reporter) ReportEventCount(args MetricArgs, responseCode int) error { - // Create base tags - baseTags := []tag.Mutator{ - tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), - tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)), - } - - // Generate context with all tags, including any custom ones from args.GenerateTag - ctx, err := args.GenerateTag(baseTags...) - if err != nil { - return err - } - - metrics.Record(ctx, eventCountM.M(1)) - return nil -} - -// ReportEventDispatchTime captures dispatch times. -func (r *reporter) ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error { - // Create base tags - baseTags := []tag.Mutator{ - tag.Insert(responseCodeKey, strconv.Itoa(responseCode)), - tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)), - } - - // Generate context with all tags, including any custom ones from args.GenerateTag - ctx, err := args.GenerateTag(baseTags...) - if err != nil { - return err - } - - // convert time.Duration in nanoseconds to milliseconds. - metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond))) - return nil -} - -func ValueOrAny(v string) string { - if v != "" { - return v - } - return AnyValue -} diff --git a/pkg/metrics/stats_repoter_test.go b/pkg/metrics/stats_repoter_test.go index ce2f717cf08..8a7d1d8aa12 100644 --- a/pkg/metrics/stats_repoter_test.go +++ b/pkg/metrics/stats_repoter_test.go @@ -13,48 +13,70 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package metrics +package metrics_test import ( "context" - "net/http" "testing" - "time" + "go.opencensus.io/resource" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + eventingmetrics "knative.dev/eventing/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricstest" _ "knative.dev/pkg/metrics/testing" ) -type testMetricArgs struct{} +type testMetricArgs struct { + ns string + trigger string + broker string + testParam string +} -func (t *testMetricArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { - return tag.New(EmptyContext, append(tags, - tag.Insert(tag.MustNewKey("unique_name"), "testpod"), - tag.Insert(tag.MustNewKey("container_name"), "testcontainer"), - )...) +func (args *testMetricArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) { + ctx := metricskey.WithResource(context.Background(), resource.Resource{ + Type: eventingmetrics.ResourceTypeKnativeTrigger, + Labels: map[string]string{ + eventingmetrics.LabelNamespaceName: args.ns, + eventingmetrics.LabelBrokerName: args.broker, + eventingmetrics.LabelTriggerName: args.trigger, + }, + }) + ctx, err := tag.New( + ctx, + append(tags, + tag.Insert(customTagKey, args.testParam), + )...) + return ctx, err } -// Custom metric for testing -var customMetricM = stats.Int64( - "custom_metric", - "A custom metric for testing", - stats.UnitDimensionless, +var ( + customMetricM = stats.Int64( + "custom_metric", + "A custom metric for testing", + stats.UnitDimensionless, + ) + + customTagKey = tag.MustNewKey("custom_tag") ) -// Custom tag for testing -var customTagKey = tag.MustNewKey("custom_tag") +// StatsReporter interface definition +type StatsReporter interface { + eventingmetrics.StatsReporter + ReportCustomMetric(args eventingmetrics.MetricArgs, value int64) error +} -// add ReportEventCountRetry to StatsReporter -type statsReporterTester interface { - StatsReporter - ReportCustomMetric(args MetricArgs, value int64) error +// reporter struct definition +type reporter struct { + container string + uniqueName string } // ReportCustomMetric records a custom metric value. -func (r *reporter) ReportCustomMetric(args MetricArgs, value int64) error { +func (r *reporter) ReportCustomMetric(args eventingmetrics.MetricArgs, value int64) error { // Create base tags baseTags := []tag.Mutator{ tag.Insert(tag.MustNewKey("unique_name"), r.uniqueName), @@ -71,61 +93,27 @@ func (r *reporter) ReportCustomMetric(args MetricArgs, value int64) error { stats.Record(ctx, customMetricM.M(value)) return nil } -func NewStatsReporterTester(container, uniqueName string) statsReporterTester { - return &reporter{ - container: container, - uniqueName: uniqueName, - } -} -func TestStatsReporter(t *testing.T) { +func TestRegisterCustomMetrics(t *testing.T) { setup() - args := &testMetricArgs{} - r := NewStatsReporterTester("testcontainer", "testpod") - - wantTags := map[string]string{ - "response_code": "202", - "response_code_class": "2xx", - "unique_name": "testpod", - "container_name": "testcontainer", + args := &testMetricArgs{ + ns: "test-namespace", + trigger: "test-trigger", + broker: "test-broker", + testParam: "test-param", + } + r := &reporter{ + container: "testcontainer", + uniqueName: "testpod", } - - // Test ReportEventCount - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusAccepted) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusAccepted) - }) - metricstest.CheckCountData(t, "event_count", wantTags, 2) - - // Test ReportEventDispatchTime - expectSuccess(t, func() error { - return r.ReportEventDispatchTime(args, http.StatusAccepted, 1100*time.Millisecond) - }) - expectSuccess(t, func() error { - return r.ReportEventDispatchTime(args, http.StatusAccepted, 9100*time.Millisecond) - }) - metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0) - -} - -func TestRegisterCustomMetrics(t *testing.T) { - setup() customMetrics := []stats.Measure{customMetricM} - customViews := []*view.View{ - { - Description: customMetricM.Description(), - Measure: customMetricM, - Aggregation: view.LastValue(), - TagKeys: []tag.Key{customTagKey}, - }, - } + customTagKeys := []tag.Key{customTagKey} - Register(customMetrics, customViews, customTagKeys...) + // No need for customViews, as the custom metric will be used to create the view + eventingmetrics.Register(customMetrics, nil, customTagKeys...) // Verify that the custom view is registered if v := view.Find("custom_metric"); v == nil { @@ -133,11 +121,12 @@ func TestRegisterCustomMetrics(t *testing.T) { } // Record a value for the custom metric - ctx, _ := tag.New(context.Background(), tag.Insert(customTagKey, "test_value")) - stats.Record(ctx, customMetricM.M(100)) + expectSuccess(t, func() error { + return r.ReportCustomMetric(args, 100) + }) // Check if the value was recorded correctly - metricstest.CheckLastValueData(t, "custom_metric", map[string]string{"custom_tag": "test_value"}, 100) + metricstest.CheckLastValueData(t, "custom_metric", map[string]string{"custom_tag": "test-param"}, 100) } func expectSuccess(t *testing.T, f func() error) { @@ -152,10 +141,6 @@ func setup() { } func resetMetrics() { - metricstest.Unregister( - "event_count", - "event_dispatch_latencies", - "event_processing_latencies", - "custom_metric") - Register([]stats.Measure{}, nil) + metricstest.Unregister("custom_metric") + eventingmetrics.Register([]stats.Measure{}, nil) }