Skip to content

Commit

Permalink
Support authorization in Channel ingress (#8162)
Browse files Browse the repository at this point in the history
* Add AuthZ check in channel ingress

* Add e2e test
  • Loading branch information
creydr authored Aug 16, 2024
1 parent b57ac3a commit bf945f9
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 5 deletions.
33 changes: 30 additions & 3 deletions pkg/channel/event_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
nethttp "net/http"
"time"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"

"knative.dev/eventing/pkg/apis/feature"

"knative.dev/eventing/pkg/auth"
Expand Down Expand Up @@ -71,6 +73,7 @@ type EventReceiver struct {
reporter StatsReporter
tokenVerifier *auth.OIDCTokenVerifier
audience string
getPoliciesForFunc GetPoliciesForFunc
withContext func(context.Context) context.Context
}

Expand Down Expand Up @@ -107,6 +110,16 @@ func ResolveChannelFromPath(PathToChannelFunc ResolveChannelFromPathFunc) EventR
}
}

// GetPoliciesForFunc function enables the EventReceiver to get the Channels AppliedEventPoliciesStatus
type GetPoliciesForFunc func(channel ChannelReference) ([]duckv1.AppliedEventPolicyRef, error)

func ReceiverWithGetPoliciesForFunc(fn GetPoliciesForFunc) EventReceiverOptions {
return func(r *EventReceiver) error {
r.getPoliciesForFunc = fn
return nil
}
}

func OIDCTokenVerification(tokenVerifier *auth.OIDCTokenVerifier, audience string) EventReceiverOptions {
return func(r *EventReceiver) error {
r.tokenVerifier = tokenVerifier
Expand Down Expand Up @@ -256,12 +269,26 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth
features := feature.FromContext(ctx)
if features.IsOIDCAuthentication() {
r.logger.Debug("OIDC authentication is enabled")
err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response)

if r.getPoliciesForFunc == nil {
r.logger.Error("getPoliciesForFunc() callback not set. Can't get applying event policies of channel")
response.WriteHeader(nethttp.StatusInternalServerError)
return
}

applyingEventPolicies, err := r.getPoliciesForFunc(channel)
if err != nil {
r.logger.Error("could not get applying event policies of channel", zap.Error(err), zap.String("channel", channel.String()))
response.WriteHeader(nethttp.StatusInternalServerError)
return
}

err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, request, response)
if err != nil {
r.logger.Warn("Error when validating the JWT token in the request", zap.Error(err))
r.logger.Warn("could not verify authn and authz of request", zap.Error(err))
return
}
r.logger.Debug("Request contained a valid JWT. Continuing...")
r.logger.Debug("Request contained a valid and authorized JWT. Continuing...")
}

err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header))
Expand Down
1 change: 1 addition & 0 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func NewController(
eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider),
tokenVerifier: auth.NewOIDCTokenVerifier(ctx),
clientConfig: clientConfig,
inMemoryChannelLister: inmemorychannelInformer.Lister(),
}

var globalResync func(obj interface{})
Expand Down
18 changes: 16 additions & 2 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"fmt"

listers "knative.dev/eventing/pkg/client/listers/messaging/v1"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"go.uber.org/zap"
Expand Down Expand Up @@ -55,12 +57,13 @@ type Reconciler struct {
reporter channel.StatsReporter
messagingClientSet messagingv1.MessagingV1Interface
eventTypeLister v1beta2.EventTypeLister
inMemoryChannelLister listers.InMemoryChannelLister
eventingClient eventingv1beta2.EventingV1beta2Interface
featureStore *feature.Store
eventDispatcher *kncloudevents.Dispatcher
tokenVerifier *auth.OIDCTokenVerifier

clientConfig eventingtls.ClientConfig
tokenVerifier *auth.OIDCTokenVerifier
clientConfig eventingtls.ClientConfig
}

// Check the interfaces Reconciler should implement
Expand Down Expand Up @@ -133,6 +136,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
r.eventDispatcher,
channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)),
channel.ReceiverWithContextFunc(wc),
channel.ReceiverWithGetPoliciesForFunc(r.getAppliedEventPolicyRef),
)
if err != nil {
logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err)
Expand Down Expand Up @@ -165,6 +169,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec
channel.ResolveChannelFromPath(channel.ParseChannelFromPath),
channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)),
channel.ReceiverWithContextFunc(wc),
channel.ReceiverWithGetPoliciesForFunc(r.getAppliedEventPolicyRef),
)
if err != nil {
logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err)
Expand Down Expand Up @@ -278,6 +283,15 @@ func (r *Reconciler) deleteFunc(obj interface{}) {
handleSubscribers(imc.Spec.Subscribers, kncloudevents.DeleteAddressableHandler)
}

func (r *Reconciler) getAppliedEventPolicyRef(channel channel.ChannelReference) ([]eventingduckv1.AppliedEventPolicyRef, error) {
imc, err := r.inMemoryChannelLister.InMemoryChannels(channel.Namespace).Get(channel.Name)
if err != nil {
return nil, fmt.Errorf("could not get inmemory channel %s/%s: %w", channel.Namespace, channel.Name, err)
}

return imc.Status.Policies, nil
}

func handleSubscribers(subscribers []eventingduckv1.SubscriberSpec, handle func(duckv1.Addressable)) {
for _, sub := range subscribers {
handle(duckv1.Addressable{
Expand Down
38 changes: 38 additions & 0 deletions test/rekt/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"knative.dev/eventing/test/rekt/features/authz"
"knative.dev/reconciler-test/pkg/feature"

"github.com/cloudevents/sdk-go/v2/binding"
Expand All @@ -39,6 +40,7 @@ import (
"knative.dev/eventing/test/rekt/features/channel"
"knative.dev/eventing/test/rekt/features/oidc"
ch "knative.dev/eventing/test/rekt/resources/channel"
channelresource "knative.dev/eventing/test/rekt/resources/channel"
"knative.dev/eventing/test/rekt/resources/channel_impl"
"knative.dev/eventing/test/rekt/resources/subscription"
)
Expand Down Expand Up @@ -392,3 +394,39 @@ func TestChannelImplSupportsOIDC(t *testing.T) {

env.TestSet(ctx, t, oidc.AddressableOIDCConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name, env.Namespace()))
}

func TestChannelImplSupportsAuthZ(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
)

name := feature.MakeRandomK8sName("channelimpl")
env.Prerequisite(ctx, t, channel.ImplGoesReady(name))

env.TestSet(ctx, t, authz.AddressableAuthZConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name))
}

func TestChannelSupportsAuthZ(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
)

name := feature.MakeRandomK8sName("channel")
env.Prerequisite(ctx, t, channel.GoesReady(name))

env.TestSet(ctx, t, authz.AddressableAuthZConformance(channelresource.GVR(), "Channel", name))
}

0 comments on commit bf945f9

Please sign in to comment.