Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add metrics for OIDC #8015

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
6 changes: 6 additions & 0 deletions pkg/auth/token_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auth
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -36,6 +37,11 @@ const (
kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc"
)

var (
ErrNoJWTTokenFound = errors.New("no JWT token found in request")
ErrInvalidJWTToken = errors.New("invalid JWT token")
)

type OIDCTokenVerifier struct {
logger *zap.SugaredLogger
restConfig *rest.Config
Expand Down
20 changes: 20 additions & 0 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,26 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer)
if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))

// If the error is due to the absence of a JWT token, report it as an unauthenticated request.
// The error message is "no JWT token found in request"
if errors.Is(err, auth.ErrNoJWTTokenFound) {
_ = h.reporter.ReportUnauthenticatedRequest(&ReportArgs{
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you think the way here to distinguish these 2 different scenario? Are there any better approach you would recommend? @creydr

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can work. You could also think about passing the reporter to the tokenVerifier, so this reports the metrics on each request and not only do this in the caller 🤔

ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "filter",
})
} else {
// If the error is due to an invalid JWT token, report it as an invalid token request.
_ = h.reporter.ReportInvalidTokenRequest(&ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "filter",
})
}

return
}

Expand Down
56 changes: 44 additions & 12 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ func TestReceiver(t *testing.T) {
additionalReplyHeaders http.Header

// expectations
expectedResponseEvent *cloudevents.Event
expectedResponse *http.Response
expectedDispatch bool
expectedStatus int
expectedHeaders http.Header
expectedEventCount bool
expectedEventDispatchTime bool
expectedEventProcessingTime bool
expectedResponseHeaders http.Header
expectedResponseEvent *cloudevents.Event
expectedResponse *http.Response
expectedDispatch bool
expectedStatus int
expectedHeaders http.Header
expectedEventCount bool
expectedUnauthenticatedCount bool
expectedInvalidTokenCount bool
expectedEventDispatchTime bool
expectedEventProcessingTime bool
expectedResponseHeaders http.Header
}{
"Not POST": {
request: httptest.NewRequest(http.MethodGet, validPath, nil),
Expand Down Expand Up @@ -416,6 +418,18 @@ func TestReceiver(t *testing.T) {
additionalReplyHeaders: http.Header{"Retry-After": []string{"10"}, "Test-Header": []string{"TestValue"}},
expectedResponseHeaders: http.Header{"Retry-After": []string{"10"}},
},
"Request with a invalid token": {
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
triggers: []*eventingv1.Trigger{
makeTrigger(withAttributesFilter(&eventingv1.TriggerFilter{})),
},
request: httptest.NewRequest(http.MethodPost, validPath, nil),
requestFails: true,
failureStatus: http.StatusUnauthorized,
expectedDispatch: false,
expectedStatus: http.StatusUnauthorized,
expectedEventCount: false,
expectedUnauthenticatedCount: true,
},
}
for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
Expand Down Expand Up @@ -517,6 +531,12 @@ func TestReceiver(t *testing.T) {
if tc.expectedEventCount != reporter.eventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported)
}
if tc.expectedUnauthenticatedCount != reporter.unauthenticatedCountReported {
t.Errorf("Incorrect unauthenticated count reported metric. Expected %v, Actual %v", tc.expectedUnauthenticatedCount, reporter.unauthenticatedCountReported)
}
if tc.expectedInvalidTokenCount != reporter.invalidTokenCountReported {
t.Errorf("Incorrect invalid token count reported metric. Expected %v, Actual %v", tc.expectedInvalidTokenCount, reporter.invalidTokenCountReported)
}
if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported)
}
Expand Down Expand Up @@ -742,16 +762,28 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
}

type mockReporter struct {
eventCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
eventCountReported bool
unauthenticatedCountReported bool
invalidTokenCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
}

func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
r.eventCountReported = true
return nil
}

func (r *mockReporter) ReportUnauthenticatedRequest(args *ReportArgs) error {
r.unauthenticatedCountReported = true
return nil
}

func (r *mockReporter) ReportInvalidTokenRequest(args *ReportArgs) error {
r.invalidTokenCountReported = true
return nil
}

