From df83a87b29e3744d4962a76c772bbc9b6e868e18 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Thu, 20 Jun 2024 01:03:27 -0400 Subject: [PATCH 1/7] feat: add the unauthenticate count and invalid token count for broker filter ingress. --- pkg/auth/token_verifier.go | 6 +++ pkg/broker/filter/filter_handler.go | 20 +++++++++ pkg/broker/filter/filter_handler_test.go | 56 +++++++++++++++++++----- pkg/broker/filter/stats_reporter.go | 48 ++++++++++++++++++++ 4 files changed, 118 insertions(+), 12 deletions(-) diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index 5571b67f2b1..d1fc7cdbd05 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -19,6 +19,7 @@ package auth import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -36,6 +37,11 @@ const ( kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc" ) +var ( + ErrNoJWTTokenFound = errors.New("no JWT token found in request") + ErrInvalidJWTToken = errors.New("invalid JWT token") +) + type OIDCTokenVerifier struct { logger *zap.SugaredLogger restConfig *rest.Config diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 5276b6f0a5e..3a16919b927 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -207,6 +207,26 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer) if err != nil { h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) + + // If the error is due to the absence of a JWT token, report it as an unauthenticated request. + // The error message is "no JWT token found in request" + if errors.Is(err, auth.ErrNoJWTTokenFound) { + _ = h.reporter.ReportUnauthenticatedRequest(&ReportArgs{ + ns: trigger.Namespace, + trigger: trigger.Name, + broker: trigger.Spec.Broker, + requestType: "filter", + }) + } else { + // If the error is due to an invalid JWT token, report it as an invalid token request. + _ = h.reporter.ReportInvalidTokenRequest(&ReportArgs{ + ns: trigger.Namespace, + trigger: trigger.Name, + broker: trigger.Spec.Broker, + requestType: "filter", + }) + } + return } diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 1e94b43a568..c326c9a2b63 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -88,15 +88,17 @@ func TestReceiver(t *testing.T) { additionalReplyHeaders http.Header // expectations - expectedResponseEvent *cloudevents.Event - expectedResponse *http.Response - expectedDispatch bool - expectedStatus int - expectedHeaders http.Header - expectedEventCount bool - expectedEventDispatchTime bool - expectedEventProcessingTime bool - expectedResponseHeaders http.Header + expectedResponseEvent *cloudevents.Event + expectedResponse *http.Response + expectedDispatch bool + expectedStatus int + expectedHeaders http.Header + expectedEventCount bool + expectedUnauthenticatedCount bool + expectedInvalidTokenCount bool + expectedEventDispatchTime bool + expectedEventProcessingTime bool + expectedResponseHeaders http.Header }{ "Not POST": { request: httptest.NewRequest(http.MethodGet, validPath, nil), @@ -416,6 +418,18 @@ func TestReceiver(t *testing.T) { additionalReplyHeaders: http.Header{"Retry-After": []string{"10"}, "Test-Header": []string{"TestValue"}}, expectedResponseHeaders: http.Header{"Retry-After": []string{"10"}}, }, + "Request with a invalid token": { + triggers: []*eventingv1.Trigger{ + makeTrigger(withAttributesFilter(&eventingv1.TriggerFilter{})), + }, + request: httptest.NewRequest(http.MethodPost, validPath, nil), + requestFails: true, + failureStatus: http.StatusUnauthorized, + expectedDispatch: false, + expectedStatus: http.StatusUnauthorized, + expectedEventCount: false, + expectedUnauthenticatedCount: true, + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { @@ -517,6 +531,12 @@ func TestReceiver(t *testing.T) { if tc.expectedEventCount != reporter.eventCountReported { t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported) } + if tc.expectedUnauthenticatedCount != reporter.unauthenticatedCountReported { + t.Errorf("Incorrect unauthenticated count reported metric. Expected %v, Actual %v", tc.expectedUnauthenticatedCount, reporter.unauthenticatedCountReported) + } + if tc.expectedInvalidTokenCount != reporter.invalidTokenCountReported { + t.Errorf("Incorrect invalid token count reported metric. Expected %v, Actual %v", tc.expectedInvalidTokenCount, reporter.invalidTokenCountReported) + } if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported { t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported) } @@ -742,9 +762,11 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) { } type mockReporter struct { - eventCountReported bool - eventDispatchTimeReported bool - eventProcessingTimeReported bool + eventCountReported bool + unauthenticatedCountReported bool + invalidTokenCountReported bool + eventDispatchTimeReported bool + eventProcessingTimeReported bool } func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { @@ -752,6 +774,16 @@ func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) erro return nil } +func (r *mockReporter) ReportUnauthenticatedRequest(args *ReportArgs) error { + r.unauthenticatedCountReported = true + return nil +} + +func (r *mockReporter) ReportInvalidTokenRequest(args *ReportArgs) error { + r.invalidTokenCountReported = true + return nil +} + func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { r.eventDispatchTimeReported = true return nil diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index c2df9b3c9b5..2352aae9410 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -46,6 +46,20 @@ var ( stats.UnitDimensionless, ) + // unauthenticatedRequestsM records the number of unauthenticated requests. (Without the JWT token in the header) + unauthenticatedRequestsM = stats.Int64( + "unauthenticated_requests", + "Number of unauthenticated requests (No JWT token found in the header)", + stats.UnitDimensionless, + ) + + // invalidTokenRequestsM records the number of requests with invalid tokens. + invalidTokenRequestsM = stats.Int64( + "invalid_token_requests", + "Number of requests with invalid tokens", + stats.UnitDimensionless, + ) + // dispatchTimeInMsecM records the time spent dispatching an event to // a Trigger subscriber, in milliseconds. dispatchTimeInMsecM = stats.Float64( @@ -90,6 +104,8 @@ func init() { // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { ReportEventCount(args *ReportArgs, responseCode int) error + ReportUnauthenticatedRequest(args *ReportArgs) error + ReportInvalidTokenRequest(args *ReportArgs) error ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error ReportEventProcessingTime(args *ReportArgs, d time.Duration) error } @@ -120,6 +136,18 @@ func register() { Aggregation: view.Count(), TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey}, }, + &view.View{ + Description: unauthenticatedRequestsM.Description(), + Measure: unauthenticatedRequestsM, + Aggregation: view.Count(), + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, + &view.View{ + Description: invalidTokenRequestsM.Description(), + Measure: invalidTokenRequestsM, + Aggregation: view.Count(), + TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey}, + }, &view.View{ Description: dispatchTimeInMsecM.Description(), Measure: dispatchTimeInMsecM, @@ -150,6 +178,26 @@ func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { return nil } +// ReportUnauthenticatedRequest captures unauthenticated requests. (The requests that do not have JWT token in the header) +func (r *reporter) ReportUnauthenticatedRequest(args *ReportArgs) error { + ctx, err := r.generateTag(args) + if err != nil { + return err + } + metrics.Record(ctx, unauthenticatedRequestsM.M(1)) + return nil +} + +// ReportInvalidTokenRequest captures requests with invalid tokens. +func (r *reporter) ReportInvalidTokenRequest(args *ReportArgs) error { + ctx, err := r.generateTag(args) + if err != nil { + return err + } + metrics.Record(ctx, invalidTokenRequestsM.M(1)) + return nil +} + // ReportEventDispatchTime captures dispatch times. func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error { ctx, err := r.generateTag(args, From bb3985424cf1090f2671524667a85c746b87a307 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 21 Jun 2024 14:05:40 -0400 Subject: [PATCH 2/7] fix: remove the unit test for the stat reporter, as OIDC cannot be tested here. --- pkg/broker/filter/filter_handler_test.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index c326c9a2b63..0ed399b3273 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -418,18 +418,6 @@ func TestReceiver(t *testing.T) { additionalReplyHeaders: http.Header{"Retry-After": []string{"10"}, "Test-Header": []string{"TestValue"}}, expectedResponseHeaders: http.Header{"Retry-After": []string{"10"}}, }, - "Request with a invalid token": { - triggers: []*eventingv1.Trigger{ - makeTrigger(withAttributesFilter(&eventingv1.TriggerFilter{})), - }, - request: httptest.NewRequest(http.MethodPost, validPath, nil), - requestFails: true, - failureStatus: http.StatusUnauthorized, - expectedDispatch: false, - expectedStatus: http.StatusUnauthorized, - expectedEventCount: false, - expectedUnauthenticatedCount: true, - }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { From 2878e9233163c626261dd8f83d5ae7b7a2d96b72 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 21 Jun 2024 15:34:04 -0400 Subject: [PATCH 3/7] fix: pass the reporter to the token verifier --- cmd/broker/filter/main.go | 2 +- cmd/jobsink/main.go | 6 ++-- pkg/auth/token_verifier.go | 27 +++++++++++++----- pkg/broker/filter/filter_handler.go | 36 ++++++++---------------- pkg/broker/filter/stats_reporter.go | 4 +-- pkg/broker/filter/stats_reporter_test.go | 4 +-- 6 files changed, 40 insertions(+), 39 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 167a0e888d6..ba0bf1dc3d0 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -151,7 +151,7 @@ func main() { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, reporter) trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc) if err != nil { diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index 0f8c99646b4..dbefbc94c91 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -117,7 +117,7 @@ func main() { k8s: kubeclient.Get(ctx), lister: jobsink.Get(ctx).Lister(), withContext: ctxFunc, - oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx), + oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx, nil), } tlsConfig, err := getServerTLSConfig(ctx) @@ -201,7 +201,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name) - err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w) + err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil) if err != nil { logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return @@ -382,7 +382,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http. audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name) - err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w) + err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil) if err != nil { logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index d1fc7cdbd05..c05a16ef2f5 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "k8s.io/client-go/rest" "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/broker/filter" "knative.dev/pkg/injection" "knative.dev/pkg/logging" ) @@ -43,9 +44,10 @@ var ( ) type OIDCTokenVerifier struct { - logger *zap.SugaredLogger - restConfig *rest.Config - provider *oidc.Provider + logger *zap.SugaredLogger + restConfig *rest.Config + provider *oidc.Provider + statsReporter filter.StatsReporter } type IDToken struct { @@ -57,10 +59,11 @@ type IDToken struct { AccessTokenHash string } -func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier { +func NewOIDCTokenVerifier(ctx context.Context, statsReporter filter.StatsReporter) *OIDCTokenVerifier { tokenHandler := &OIDCTokenVerifier{ - logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), - restConfig: injection.GetConfig(ctx), + logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), + restConfig: injection.GetConfig(ctx), + statsReporter: statsReporter, } if err := tokenHandler.initOIDCProvider(ctx); err != nil { @@ -158,10 +161,16 @@ func (c *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error } // VerifyJWTFromRequest will verify the incoming request contains the correct JWT token -func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter) error { +func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter, reportArgs *filter.ReportArgs) error { token := GetJWTFromHeader(r.Header) + if token == "" { response.WriteHeader(http.StatusUnauthorized) + + if tokenVerifier.statsReporter != nil { + tokenVerifier.statsReporter.ReportUnauthenticatedRequest(reportArgs) + } + return fmt.Errorf("no JWT token found in request") } @@ -172,6 +181,10 @@ func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context if _, err := tokenVerifier.VerifyJWT(ctx, token, *audience); err != nil { response.WriteHeader(http.StatusUnauthorized) + + if tokenVerifier.statsReporter != nil { + tokenVerifier.statsReporter.ReportInvalidTokenRequest(reportArgs) + } return fmt.Errorf("failed to verify JWT: %w", err) } diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 3a16919b927..43e66bd6d33 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -204,29 +204,17 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { audience := FilterAudience - err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer) - if err != nil { - h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) + reportArgs := &ReportArgs{ + Ns: trigger.Namespace, + trigger: trigger.Name, + broker: trigger.Spec.Broker, + requestType: "filter", + } - // If the error is due to the absence of a JWT token, report it as an unauthenticated request. - // The error message is "no JWT token found in request" - if errors.Is(err, auth.ErrNoJWTTokenFound) { - _ = h.reporter.ReportUnauthenticatedRequest(&ReportArgs{ - ns: trigger.Namespace, - trigger: trigger.Name, - broker: trigger.Spec.Broker, - requestType: "filter", - }) - } else { - // If the error is due to an invalid JWT token, report it as an invalid token request. - _ = h.reporter.ReportInvalidTokenRequest(&ReportArgs{ - ns: trigger.Namespace, - trigger: trigger.Name, - broker: trigger.Spec.Broker, - requestType: "filter", - }) - } + err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer, reportArgs) + if err != nil { + h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return } @@ -266,7 +254,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve target := broker.Status.Address reportArgs := &ReportArgs{ - ns: trigger.Namespace, + Ns: trigger.Namespace, trigger: trigger.Name, broker: brokerRef, requestType: "reply_forward", @@ -317,7 +305,7 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } reportArgs := &ReportArgs{ - ns: trigger.Namespace, + Ns: trigger.Namespace, trigger: trigger.Name, broker: trigger.Spec.Broker, requestType: "dls_forward", @@ -364,7 +352,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger } reportArgs := &ReportArgs{ - ns: trigger.Namespace, + Ns: trigger.Namespace, trigger: trigger.Name, broker: brokerRef, filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index 2352aae9410..31f78cf3e0b 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -89,7 +89,7 @@ var ( ) type ReportArgs struct { - ns string + Ns string trigger string broker string filterType string @@ -227,7 +227,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C ctx := metricskey.WithResource(emptyContext, resource.Resource{ Type: eventingmetrics.ResourceTypeKnativeTrigger, Labels: map[string]string{ - eventingmetrics.LabelNamespaceName: args.ns, + eventingmetrics.LabelNamespaceName: args.Ns, eventingmetrics.LabelBrokerName: args.broker, eventingmetrics.LabelTriggerName: args.trigger, }, diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index da1635dfe0f..67d514cf642 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -31,7 +31,7 @@ import ( func TestStatsReporter(t *testing.T) { setup() args := &ReportArgs{ - ns: "testns", + Ns: "testns", trigger: "testtrigger", broker: "testbroker", filterType: "testeventtype", @@ -96,7 +96,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) { setup() args := &ReportArgs{ - ns: "testns", + Ns: "testns", trigger: "testtrigger", broker: "testbroker", filterType: "", From b44273f7f6b44d2650040ee7ba2822eb60c248e6 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 21 Jun 2024 15:36:35 -0400 Subject: [PATCH 4/7] fix: revert the error change --- pkg/broker/filter/filter_handler.go | 8 ++++---- pkg/broker/filter/stats_reporter.go | 4 ++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 43e66bd6d33..6f7a37ccc85 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -205,7 +205,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { audience := FilterAudience reportArgs := &ReportArgs{ - Ns: trigger.Namespace, + ns: trigger.Namespace, trigger: trigger.Name, broker: trigger.Spec.Broker, requestType: "filter", @@ -254,7 +254,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve target := broker.Status.Address reportArgs := &ReportArgs{ - Ns: trigger.Namespace, + ns: trigger.Namespace, trigger: trigger.Name, broker: brokerRef, requestType: "reply_forward", @@ -305,7 +305,7 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } reportArgs := &ReportArgs{ - Ns: trigger.Namespace, + ns: trigger.Namespace, trigger: trigger.Name, broker: trigger.Spec.Broker, requestType: "dls_forward", @@ -352,7 +352,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger } reportArgs := &ReportArgs{ - Ns: trigger.Namespace, + ns: trigger.Namespace, trigger: trigger.Name, broker: brokerRef, filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), diff --git a/pkg/broker/filter/stats_reporter.go b/pkg/broker/filter/stats_reporter.go index 31f78cf3e0b..2352aae9410 100644 --- a/pkg/broker/filter/stats_reporter.go +++ b/pkg/broker/filter/stats_reporter.go @@ -89,7 +89,7 @@ var ( ) type ReportArgs struct { - Ns string + ns string trigger string broker string filterType string @@ -227,7 +227,7 @@ func (r *reporter) generateTag(args *ReportArgs, tags ...tag.Mutator) (context.C ctx := metricskey.WithResource(emptyContext, resource.Resource{ Type: eventingmetrics.ResourceTypeKnativeTrigger, Labels: map[string]string{ - eventingmetrics.LabelNamespaceName: args.Ns, + eventingmetrics.LabelNamespaceName: args.ns, eventingmetrics.LabelBrokerName: args.broker, eventingmetrics.LabelTriggerName: args.trigger, }, From 6fe5a929f02be567c6bb38719b7a2b83808a40b5 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 21 Jun 2024 15:42:31 -0400 Subject: [PATCH 5/7] fix: revert the error change --- pkg/broker/filter/filter_handler_test.go | 4 ++-- pkg/broker/filter/stats_reporter_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 0ed399b3273..035a27115b5 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -440,7 +440,7 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, nil) for _, trig := range tc.triggers { // Replace the SubscriberURI to point at our fake server. @@ -645,7 +645,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, nil) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { diff --git a/pkg/broker/filter/stats_reporter_test.go b/pkg/broker/filter/stats_reporter_test.go index 67d514cf642..da1635dfe0f 100644 --- a/pkg/broker/filter/stats_reporter_test.go +++ b/pkg/broker/filter/stats_reporter_test.go @@ -31,7 +31,7 @@ import ( func TestStatsReporter(t *testing.T) { setup() args := &ReportArgs{ - Ns: "testns", + ns: "testns", trigger: "testtrigger", broker: "testbroker", filterType: "testeventtype", @@ -96,7 +96,7 @@ func TestReporterEmptySourceAndTypeFilter(t *testing.T) { setup() args := &ReportArgs{ - Ns: "testns", + ns: "testns", trigger: "testtrigger", broker: "testbroker", filterType: "", From 397488451767669e78f3ff24df27ef51169e573d Mon Sep 17 00:00:00 2001 From: Leo Li Date: Mon, 24 Jun 2024 16:29:42 -0400 Subject: [PATCH 6/7] feat: use the configuration-option function to pass in the statsReporter --- cmd/broker/filter/main.go | 6 ++- pkg/auth/token_verifier.go | 40 +++++++++++++++---- pkg/broker/filter/filter_handler.go | 49 +++++++++++++++++++++--- pkg/broker/filter/filter_handler_test.go | 2 +- pkg/broker/ingress/ingress_handler.go | 8 +++- pkg/channel/event_receiver.go | 9 ++++- 6 files changed, 96 insertions(+), 18 deletions(-) diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index ba0bf1dc3d0..bb1c2ac55f3 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -21,6 +21,8 @@ import ( "fmt" "log" + "knative.dev/eventing/pkg/broker/filter" + "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" @@ -41,7 +43,6 @@ import ( "knative.dev/eventing/cmd/broker" "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" - "knative.dev/eventing/pkg/broker/filter" eventingclient "knative.dev/eventing/pkg/client/injection/client" brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker" triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger" @@ -147,11 +148,12 @@ func main() { } reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) + authReporter := filter.NewAuthStatsReporterAdapter(reporter) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, reporter) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, auth.WithStatsReporter(authReporter)) trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace()) handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc) if err != nil { diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index c05a16ef2f5..9276a3d3356 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -29,7 +29,6 @@ import ( "go.uber.org/zap" "k8s.io/client-go/rest" "knative.dev/eventing/pkg/apis/feature" - "knative.dev/eventing/pkg/broker/filter" "knative.dev/pkg/injection" "knative.dev/pkg/logging" ) @@ -47,7 +46,7 @@ type OIDCTokenVerifier struct { logger *zap.SugaredLogger restConfig *rest.Config provider *oidc.Provider - statsReporter filter.StatsReporter + statsReporter AuthStatsReporter } type IDToken struct { @@ -59,17 +58,44 @@ type IDToken struct { AccessTokenHash string } -func NewOIDCTokenVerifier(ctx context.Context, statsReporter filter.StatsReporter) *OIDCTokenVerifier { +type AuthStatsReporter interface { + ReportUnauthenticatedRequest(args *ReportArgs) + ReportInvalidTokenRequest(args *ReportArgs) +} + +type ReportArgs struct { + Ns string + Trigger string + Broker string + Channel string + FilterType string + RequestType string + RequestScheme string +} + +// TokenVeridierOption enables further configuration of a TokenVerifier. +type TokenVerifierOption func(*OIDCTokenVerifier) + +func WithStatsReporter(reporter AuthStatsReporter) TokenVerifierOption { + return func(t *OIDCTokenVerifier) { + t.statsReporter = reporter + } +} + +func NewOIDCTokenVerifier(ctx context.Context, o ...TokenVerifierOption) *OIDCTokenVerifier { tokenHandler := &OIDCTokenVerifier{ - logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), - restConfig: injection.GetConfig(ctx), - statsReporter: statsReporter, + logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), + restConfig: injection.GetConfig(ctx), } if err := tokenHandler.initOIDCProvider(ctx); err != nil { tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) } + for _, opt := range o { + opt(tokenHandler) + } + return tokenHandler } @@ -161,7 +187,7 @@ func (c *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error } // VerifyJWTFromRequest will verify the incoming request contains the correct JWT token -func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter, reportArgs *filter.ReportArgs) error { +func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter, reportArgs *ReportArgs) error { token := GetJWTFromHeader(r.Header) if token == "" { diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 6f7a37ccc85..6bd49a117e7 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -26,6 +26,8 @@ import ( "net/http" "time" + "knative.dev/eventing/pkg/auth" + opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -40,7 +42,6 @@ import ( "knative.dev/pkg/logging" "knative.dev/eventing/pkg/apis" - "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/eventingtls" "knative.dev/eventing/pkg/utils" @@ -152,6 +153,42 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT }, nil } +type authStatsReporterAdapter struct { + reporter StatsReporter +} + +func (a *authStatsReporterAdapter) ReportUnauthenticatedRequest(args *auth.ReportArgs) { + err := a.reporter.ReportUnauthenticatedRequest(&ReportArgs{ + ns: args.Ns, + trigger: args.Trigger, + broker: args.Broker, + filterType: args.FilterType, + requestType: args.RequestType, + requestScheme: args.RequestScheme, + }) + if err != nil { + return + } +} + +func (a *authStatsReporterAdapter) ReportInvalidTokenRequest(args *auth.ReportArgs) { + err := a.reporter.ReportInvalidTokenRequest(&ReportArgs{ + ns: args.Ns, + trigger: args.Trigger, + broker: args.Broker, + filterType: args.FilterType, + requestType: args.RequestType, + requestScheme: args.RequestScheme, + }) + if err != nil { + return + } +} + +func NewAuthStatsReporterAdapter(reporter StatsReporter) auth.AuthStatsReporter { + return &authStatsReporterAdapter{reporter: reporter} +} + func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { ctx := h.withContext(request.Context()) @@ -204,11 +241,11 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { audience := FilterAudience - reportArgs := &ReportArgs{ - ns: trigger.Namespace, - trigger: trigger.Name, - broker: trigger.Spec.Broker, - requestType: "filter", + reportArgs := &auth.ReportArgs{ + Ns: trigger.Namespace, + Trigger: trigger.Name, + Broker: trigger.Spec.Broker, + RequestType: "filter", } err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer, reportArgs) diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 035a27115b5..d1bdf37f6f0 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -440,7 +440,7 @@ func TestReceiver(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, nil) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) for _, trig := range tc.triggers { // Replace the SubscriberURI to point at our fake server. diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 6219cb92537..df4da12f574 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -234,7 +234,13 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { if features.IsOIDCAuthentication() { h.Logger.Debug("OIDC authentication is enabled") - err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer) + reportArgs := &auth.ReportArgs{ + Ns: broker.Namespace, + Broker: broker.Name, + RequestType: "broker_ingress", + } + + err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer, reportArgs) if err != nil { h.Logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index ffb441cbcce..a670edac94b 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -256,7 +256,14 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth features := feature.FromContext(ctx) if features.IsOIDCAuthentication() { r.logger.Debug("OIDC authentication is enabled") - err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response) + + reportArgs := &auth.ReportArgs{ + Ns: channel.Namespace, + Channel: channel.Name, + RequestType: "channel", + } + + err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response, reportArgs) if err != nil { r.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) return From 82f8327d822759d416acabc587f9db24b90162bd Mon Sep 17 00:00:00 2001 From: Leo Li Date: Mon, 24 Jun 2024 16:48:46 -0400 Subject: [PATCH 7/7] feat: nit fix --- cmd/jobsink/main.go | 2 +- pkg/broker/filter/filter_handler_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/jobsink/main.go b/cmd/jobsink/main.go index dbefbc94c91..74ab44b8a5d 100644 --- a/cmd/jobsink/main.go +++ b/cmd/jobsink/main.go @@ -117,7 +117,7 @@ func main() { k8s: kubeclient.Get(ctx), lister: jobsink.Get(ctx).Lister(), withContext: ctxFunc, - oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx, nil), + oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx), } tlsConfig, err := getServerTLSConfig(ctx) diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index d1bdf37f6f0..0ed399b3273 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -645,7 +645,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, nil) + oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers {