Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add metrics for OIDC #8015

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,9 @@
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, reporter)
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)

Check failure on line 156 in cmd/broker/filter/main.go

View workflow job for this annotation

GitHub Actions / analyze / Go vulnerability Detection

cannot use oidcTokenVerifier (variable of type *"knative.dev/eventing/pkg/auth".OIDCTokenVerifier) as *invalid type value in argument to filter.NewHandler
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func main() {
k8s: kubeclient.Get(ctx),
lister: jobsink.Get(ctx).Lister(),
withContext: ctxFunc,
oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx),
oidcTokenVerifier: auth.NewOIDCTokenVerifier(ctx, nil),
}

tlsConfig, err := getServerTLSConfig(ctx)
Expand Down Expand Up @@ -201,7 +201,7 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {

audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name)

err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w)
err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil)
if err != nil {
logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down Expand Up @@ -382,7 +382,7 @@ func (h *Handler) handleGet(ctx context.Context, w http.ResponseWriter, r *http.

audience := auth.GetAudienceDirect(sinksv.SchemeGroupVersion.WithKind("JobSink"), ref.Namespace, ref.Name)

err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w)
err := h.oidcTokenVerifier.VerifyJWTFromRequest(ctx, r, &audience, w, nil)
if err != nil {
logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down
33 changes: 26 additions & 7 deletions pkg/auth/token_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package auth
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -28,6 +29,7 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/broker/filter"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
)
Expand All @@ -36,10 +38,16 @@ const (
kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc"
)

var (
ErrNoJWTTokenFound = errors.New("no JWT token found in request")
ErrInvalidJWTToken = errors.New("invalid JWT token")
)

type OIDCTokenVerifier struct {
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
statsReporter filter.StatsReporter
}

type IDToken struct {
Expand All @@ -51,10 +59,11 @@ type IDToken struct {
AccessTokenHash string
}

func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier {
func NewOIDCTokenVerifier(ctx context.Context, statsReporter filter.StatsReporter) *OIDCTokenVerifier {
tokenHandler := &OIDCTokenVerifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
statsReporter: statsReporter,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this looks good? @creydr By passing the statsReporter to the verifier, and pass the reportArgs when verifying happens.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we make this an optional argument (e.g. by using a configuration-option function as we do for example in

func NewBroker(name, namespace string, o ...BrokerOption) *v1.Broker {
b := &v1.Broker{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
Name: name,
},
}
for _, opt := range o {
opt(b)
}
with type BrokerOption func(*v1.Broker))? This allows us to add options, without the need to update the packages immediately which use the NewOIDCTokenVerifier.

}

if err := tokenHandler.initOIDCProvider(ctx); err != nil {
Expand Down Expand Up @@ -152,10 +161,16 @@ func (c *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error
}

// VerifyJWTFromRequest will verify the incoming request contains the correct JWT token
func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter) error {
func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context, r *http.Request, audience *string, response http.ResponseWriter, reportArgs *filter.ReportArgs) error {
token := GetJWTFromHeader(r.Header)

if token == "" {
response.WriteHeader(http.StatusUnauthorized)

if tokenVerifier.statsReporter != nil {
tokenVerifier.statsReporter.ReportUnauthenticatedRequest(reportArgs)
}

return fmt.Errorf("no JWT token found in request")
}

Expand All @@ -166,6 +181,10 @@ func (tokenVerifier *OIDCTokenVerifier) VerifyJWTFromRequest(ctx context.Context

if _, err := tokenVerifier.VerifyJWT(ctx, token, *audience); err != nil {
response.WriteHeader(http.StatusUnauthorized)

if tokenVerifier.statsReporter != nil {
tokenVerifier.statsReporter.ReportInvalidTokenRequest(reportArgs)
}
return fmt.Errorf("failed to verify JWT: %w", err)
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/auth"

Check failure on line 43 in pkg/broker/filter/filter_handler.go

View workflow job for this annotation

GitHub Actions / analyze / Go vulnerability Detection

could not import knative.dev/eventing/pkg/auth (import cycle: [knative.dev/eventing/pkg/kncloudevents knative.dev/eventing/pkg/auth knative.dev/eventing/pkg/broker/filter])
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/utils"

Expand All @@ -53,7 +53,7 @@
"knative.dev/eventing/pkg/eventfilter/attributes"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/kncloudevents"

Check failure on line 56 in pkg/broker/filter/filter_handler.go

View workflow job for this annotation

GitHub Actions / analyze / Go vulnerability Detection

could not import knative.dev/eventing/pkg/kncloudevents (import cycle: [knative.dev/eventing/pkg/kncloudevents knative.dev/eventing/pkg/auth knative.dev/eventing/pkg/broker/filter])
"knative.dev/eventing/pkg/reconciler/sugar/trigger/path"
"knative.dev/eventing/pkg/tracing"
)
Expand Down Expand Up @@ -204,7 +204,15 @@

audience := FilterAudience

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer)
reportArgs := &ReportArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: trigger.Spec.Broker,
requestType: "filter",
}

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, &audience, writer, reportArgs)

if err != nil {
h.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
return
Expand Down
48 changes: 34 additions & 14 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,17 @@ func TestReceiver(t *testing.T) {
additionalReplyHeaders http.Header

// expectations
expectedResponseEvent *cloudevents.Event
expectedResponse *http.Response
expectedDispatch bool
expectedStatus int
expectedHeaders http.Header
expectedEventCount bool
expectedEventDispatchTime bool
expectedEventProcessingTime bool
expectedResponseHeaders http.Header
expectedResponseEvent *cloudevents.Event
expectedResponse *http.Response
expectedDispatch bool
expectedStatus int
expectedHeaders http.Header
expectedEventCount bool
expectedUnauthenticatedCount bool
expectedInvalidTokenCount bool
expectedEventDispatchTime bool
expectedEventProcessingTime bool
expectedResponseHeaders http.Header
}{
"Not POST": {
request: httptest.NewRequest(http.MethodGet, validPath, nil),
Expand Down Expand Up @@ -438,7 +440,7 @@ func TestReceiver(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, nil)

for _, trig := range tc.triggers {
// Replace the SubscriberURI to point at our fake server.
Expand Down Expand Up @@ -517,6 +519,12 @@ func TestReceiver(t *testing.T) {
if tc.expectedEventCount != reporter.eventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported)
}
if tc.expectedUnauthenticatedCount != reporter.unauthenticatedCountReported {
t.Errorf("Incorrect unauthenticated count reported metric. Expected %v, Actual %v", tc.expectedUnauthenticatedCount, reporter.unauthenticatedCountReported)
}
if tc.expectedInvalidTokenCount != reporter.invalidTokenCountReported {
t.Errorf("Incorrect invalid token count reported metric. Expected %v, Actual %v", tc.expectedInvalidTokenCount, reporter.invalidTokenCountReported)
}
if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported)
}
Expand Down Expand Up @@ -637,7 +645,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {

logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller()))
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx, nil)

// Replace the SubscriberURI to point at our fake server.
for _, trig := range tc.triggers {
Expand Down Expand Up @@ -742,16 +750,28 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
}

type mockReporter struct {
eventCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
eventCountReported bool
unauthenticatedCountReported bool
invalidTokenCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
}

func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
r.eventCountReported = true
return nil
}

func (r *mockReporter) ReportUnauthenticatedRequest(args *ReportArgs) error {
r.unauthenticatedCountReported = true
return nil
}

func (r *mockReporter) ReportInvalidTokenRequest(args *ReportArgs) error {
r.invalidTokenCountReported = true
return nil
}

func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
r.eventDispatchTimeReported = true
return nil
Expand Down
48 changes: 48 additions & 0 deletions pkg/broker/filter/stats_reporter.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As this is not only relevant to broker-filters anymore, should we move this StatsReporter to a dedicated package and make it more generic? E.g. pkg/metrics?
(This could also help to remove the import cycle.)

Then you could also change the ReportArgs to an interface (which has the method generateTag()) to have component specific args. E.g. from a Trigger (which would to the same as the current reporter.generateTag(), or then for a Channel, etc.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So for example:

type ReportArgs interface { //TODO: find a better name
    generateTag(tags ...tag.Mutator) (context.Context, error)
}

and then for example for the broker/trigger:

type BrokerArgs struct {
	ns            string
	trigger       string
	broker        string
	filterType    string
	requestType   string
	requestScheme string
}

func (args *BrokerArgs) generateTag(tags ...tag.Mutator) (context.Context, error) {
	ctx := metricskey.WithResource(emptyContext, resource.Resource{
		Type: eventingmetrics.ResourceTypeKnativeTrigger,
		Labels: map[string]string{
			eventingmetrics.LabelNamespaceName: args.ns,
			eventingmetrics.LabelBrokerName:    args.broker,
			eventingmetrics.LabelTriggerName:   args.trigger,
		},
	})
	// Note that filterType and filterSource can be empty strings, so they need a special treatment.
	ctx, err := tag.New(
		ctx,
		append(tags,
			tag.Insert(triggerFilterTypeKey, valueOrAny(args.filterType)),
			tag.Insert(triggerFilterRequestTypeKey, args.requestType),
			tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
		)...)
	return ctx, err
}

And then we call this e.g. in ReportEventCount():

func (r *reporter) ReportEventCount(args ReportArgs, responseCode int) error {
	ctx, err := args.generateTag(
		tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
		tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)))
	if err != nil {
		return err
	}
	metrics.Record(ctx, eventCountM.M(1))
	return nil
}

and use it

reportArgs := &BrokerArgs{
	Ns:          broker.Namespace,
	Broker:      broker.Name,
	RequestType: "broker_ingress",
}

err = h.tokenVerifier.VerifyJWTFromRequest(ctx, request, broker.Status.Address.Audience, writer, reportArgs)

Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ var (
stats.UnitDimensionless,
)

// unauthenticatedRequestsM records the number of unauthenticated requests. (Without the JWT token in the header)
unauthenticatedRequestsM = stats.Int64(
"unauthenticated_requests",
"Number of unauthenticated requests (No JWT token found in the header)",
stats.UnitDimensionless,
)

// invalidTokenRequestsM records the number of requests with invalid tokens.
invalidTokenRequestsM = stats.Int64(
"invalid_token_requests",
"Number of requests with invalid tokens",
stats.UnitDimensionless,
)

// dispatchTimeInMsecM records the time spent dispatching an event to
// a Trigger subscriber, in milliseconds.
dispatchTimeInMsecM = stats.Float64(
Expand Down Expand Up @@ -90,6 +104,8 @@ func init() {
// StatsReporter defines the interface for sending filter metrics.
type StatsReporter interface {
ReportEventCount(args *ReportArgs, responseCode int) error
ReportUnauthenticatedRequest(args *ReportArgs) error
ReportInvalidTokenRequest(args *ReportArgs) error
ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error
ReportEventProcessingTime(args *ReportArgs, d time.Duration) error
}
Expand Down Expand Up @@ -120,6 +136,18 @@ func register() {
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, responseCodeKey, responseCodeClassKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: unauthenticatedRequestsM.Description(),
Measure: unauthenticatedRequestsM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: invalidTokenRequestsM.Description(),
Measure: invalidTokenRequestsM,
Aggregation: view.Count(),
TagKeys: []tag.Key{triggerFilterTypeKey, triggerFilterRequestTypeKey, triggerFilterRequestSchemeKey, broker.UniqueTagKey, broker.ContainerTagKey},
},
&view.View{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Expand Down Expand Up @@ -150,6 +178,26 @@ func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error {
return nil
}

// ReportUnauthenticatedRequest captures unauthenticated requests. (The requests that do not have JWT token in the header)
func (r *reporter) ReportUnauthenticatedRequest(args *ReportArgs) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
metrics.Record(ctx, unauthenticatedRequestsM.M(1))
return nil
}

// ReportInvalidTokenRequest captures requests with invalid tokens.
func (r *reporter) ReportInvalidTokenRequest(args *ReportArgs) error {
ctx, err := r.generateTag(args)
if err != nil {
return err
}
metrics.Record(ctx, invalidTokenRequestsM.M(1))
return nil
}

// ReportEventDispatchTime captures dispatch times.
func (r *reporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
ctx, err := r.generateTag(args,
Expand Down
Loading