func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
r.eventDispatchTimeReported = true
return nil
Expand Down
48 changes: 48 additions & 0 deletions pkg/broker/filter/stats_reporter.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is not only relevant to broker-filters anymore, should we move this StatsReporter to a dedicated package and make it more generic? E.g. pkg/metrics?
(This could also help to remove the import cycle.)

Then you could also change the ReportArgs to an interface (which has the method generateTag()) to have component specific args. E.g. from a Trigger (which would to the same as the current reporter.generateTag(), or then for a Channel, etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for example:

type ReportArgs interface { //TODO: find a better name
    generateTag(tags ...tag.Mutator) (context.Context, error)
}

and then for example for the broker/trigger:

type BrokerArgs struct {
	ns            string
	trigger       string
	broker        string
	filterType    string
	requestType   string
	requestScheme string
}

func (args *BrokerArgs) generateTag(tags ...tag.Mutator) (context.Context, error) {
	ctx := metricskey.WithResource(emptyContext, resource.Resource{
		Type: eventingmetrics.ResourceTypeKnativeTrigger,
		Labels: map[string]string{
			eventingmetrics.LabelNamespaceName: args.ns,
			eventingmetrics.LabelBrokerName:    args.broker,
			eventingmetrics.LabelTriggerName:   args.trigger,
		},
	})
	// Note that filterType and filterSource can be empty strings, so they need a special treatment.
	ctx, err := tag.New(
		ctx,
		append(tags,
			tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
			tag.Insert(triggerFilterRequestTypeKey, args.requestType),
			tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
		)...)
	return ctx, err
}

And then we call this e.g. in ReportEventCount():

func (r *reporter) ReportEventCount(args ReportArgs, responseCode int) error {
	ctx, err := args.generateTag(
		tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
		tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
	if err != nil {
		return err
	}
	metrics.Record(ctx, eventCountM.M(1))
	return nil
}

and use it

reportArgs := &BrokerArgs{
	Ns:          broker.Namespace,
	Broker:      broker.Name,
	RequestType: "broker_ingress",
}

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer, reportArgs)

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ var (
stats.UnitDimensionless,
)

// unauthenticatedRequestsM records the number of unauthenticated requests. (Without the JWT token in the header)
unauthenticatedRequestsM = stats.Int64(
"unauthenticated_requests",
"Number of unauthenticated requests (No JWT token found in the header)",
stats.UnitDimensionless,
)

// invalidTokenRequestsM records the number of requests with invalid tokens.
invalidTokenRequestsM = stats.Int64(
"invalid_token_requests",
"Number of requests with invalid tokens",
stats.UnitDimensionless,
)

// dispatchTimeInMsecM records the time spent dispatching an event to
// a Trigger subscriber, in milliseconds.
dispatchTimeInMsecM = stats.Float64(
Expand Down Expand Up @@ -90,6 +104,8 @@ func init() {
// StatsReporter defines the interface for sending filter metrics.
type StatsReporter interface {
ReportEventCount(args *ReportArgs, responseCode int) error
ReportUnauthenticatedRequest(args *ReportArgs) error
ReportInvalidTokenRequest(args *ReportArgs) error
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
ReportEventProcessingTime(args *ReportArgs, d time.Duration) error
}
Expand Down Expand Up @@ -120,6 +136,18 @@ func register() {
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: unauthenticatedRequestsM.Description(),
Measure: unauthenticatedRequestsM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: invalidTokenRequestsM.Description(),
Measure: invalidTokenRequestsM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Expand Down Expand Up @@ -150,6 +178,26 @@ func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error {
return nil
}

// ReportUnauthenticatedRequest captures unauthenticated requests. (The requests that do not have JWT token in the header)
func (r *reporter) ReportUnauthenticatedRequest(args *ReportArgs) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
metrics.Record(ctx, unauthenticatedRequestsM.M(1))
return nil
}

// ReportInvalidTokenRequest captures requests with invalid tokens.
func (r *reporter) ReportInvalidTokenRequest(args *ReportArgs) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
metrics.Record(ctx, invalidTokenRequestsM.M(1))
return nil
}

// ReportEventDispatchTime captures dispatch times.
func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
ctx, err := r.generateTag(args,
Expand Down
Loading