diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index baaa8653ba7..37e56f8b4c3 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -23,6 +23,8 @@ import ( nethttp "net/http" "time" + duckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" @@ -71,6 +73,7 @@ type EventReceiver struct { reporter StatsReporter tokenVerifier *auth.OIDCTokenVerifier audience string + getPoliciesForFunc GetPoliciesForFunc withContext func(context.Context) context.Context } @@ -107,6 +110,16 @@ func ResolveChannelFromPath(PathToChannelFunc ResolveChannelFromPathFunc) EventR } } +// GetPoliciesForFunc function enables the EventReceiver to get the Channels AppliedEventPoliciesStatus +type GetPoliciesForFunc func(channel ChannelReference) ([]duckv1.AppliedEventPolicyRef, error) + +func ReceiverWithGetPoliciesForFunc(fn GetPoliciesForFunc) EventReceiverOptions { + return func(r *EventReceiver) error { + r.getPoliciesForFunc = fn + return nil + } +} + func OIDCTokenVerification(tokenVerifier *auth.OIDCTokenVerifier, audience string) EventReceiverOptions { return func(r *EventReceiver) error { r.tokenVerifier = tokenVerifier @@ -256,12 +269,26 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth features := feature.FromContext(ctx) if features.IsOIDCAuthentication() { r.logger.Debug("OIDC authentication is enabled") - err = r.tokenVerifier.VerifyJWTFromRequest(ctx, request, &r.audience, response) + + if r.getPoliciesForFunc == nil { + r.logger.Error("getPoliciesForFunc() callback not set. Can't get applying event policies of channel") + response.WriteHeader(nethttp.StatusInternalServerError) + return + } + + applyingEventPolicies, err := r.getPoliciesForFunc(channel) + if err != nil { + r.logger.Error("could not get applying event policies of channel", zap.Error(err), zap.String("channel", channel.String())) + response.WriteHeader(nethttp.StatusInternalServerError) + return + } + + err = r.tokenVerifier.VerifyRequest(ctx, features, &r.audience, channel.Namespace, applyingEventPolicies, request, response) if err != nil { - r.logger.Warn("Error when validating the JWT token in the request", zap.Error(err)) + r.logger.Warn("could not verify authn and authz of request", zap.Error(err)) return } - r.logger.Debug("Request contained a valid JWT. Continuing...") + r.logger.Debug("Request contained a valid and authorized JWT. Continuing...") } err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header)) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index fbe1cd4fde3..d79320d24ff 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -138,6 +138,7 @@ func NewController( eventDispatcher: kncloudevents.NewDispatcher(clientConfig, oidcTokenProvider), tokenVerifier: auth.NewOIDCTokenVerifier(ctx), clientConfig: clientConfig, + inMemoryChannelLister: inmemorychannelInformer.Lister(), } var globalResync func(obj interface{}) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index e94ab710da3..22654823a7d 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + listers "knative.dev/eventing/pkg/client/listers/messaging/v1" + "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "go.uber.org/zap" @@ -55,12 +57,13 @@ type Reconciler struct { reporter channel.StatsReporter messagingClientSet messagingv1.MessagingV1Interface eventTypeLister v1beta2.EventTypeLister + inMemoryChannelLister listers.InMemoryChannelLister eventingClient eventingv1beta2.EventingV1beta2Interface featureStore *feature.Store eventDispatcher *kncloudevents.Dispatcher - tokenVerifier *auth.OIDCTokenVerifier - clientConfig eventingtls.ClientConfig + tokenVerifier *auth.OIDCTokenVerifier + clientConfig eventingtls.ClientConfig } // Check the interfaces Reconciler should implement @@ -133,6 +136,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec r.eventDispatcher, channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)), channel.ReceiverWithContextFunc(wc), + channel.ReceiverWithGetPoliciesForFunc(r.getAppliedEventPolicyRef), ) if err != nil { logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) @@ -165,6 +169,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec channel.ResolveChannelFromPath(channel.ParseChannelFromPath), channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)), channel.ReceiverWithContextFunc(wc), + channel.ReceiverWithGetPoliciesForFunc(r.getAppliedEventPolicyRef), ) if err != nil { logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) @@ -278,6 +283,15 @@ func (r *Reconciler) deleteFunc(obj interface{}) { handleSubscribers(imc.Spec.Subscribers, kncloudevents.DeleteAddressableHandler) } +func (r *Reconciler) getAppliedEventPolicyRef(channel channel.ChannelReference) ([]eventingduckv1.AppliedEventPolicyRef, error) { + imc, err := r.inMemoryChannelLister.InMemoryChannels(channel.Namespace).Get(channel.Name) + if err != nil { + return nil, fmt.Errorf("could not get inmemory channel %s/%s: %w", channel.Namespace, channel.Name, err) + } + + return imc.Status.Policies, nil +} + func handleSubscribers(subscribers []eventingduckv1.SubscriberSpec, handle func(duckv1.Addressable)) { for _, sub := range subscribers { handle(duckv1.Addressable{ diff --git a/test/rekt/channel_test.go b/test/rekt/channel_test.go index bc1b08cef9b..c0c32358e39 100644 --- a/test/rekt/channel_test.go +++ b/test/rekt/channel_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "knative.dev/eventing/test/rekt/features/authz" "knative.dev/reconciler-test/pkg/feature" "github.com/cloudevents/sdk-go/v2/binding" @@ -39,6 +40,7 @@ import ( "knative.dev/eventing/test/rekt/features/channel" "knative.dev/eventing/test/rekt/features/oidc" ch "knative.dev/eventing/test/rekt/resources/channel" + channelresource "knative.dev/eventing/test/rekt/resources/channel" "knative.dev/eventing/test/rekt/resources/channel_impl" "knative.dev/eventing/test/rekt/resources/subscription" ) @@ -392,3 +394,39 @@ func TestChannelImplSupportsOIDC(t *testing.T) { env.TestSet(ctx, t, oidc.AddressableOIDCConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name, env.Namespace())) } + +func TestChannelImplSupportsAuthZ(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + eventshub.WithTLS(t), + ) + + name := feature.MakeRandomK8sName("channelimpl") + env.Prerequisite(ctx, t, channel.ImplGoesReady(name)) + + env.TestSet(ctx, t, authz.AddressableAuthZConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name)) +} + +func TestChannelSupportsAuthZ(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + eventshub.WithTLS(t), + ) + + name := feature.MakeRandomK8sName("channel") + env.Prerequisite(ctx, t, channel.GoesReady(name)) + + env.TestSet(ctx, t, authz.AddressableAuthZConformance(channelresource.GVR(), "Channel", name)) +}