From d4e647d6fcc01e8dc5679f6095230af417b0027d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Mon, 24 Jun 2024 18:58:01 +0200 Subject: [PATCH] List applying policies in Channel and propagate EventPolicies to underlying channel (#8014) * Move AppliedEventPoliciesStatus to ChannelableStatus to have list of policies in channel resource * Move functionality to update status with policies to auth package and make generic * Resync Channels when features CM changes * Simplify WithEventPolicyToRef * List applying policies in Channels status * Add integration test for listing policies * Copy channels EventPolicies to backing channel --- config/core/resources/channel.yaml | 12 + .../core/roles/controller-clusterroles.yaml | 2 + docs/eventing-api.md | 36 +-- pkg/apis/duck/v1/channelable_types.go | 3 + pkg/apis/duck/v1/zz_generated.deepcopy.go | 1 + pkg/apis/messaging/v1/channel_lifecycle.go | 21 ++ .../messaging/v1/channel_lifecycle_test.go | 30 ++ .../messaging/v1/in_memory_channel_types.go | 4 - .../messaging/v1/zz_generated.deepcopy.go | 1 - pkg/auth/event_policy.go | 52 ++++ pkg/reconciler/channel/channel.go | 104 +++++++ pkg/reconciler/channel/channel_test.go | 267 +++++++++++++++++- pkg/reconciler/channel/controller.go | 40 ++- pkg/reconciler/channel/controller_test.go | 12 +- .../channel/resources/eventpolicy.go | 76 +++++ .../controller/inmemorychannel.go | 40 +-- .../controller/inmemorychannel_test.go | 32 +-- .../subscription/subscription_test.go | 1 + pkg/reconciler/testing/v1/channel.go | 38 +++ pkg/reconciler/testing/v1/eventpolicy.go | 31 +- 20 files changed, 694 insertions(+), 109 deletions(-) create mode 100644 pkg/reconciler/channel/resources/eventpolicy.go diff --git a/config/core/resources/channel.yaml b/config/core/resources/channel.yaml index 02b174ee93f..4ca65afb239 100644 --- a/config/core/resources/channel.yaml +++ b/config/core/resources/channel.yaml @@ -244,6 +244,18 @@ spec: namespace: description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/ This is optional field, it gets defaulted to the object holding it if left out.' type: string + policies: + description: List of applied EventPolicies + type: array + items: + type: object + properties: + apiVersion: + description: The API version of the applied EventPolicy. This indicates, which version of EventPolicy is supported by the resource. + type: string + name: + description: The name of the applied EventPolicy + type: string conditions: description: Conditions the latest available observations of a resource's current state. type: array diff --git a/config/core/roles/controller-clusterroles.yaml b/config/core/roles/controller-clusterroles.yaml index a68986c7e5e..5dcf0479f4f 100644 --- a/config/core/roles/controller-clusterroles.yaml +++ b/config/core/roles/controller-clusterroles.yaml @@ -97,6 +97,8 @@ rules: - "triggers/status" - "eventtypes" - "eventtypes/status" + - "eventpolicies" + - "eventpolicies/status" verbs: - "get" - "list" diff --git a/docs/eventing-api.md b/docs/eventing-api.md index c11bcd806ea..26d31ae8bfb 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -49,7 +49,7 @@ Resource Types:

AppliedEventPoliciesStatus

-(Appears on:BrokerStatus, ParallelStatus, SequenceStatus, InMemoryChannelStatus) +(Appears on:ChannelableStatus, BrokerStatus, ParallelStatus, SequenceStatus)

AppliedEventPoliciesStatus contains the list of policies which apply to a resource. @@ -368,6 +368,23 @@ DeliveryStatus resolved delivery options.

+ + +AppliedEventPoliciesStatus
+ + +AppliedEventPoliciesStatus + + + + +

+(Members of AppliedEventPoliciesStatus are embedded into this type.) +

+(Optional) +

AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this Channel

+ +

DeliverySpec @@ -5459,23 +5476,6 @@ ChannelableStatus

Channel conforms to Duck type ChannelableStatus.

- - -AppliedEventPoliciesStatus
- - -AppliedEventPoliciesStatus - - - - -

-(Members of AppliedEventPoliciesStatus are embedded into this type.) -

-(Optional) -

AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this Broker

- -

SubscriptionSpec diff --git a/pkg/apis/duck/v1/channelable_types.go b/pkg/apis/duck/v1/channelable_types.go index fef82764dea..82d75bd986a 100644 --- a/pkg/apis/duck/v1/channelable_types.go +++ b/pkg/apis/duck/v1/channelable_types.go @@ -67,6 +67,9 @@ type ChannelableStatus struct { // resolved delivery options. // +optional DeliveryStatus `json:",inline"` + // AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this Channel + // +optional + AppliedEventPoliciesStatus `json:",inline"` } var ( diff --git a/pkg/apis/duck/v1/zz_generated.deepcopy.go b/pkg/apis/duck/v1/zz_generated.deepcopy.go index 609c7f6f6e0..d7965aaf4c6 100644 --- a/pkg/apis/duck/v1/zz_generated.deepcopy.go +++ b/pkg/apis/duck/v1/zz_generated.deepcopy.go @@ -154,6 +154,7 @@ func (in *ChannelableStatus) DeepCopyInto(out *ChannelableStatus) { in.AddressStatus.DeepCopyInto(&out.AddressStatus) in.SubscribableStatus.DeepCopyInto(&out.SubscribableStatus) in.DeliveryStatus.DeepCopyInto(&out.DeliveryStatus) + in.AppliedEventPoliciesStatus.DeepCopyInto(&out.AppliedEventPoliciesStatus) return } diff --git a/pkg/apis/messaging/v1/channel_lifecycle.go b/pkg/apis/messaging/v1/channel_lifecycle.go index 2936a1f194e..4f225822fce 100644 --- a/pkg/apis/messaging/v1/channel_lifecycle.go +++ b/pkg/apis/messaging/v1/channel_lifecycle.go @@ -29,6 +29,7 @@ var chCondSet = apis.NewLivingConditionSet( ChannelConditionBackingChannelReady, ChannelConditionAddressable, ChannelConditionDeadLetterSinkResolved, + ChannelConditionEventPoliciesReady, ) const ( @@ -45,6 +46,10 @@ const ( // ChannelConditionDeadLetterSinkResolved has status True when there is a Dead Letter Sink ref or URI // defined in the Spec.Delivery, is a valid destination and its correctly resolved into a valid URI ChannelConditionDeadLetterSinkResolved apis.ConditionType = "DeadLetterSinkResolved" + + // ChannelConditionEventPoliciesReady has status True when all the EventPolicies which reference this + // Channel are Ready too. + ChannelConditionEventPoliciesReady apis.ConditionType = "EventPoliciesReady" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -146,3 +151,19 @@ func (cs *ChannelStatus) MarkDeadLetterSinkResolvedFailed(reason, messageFormat cs.DeliveryStatus = eventingduck.DeliveryStatus{} chCondSet.Manage(cs).MarkFalse(ChannelConditionDeadLetterSinkResolved, reason, messageFormat, messageA...) } + +func (cs *ChannelStatus) MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) { + chCondSet.Manage(cs).MarkFalse(ChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (cs *ChannelStatus) MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) { + chCondSet.Manage(cs).MarkUnknown(ChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} + +func (cs *ChannelStatus) MarkEventPoliciesTrue() { + chCondSet.Manage(cs).MarkTrue(ChannelConditionEventPoliciesReady) +} + +func (cs *ChannelStatus) MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) { + chCondSet.Manage(cs).MarkTrueWithReason(ChannelConditionEventPoliciesReady, reason, messageFormat, messageA...) +} diff --git a/pkg/apis/messaging/v1/channel_lifecycle_test.go b/pkg/apis/messaging/v1/channel_lifecycle_test.go index 7dd31487e32..91a0775ba02 100644 --- a/pkg/apis/messaging/v1/channel_lifecycle_test.go +++ b/pkg/apis/messaging/v1/channel_lifecycle_test.go @@ -108,6 +108,9 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionDeadLetterSinkResolved, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, @@ -139,6 +142,9 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionDeadLetterSinkResolved, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, @@ -170,6 +176,9 @@ func TestChannelInitializeConditions(t *testing.T) { }, { Type: ChannelConditionDeadLetterSinkResolved, Status: corev1.ConditionUnknown, + }, { + Type: ChannelConditionEventPoliciesReady, + Status: corev1.ConditionUnknown, }, { Type: ChannelConditionReady, Status: corev1.ConditionUnknown, @@ -198,33 +207,45 @@ func TestChannelConditionStatus(t *testing.T) { address *duckv1.Addressable backingChannelStatus corev1.ConditionStatus DLSResolved corev1.ConditionStatus + eventPolicyStatus corev1.ConditionStatus wantConditionStatus corev1.ConditionStatus }{{ name: "all happy", address: validAddress, backingChannelStatus: corev1.ConditionTrue, DLSResolved: corev1.ConditionTrue, + eventPolicyStatus: corev1.ConditionTrue, wantConditionStatus: corev1.ConditionTrue, }, { name: "address not set", address: &duckv1.Addressable{}, backingChannelStatus: corev1.ConditionTrue, + eventPolicyStatus: corev1.ConditionTrue, wantConditionStatus: corev1.ConditionFalse, }, { name: "nil address", address: nil, backingChannelStatus: corev1.ConditionTrue, + eventPolicyStatus: corev1.ConditionTrue, wantConditionStatus: corev1.ConditionFalse, }, { name: "backing channel with unknown status", address: validAddress, backingChannelStatus: corev1.ConditionUnknown, + eventPolicyStatus: corev1.ConditionTrue, wantConditionStatus: corev1.ConditionUnknown, }, { name: "backing channel with false status", address: validAddress, backingChannelStatus: corev1.ConditionFalse, + eventPolicyStatus: corev1.ConditionTrue, + wantConditionStatus: corev1.ConditionFalse, + }, { + name: "EventPolicies not Ready", + address: validAddress, + backingChannelStatus: corev1.ConditionTrue, + eventPolicyStatus: corev1.ConditionFalse, wantConditionStatus: corev1.ConditionFalse, }} for _, test := range tests { @@ -240,6 +261,14 @@ func TestChannelConditionStatus(t *testing.T) { cs.MarkBackingChannelUnknown("ChannelUnknown", "testing") } + if test.eventPolicyStatus == corev1.ConditionTrue { + cs.MarkEventPoliciesTrue() + } else if test.eventPolicyStatus == corev1.ConditionFalse { + cs.MarkEventPoliciesFailed("EventPolicyFailure", "testing") + } else { + cs.MarkEventPoliciesUnknown("EventPolicyFailure", "testing") + } + if test.DLSResolved == corev1.ConditionTrue { cs.MarkDeadLetterSinkResolvedSucceeded(v1.DeliveryStatus{}) } @@ -421,6 +450,7 @@ func TestChannelPropagateStatuses(t *testing.T) { cs := &ChannelStatus{} cs.PropagateStatuses(tc.channelableStatus) cs.MarkDeadLetterSinkNotConfigured() + cs.MarkEventPoliciesTrue() got := cs.GetTopLevelCondition().Status if tc.wantConditionStatus != got { t.Errorf("unexpected readiness: want %v, got %v", tc.wantConditionStatus, got) diff --git a/pkg/apis/messaging/v1/in_memory_channel_types.go b/pkg/apis/messaging/v1/in_memory_channel_types.go index cdf0f6075ba..5b0c971b54b 100644 --- a/pkg/apis/messaging/v1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1/in_memory_channel_types.go @@ -73,10 +73,6 @@ type InMemoryChannelSpec struct { type InMemoryChannelStatus struct { // Channel conforms to Duck type ChannelableStatus. eventingduckv1.ChannelableStatus `json:",inline"` - - // AppliedEventPoliciesStatus contains the list of EventPolicies which apply to this Broker - // +optional - eventingduckv1.AppliedEventPoliciesStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/messaging/v1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1/zz_generated.deepcopy.go index f0e08f87f5f..fff2d237b49 100644 --- a/pkg/apis/messaging/v1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1/zz_generated.deepcopy.go @@ -245,7 +245,6 @@ func (in *InMemoryChannelSpec) DeepCopy() *InMemoryChannelSpec { func (in *InMemoryChannelStatus) DeepCopyInto(out *InMemoryChannelStatus) { *out = *in in.ChannelableStatus.DeepCopyInto(&out.ChannelableStatus) - in.AppliedEventPoliciesStatus.DeepCopyInto(&out.AppliedEventPoliciesStatus) return } diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index e049772f135..56ac38021be 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -20,6 +20,9 @@ import ( "fmt" "strings" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/eventing/pkg/apis/feature" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" @@ -287,3 +290,52 @@ func EventPolicyEventHandler(indexer cache.Indexer, gk schema.GroupKind, enqueue }, } } + +type EventPolicyStatusMarker interface { + MarkEventPoliciesFailed(reason, messageFormat string, messageA ...interface{}) + MarkEventPoliciesUnknown(reason, messageFormat string, messageA ...interface{}) + MarkEventPoliciesTrue() + MarkEventPoliciesTrueWithReason(reason, messageFormat string, messageA ...interface{}) +} + +func UpdateStatusWithEventPolicies(featureFlags feature.Flags, status *eventingduckv1.AppliedEventPoliciesStatus, statusMarker EventPolicyStatusMarker, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, gvk schema.GroupVersionKind, objectMeta metav1.ObjectMeta) error { + status.Policies = nil + + applyingEvenPolicies, err := GetEventPoliciesForResource(eventPolicyLister, gvk, objectMeta) + if err != nil { + statusMarker.MarkEventPoliciesFailed("EventPoliciesGetFailed", "Failed to get applying event policies") + return fmt.Errorf("unable to get applying event policies: %w", err) + } + + if len(applyingEvenPolicies) > 0 { + unreadyEventPolicies := []string{} + for _, policy := range applyingEvenPolicies { + if !policy.Status.IsReady() { + unreadyEventPolicies = append(unreadyEventPolicies, policy.Name) + } else { + // only add Ready policies to the list + status.Policies = append(status.Policies, eventingduckv1.AppliedEventPolicyRef{ + Name: policy.Name, + APIVersion: v1alpha1.SchemeGroupVersion.String(), + }) + } + } + + if len(unreadyEventPolicies) == 0 { + statusMarker.MarkEventPoliciesTrue() + } else { + statusMarker.MarkEventPoliciesFailed("EventPoliciesNotReady", "event policies %s are not ready", strings.Join(unreadyEventPolicies, ", ")) + } + } else { + // we have no applying event policy. So we set the EP condition to True + if featureFlags.IsOIDCAuthentication() { + // in case of OIDC auth, we also set the message with the default authorization mode + statusMarker.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", featureFlags[feature.AuthorizationDefaultMode]) + } else { + // in case OIDC is disabled, we set EP condition to true too, but give some message that authz (EPs) require OIDC + statusMarker.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } + } + + return nil +} diff --git a/pkg/reconciler/channel/channel.go b/pkg/reconciler/channel/channel.go index 94d8d41422f..daa66ca5ff0 100644 --- a/pkg/reconciler/channel/channel.go +++ b/pkg/reconciler/channel/channel.go @@ -20,11 +20,20 @@ import ( "context" "fmt" + "knative.dev/eventing/pkg/reconciler/channel/resources" + + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/labels" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" + "go.uber.org/zap" apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/dynamic" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" "knative.dev/pkg/kmeta" duckapis "knative.dev/pkg/apis/duck" @@ -35,6 +44,7 @@ import ( eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" v1 "knative.dev/eventing/pkg/apis/messaging/v1" channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/channel" + eventingv1alpha1listers "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" listers "knative.dev/eventing/pkg/client/listers/messaging/v1" ducklib "knative.dev/eventing/pkg/duck" eventingduck "knative.dev/eventing/pkg/duck" @@ -47,6 +57,10 @@ type Reconciler struct { // dynamicClientSet allows us to configure pluggable Build objects dynamicClientSet dynamic.Interface + + eventPolicyLister eventingv1alpha1listers.EventPolicyLister + + eventingClientSet eventingclientset.Interface } // Check that our Reconciler implements Interface @@ -54,6 +68,8 @@ var _ channelreconciler.Interface = (*Reconciler)(nil) // ReconcileKind implements Interface.ReconcileKind. func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1.Channel) pkgreconciler.Event { + featureFlags := feature.FromContext(ctx) + // 1. Create the backing Channel CRD, if it doesn't exist. // 2. Propagate the backing Channel CRD Status, Address, and SubscribableStatus into this Channel. @@ -96,9 +112,97 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, c *v1.Channel) pkgreconc c.Status.MarkDeadLetterSinkNotConfigured() } + err = auth.UpdateStatusWithEventPolicies(featureFlags, &c.Status.AppliedEventPoliciesStatus, &c.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Channel"), c.ObjectMeta) + if err != nil { + return fmt.Errorf("could not update channel status with EventPolicies: %v", err) + } + + err = r.reconcileBackingChannelEventPolicies(ctx, c, backingChannel) + if err != nil { + return fmt.Errorf("could not reconcile backing channels (%s/%s) event policies: %w", backingChannel.Namespace, backingChannel.Name, err) + } + + return nil +} + +func (r *Reconciler) reconcileBackingChannelEventPolicies(ctx context.Context, channel *v1.Channel, backingChannel *eventingduckv1.Channelable) error { + applyingEventPoliciesForChannel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("Channel"), channel.ObjectMeta) + if err != nil { + return fmt.Errorf("could not get applying EventPolicies for for channel %s/%s: %w", channel.Namespace, channel.Name, err) + } + + for _, policy := range applyingEventPoliciesForChannel { + err := r.reconcileBackingChannelEventPolicy(ctx, backingChannel, policy) + if err != nil { + return fmt.Errorf("could not reconcile EventPolicy %s/%s for backing channel %s/%s: %w", policy.Namespace, policy.Name, backingChannel.Namespace, backingChannel.Name, err) + } + } + + // Check, if we have old EP for the backing channel, which are not relevant anymore + applyingEventPoliciesForBackingChannel, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, backingChannel.GroupVersionKind(), backingChannel.ObjectMeta) + if err != nil { + return fmt.Errorf("could not get applying EventPolicies for for backing channel %s/%s: %w", channel.Namespace, channel.Name, err) + } + + selector, err := labels.ValidatedSelectorFromSet(resources.LabelsForBackingChannelsEventPolicy(backingChannel)) + if err != nil { + return fmt.Errorf("could not get valid selector for backing channels EventPolicy %s/%s: %w", backingChannel.Namespace, backingChannel.Name, err) + } + + existingEventPoliciesForBackingChannel, err := r.eventPolicyLister.EventPolicies(backingChannel.Namespace).List(selector) + if err != nil { + return fmt.Errorf("could not get existing EventPolicies in backing channels namespace %q: %w", backingChannel.Namespace, err) + } + + for _, policy := range existingEventPoliciesForBackingChannel { + if !r.containsPolicy(policy.Name, applyingEventPoliciesForBackingChannel) { + + // the existing policy is not in the list of applying policies anymore --> is outdated --> delete it + err := r.eventingClientSet.EventingV1alpha1().EventPolicies(policy.Namespace).Delete(ctx, policy.Name, metav1.DeleteOptions{}) + if err != nil && apierrs.IsNotFound(err) { + return fmt.Errorf("could not delete old EventPolicy %s/%s: %w", policy.Namespace, policy.Name, err) + } + } + } + + return nil +} + +func (r *Reconciler) reconcileBackingChannelEventPolicy(ctx context.Context, backingChannel *eventingduckv1.Channelable, eventpolicy *eventingv1alpha1.EventPolicy) error { + expected := resources.MakeEventPolicyForBackingChannel(backingChannel, eventpolicy) + + foundEP, err := r.eventPolicyLister.EventPolicies(expected.Namespace).Get(expected.Name) + if apierrs.IsNotFound(err) { + _, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Create(ctx, expected, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("could not create EventPolicy %s/%s: %w", expected.Namespace, expected.Name, err) + } + } else if err != nil { + return fmt.Errorf("could not get EventPolicy %s/%s: %w", expected.Namespace, expected.Name, err) + } else if r.policyNeedsUpdate(foundEP, expected) { + expected.SetResourceVersion(foundEP.ResourceVersion) + _, err := r.eventingClientSet.EventingV1alpha1().EventPolicies(expected.Namespace).Update(ctx, expected, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("could not update EventPolicy %s/%s: %w", expected.Namespace, expected.Name, err) + } + } + return nil } +func (r *Reconciler) containsPolicy(name string, policies []*eventingv1alpha1.EventPolicy) bool { + for _, policy := range policies { + if policy.Name == name { + return true + } + } + return false +} + +func (r *Reconciler) policyNeedsUpdate(foundEP, expected *eventingv1alpha1.EventPolicy) bool { + return !equality.Semantic.DeepDerivative(expected, foundEP) +} + // reconcileBackingChannel reconciles Channel's 'c' underlying CRD channel. func (r *Reconciler) reconcileBackingChannel(ctx context.Context, channelResourceInterface dynamic.ResourceInterface, c *v1.Channel, backingChannelObjRef duckv1.KReference) (*eventingduckv1.Channelable, error) { logger := logging.FromContext(ctx) diff --git a/pkg/reconciler/channel/channel_test.go b/pkg/reconciler/channel/channel_test.go index 91e0e469ffd..783abe359a2 100644 --- a/pkg/reconciler/channel/channel_test.go +++ b/pkg/reconciler/channel/channel_test.go @@ -19,6 +19,8 @@ import ( "fmt" "testing" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -26,6 +28,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" "k8s.io/utils/pointer" + fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" v1addr "knative.dev/pkg/client/injection/ducks/duck/v1/addressable" @@ -38,7 +41,6 @@ import ( "knative.dev/pkg/tracker" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/channel" "knative.dev/eventing/pkg/duck" @@ -48,6 +50,9 @@ import ( const ( testNS = "test-namespace" channelName = "test-channel" + + readyEventPolicyName = "test-event-policy-ready" + unreadyEventPolicyName = "test-event-policy-unready" ) var ( @@ -61,6 +66,18 @@ var ( Name: pointer.String("http"), URL: apis.HTTP(network.GetServiceHostname("foo", "bar")), } + + channelV1GVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1", + Kind: "Channel", + } + + imcV1GVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1", + Kind: "InMemoryChannel", + } ) func TestReconcile(t *testing.T) { @@ -145,7 +162,8 @@ func TestReconcile(t *testing.T) { WithBackingChannelObjRef(backingChannelObjRef()), WithBackingChannelReady, WithChannelDLSUnknown(), - WithChannelAddress(&backingChannelAddressable)), + WithChannelAddress(&backingChannelAddressable), + WithChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Already reconciled", @@ -157,7 +175,8 @@ func TestReconcile(t *testing.T) { WithBackingChannelObjRef(backingChannelObjRef()), WithBackingChannelReady, WithChannelAddress(&backingChannelAddressable), - WithChannelDLSUnknown()), + WithChannelDLSUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled()), NewInMemoryChannel(channelName, testNS, WithInitInMemoryChannelConditions, WithInMemoryChannelDeploymentReady(), @@ -190,7 +209,8 @@ func TestReconcile(t *testing.T) { WithBackingChannelObjRef(backingChannelObjRef()), WithChannelNoAddress(), WithChannelDLSUnknown(), - WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")), + WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled."), + WithChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Backing channel created with delivery", @@ -240,7 +260,8 @@ func TestReconcile(t *testing.T) { WithChannelNoAddress(), WithChannelDelivery(deliverySpec), WithChannelDLSUnknown(), - WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled.")), + WithBackingChannelUnknown("BackingChannelNotConfigured", "BackingChannel has not yet been reconciled."), + WithChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Generation Bump", @@ -274,7 +295,8 @@ func TestReconcile(t *testing.T) { WithChannelAddress(&backingChannelAddressable), WithChannelGeneration(42), // Updates - WithChannelObservedGeneration(42)), + WithChannelObservedGeneration(42), + WithChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, }, { Name: "Updating subscribers statuses", @@ -285,7 +307,8 @@ func TestReconcile(t *testing.T) { WithInitChannelConditions, WithBackingChannelObjRef(backingChannelObjRef()), WithBackingChannelReady, - WithChannelAddress(&backingChannelAddressable)), + WithChannelAddress(&backingChannelAddressable), + WithChannelEventPoliciesReady()), NewInMemoryChannel(channelName, testNS, WithInitInMemoryChannelConditions, WithInMemoryChannelDuckAnnotationV1Beta1, @@ -307,8 +330,234 @@ func TestReconcile(t *testing.T) { WithBackingChannelReady, WithChannelAddress(&backingChannelAddressable), WithChannelSubscriberStatuses(subscriberStatuses()), - WithChannelDLSUnknown()), + WithChannelDLSUnknown(), + WithChannelEventPoliciesReadyBecauseOIDCDisabled()), }}, + }, { + Name: "Should provision applying EventPolicies", + Key: testKey, + Objects: []runtime.Object{ + NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(backingChannelAddressable), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(channelV1GVK, channelName), + ), + NewEventPolicy(fmt.Sprintf("%s-%s", readyEventPolicyName, channelName), testNS, + WithEventPolicyToRef(imcV1GVK, channelName), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: channelName, + }, { + Name: readyEventPolicyName, + }, + }...), + WithEventPolicyLabels(map[string]string{ + "messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group, + "messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version, + "messaging.knative.dev/channel-kind": "InMemoryChannel", + "messaging.knative.dev/channel-name": channelName, + }), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingChannelObjRef()), + WithBackingChannelReady, + WithChannelDLSUnknown(), + WithChannelAddress(&backingChannelAddressable), + WithChannelEventPoliciesReady(), + WithChannelEventPoliciesListed(readyEventPolicyName)), + }}, + }, { + Name: "Should mark as NotReady on unready EventPolicies", + Key: testKey, + Objects: []runtime.Object{ + NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(backingChannelAddressable), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition, + WithEventPolicyToRef(channelV1GVK, channelName), + ), + NewEventPolicy(fmt.Sprintf("%s-%s", unreadyEventPolicyName, channelName), testNS, + WithEventPolicyToRef(imcV1GVK, channelName), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: channelName, + }, { + Name: unreadyEventPolicyName, + }, + }...), + WithEventPolicyLabels(map[string]string{ + "messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group, + "messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version, + "messaging.knative.dev/channel-kind": "InMemoryChannel", + "messaging.knative.dev/channel-name": channelName, + }), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingChannelObjRef()), + WithBackingChannelReady, + WithChannelDLSUnknown(), + WithChannelAddress(&backingChannelAddressable), + WithChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName))), + }}, + }, { + Name: "should list only Ready EventPolicies", + Key: testKey, + Objects: []runtime.Object{ + NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(backingChannelAddressable), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(channelV1GVK, channelName), + ), + NewEventPolicy(unreadyEventPolicyName, testNS, + WithUnreadyEventPolicyCondition, + WithEventPolicyToRef(channelV1GVK, channelName), + ), + NewEventPolicy(fmt.Sprintf("%s-%s", readyEventPolicyName, channelName), testNS, + WithEventPolicyToRef(imcV1GVK, channelName), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: channelName, + }, { + Name: readyEventPolicyName, + }, + }...), + WithEventPolicyLabels(map[string]string{ + "messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group, + "messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version, + "messaging.knative.dev/channel-kind": "InMemoryChannel", + "messaging.knative.dev/channel-name": channelName, + }), + ), + NewEventPolicy(fmt.Sprintf("%s-%s", unreadyEventPolicyName, channelName), testNS, + WithEventPolicyToRef(imcV1GVK, channelName), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: channelName, + }, { + Name: unreadyEventPolicyName, + }, + }...), + WithEventPolicyLabels(map[string]string{ + "messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group, + "messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version, + "messaging.knative.dev/channel-kind": "InMemoryChannel", + "messaging.knative.dev/channel-name": channelName, + }), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingChannelObjRef()), + WithBackingChannelReady, + WithChannelDLSUnknown(), + WithChannelAddress(&backingChannelAddressable), + WithChannelEventPoliciesNotReady("EventPoliciesNotReady", fmt.Sprintf("event policies %s are not ready", unreadyEventPolicyName)), + WithChannelEventPoliciesListed(readyEventPolicyName)), + }}, + }, { + Name: "should create EventPolicies for backing channel", + Key: testKey, + Objects: []runtime.Object{ + NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithChannelEventPoliciesReady(), + WithChannelEventPoliciesListed(readyEventPolicyName)), + NewInMemoryChannel(channelName, testNS, + WithInitInMemoryChannelConditions, + WithInMemoryChannelDeploymentReady(), + WithInMemoryChannelServiceReady(), + WithInMemoryChannelEndpointsReady(), + WithInMemoryChannelChannelServiceReady(), + WithInMemoryChannelAddress(backingChannelAddressable), + WithInMemoryChannelDLSUnknown(), + WithInMemoryChannelEventPoliciesReady()), + NewEventPolicy(readyEventPolicyName, testNS, + WithReadyEventPolicyCondition, + WithEventPolicyToRef(channelV1GVK, channelName), + ), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: NewChannel(channelName, testNS, + WithChannelTemplate(channelCRD()), + WithInitChannelConditions, + WithBackingChannelObjRef(backingChannelObjRef()), + WithBackingChannelReady, + WithChannelDLSUnknown(), + WithChannelAddress(&backingChannelAddressable), + WithChannelEventPoliciesReady(), + WithChannelEventPoliciesListed(readyEventPolicyName)), + }}, + WantCreates: []runtime.Object{ + NewEventPolicy(fmt.Sprintf("%s-%s", readyEventPolicyName, channelName), testNS, + WithEventPolicyToRef(imcV1GVK, channelName), + WithEventPolicyOwnerReferences([]metav1.OwnerReference{ + { + APIVersion: v1.SchemeGroupVersion.String(), + Kind: "InMemoryChannel", + Name: channelName, + }, { + Name: readyEventPolicyName, + }, + }...), + WithEventPolicyLabels(map[string]string{ + "messaging.knative.dev/channel-group": v1.SchemeGroupVersion.Group, + "messaging.knative.dev/channel-version": v1.SchemeGroupVersion.Version, + "messaging.knative.dev/channel-kind": "InMemoryChannel", + "messaging.knative.dev/channel-name": channelName, + }), + ), + }, }} logger := logtesting.TestLogger(t) @@ -319,6 +568,8 @@ func TestReconcile(t *testing.T) { dynamicClientSet: fakedynamicclient.Get(ctx), channelLister: listers.GetMessagingChannelLister(), channelableTracker: &fakeListableTracker{duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0))}, + eventPolicyLister: listers.GetEventPolicyLister(), + eventingClientSet: fakeeventingclient.Get(ctx), } return channelreconciler.NewReconciler(ctx, logger, fakeeventingclient.Get(ctx), listers.GetMessagingChannelLister(), diff --git a/pkg/reconciler/channel/controller.go b/pkg/reconciler/channel/controller.go index 73d5818b389..2d9eff5f6f8 100644 --- a/pkg/reconciler/channel/controller.go +++ b/pkg/reconciler/channel/controller.go @@ -19,11 +19,19 @@ package channel import ( "context" + messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/auth" + + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/pkg/logging" + "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection/clients/dynamicclient" + eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" + "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" channelinformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel" channelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/channel" "knative.dev/eventing/pkg/duck" @@ -36,16 +44,42 @@ func NewController( cmw configmap.Watcher, ) *controller.Impl { channelInformer := channelinformer.Get(ctx) + eventPolicyInformer := eventpolicy.Get(ctx) r := &Reconciler{ - dynamicClientSet: dynamicclient.Get(ctx), - channelLister: channelInformer.Lister(), + dynamicClientSet: dynamicclient.Get(ctx), + channelLister: channelInformer.Lister(), + eventPolicyLister: eventPolicyInformer.Lister(), + eventingClientSet: eventingclient.Get(ctx), } - impl := channelreconciler.NewImpl(ctx, r) + + var globalResync func() + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync() + } + }) + featureStore.WatchConfigs(cmw) + + impl := channelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { + return controller.Options{ + ConfigStore: featureStore, + } + }) r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker) + globalResync = func() { + impl.GlobalResync(channelInformer.Informer()) + } + channelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) + channelGK := messagingv1.SchemeGroupVersion.WithKind("Channel").GroupKind() + + // Enqueue the Channel, if we have an EventPolicy which was referencing + // or got updated and now is referencing the Channel + eventPolicyInformer.Informer().AddEventHandler(auth.EventPolicyEventHandler(channelInformer.Informer().GetIndexer(), channelGK, impl.EnqueueKey)) + return impl } diff --git a/pkg/reconciler/channel/controller_test.go b/pkg/reconciler/channel/controller_test.go index f33ad1f8f8a..548a41868e3 100644 --- a/pkg/reconciler/channel/controller_test.go +++ b/pkg/reconciler/channel/controller_test.go @@ -19,12 +19,17 @@ package channel import ( "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" + _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/channel/fake" _ "knative.dev/pkg/client/injection/kube/client/fake" _ "knative.dev/pkg/injection/clients/dynamicclient/fake" @@ -33,7 +38,12 @@ import ( func TestNew(t *testing.T) { ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: feature.FlagsConfigName, + Namespace: "knative-eventing", + }, + })) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/channel/resources/eventpolicy.go b/pkg/reconciler/channel/resources/eventpolicy.go new file mode 100644 index 00000000000..8b2562b71ff --- /dev/null +++ b/pkg/reconciler/channel/resources/eventpolicy.go @@ -0,0 +1,76 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/pkg/kmeta" +) + +const ( + BackingChannelEventPolicyLabelPrefix = "messaging.knative.dev/" +) + +func MakeEventPolicyForBackingChannel(backingChannel *eventingduckv1.Channelable, parentPolicy *eventingv1alpha1.EventPolicy) *eventingv1alpha1.EventPolicy { + parentPolicy = parentPolicy.DeepCopy() + + return &eventingv1alpha1.EventPolicy{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: backingChannel.Namespace, + Name: kmeta.ChildName(fmt.Sprintf("%s-", parentPolicy.Name), backingChannel.Name), + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: backingChannel.APIVersion, + Kind: backingChannel.Kind, + Name: backingChannel.Name, + UID: backingChannel.UID, + }, { + APIVersion: parentPolicy.APIVersion, + Kind: parentPolicy.Kind, + Name: parentPolicy.Name, + UID: parentPolicy.UID, + }, + }, + Labels: LabelsForBackingChannelsEventPolicy(backingChannel), + }, + Spec: eventingv1alpha1.EventPolicySpec{ + To: []eventingv1alpha1.EventPolicySpecTo{ + { + Ref: &eventingv1alpha1.EventPolicyToReference{ + APIVersion: backingChannel.APIVersion, + Kind: backingChannel.Kind, + Name: backingChannel.Name, + }, + }, + }, + From: parentPolicy.Spec.From, + }, + } +} + +func LabelsForBackingChannelsEventPolicy(backingChannel *eventingduckv1.Channelable) map[string]string { + return map[string]string{ + BackingChannelEventPolicyLabelPrefix + "channel-group": backingChannel.GroupVersionKind().Group, + BackingChannelEventPolicyLabelPrefix + "channel-version": backingChannel.GroupVersionKind().Version, + BackingChannelEventPolicyLabelPrefix + "channel-kind": backingChannel.Kind, + BackingChannelEventPolicyLabelPrefix + "channel-name": backingChannel.Name, + } +} diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go index 3d818463dd6..73281a54e68 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel.go @@ -20,10 +20,7 @@ import ( "context" "errors" "fmt" - "strings" - eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" "k8s.io/client-go/kubernetes" @@ -238,42 +235,9 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) imc.GetConditionSet().Manage(imc.GetStatus()).MarkTrue(v1.InMemoryChannelConditionAddressable) - imc.Status.Policies = nil - applyingEvenPolicies, err := auth.GetEventPoliciesForResource(r.eventPolicyLister, messagingv1.SchemeGroupVersion.WithKind("InMemoryChannel"), imc.ObjectMeta) + err = auth.UpdateStatusWithEventPolicies(featureFlags, &imc.Status.AppliedEventPoliciesStatus, &imc.Status, r.eventPolicyLister, v1.SchemeGroupVersion.WithKind("InMemoryChannel"), imc.ObjectMeta) if err != nil { - logging.FromContext(ctx).Errorw("Unable to get applying event policies for InMemoryChannel", zap.Error(err)) - imc.Status.MarkEventPoliciesFailed("EventPoliciesGetFailed", "Failed to get applying event policies") - } - - if len(applyingEvenPolicies) > 0 { - unreadyEventPolicies := []string{} - for _, policy := range applyingEvenPolicies { - if !policy.Status.IsReady() { - unreadyEventPolicies = append(unreadyEventPolicies, policy.Name) - } else { - // only add Ready policies to the list - imc.Status.Policies = append(imc.Status.Policies, eventingduck.AppliedEventPolicyRef{ - Name: policy.Name, - APIVersion: eventingv1alpha1.SchemeGroupVersion.String(), - }) - } - } - - if len(unreadyEventPolicies) == 0 { - imc.Status.MarkEventPoliciesTrue() - } else { - imc.Status.MarkEventPoliciesFailed("EventPoliciesNotReady", "event policies %s are not ready", strings.Join(unreadyEventPolicies, ", ")) - } - - } else { - // we have no applying event policy. So we set the EP condition to True - if featureFlags.IsOIDCAuthentication() { - // in case of OIDC auth, we also set the message with the default authorization mode - imc.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", featureFlags[feature.AuthorizationDefaultMode]) - } else { - // in case OIDC is disabled, we set EP condition to true too, but give some message that authz (EPs) require OIDC - imc.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) - } + return fmt.Errorf("could not update InMemoryChannels status with EventPolicies: %v", err) } // Ok, so now the Dispatcher Deployment & Service have been created, we're golden since the diff --git a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go index 11c0aca30cd..f9f859444ba 100644 --- a/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/controller/inmemorychannel_test.go @@ -22,8 +22,6 @@ import ( "strconv" "testing" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" @@ -105,6 +103,12 @@ var ( URL: dlsURI, CACerts: nil, } + + imcV1GVK = metav1.GroupVersionKind{ + Group: "messaging.knative.dev", + Version: "v1", + Kind: "InMemoryChannel", + } ) func TestAllCases(t *testing.T) { @@ -659,11 +663,7 @@ func TestAllCases(t *testing.T) { makeChannelService(NewInMemoryChannel(imcName, testNS)), NewEventPolicy(readyEventPolicyName, testNS, WithReadyEventPolicyCondition, - WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: "InMemoryChannel", - Name: imcName, - }), + WithEventPolicyToRef(imcV1GVK, imcName), ), }, WantErr: false, @@ -698,11 +698,7 @@ func TestAllCases(t *testing.T) { makeChannelService(NewInMemoryChannel(imcName, testNS)), NewEventPolicy(unreadyEventPolicyName, testNS, WithUnreadyEventPolicyCondition, - WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: "InMemoryChannel", - Name: imcName, - }), + WithEventPolicyToRef(imcV1GVK, imcName), ), }, WantErr: false, @@ -736,19 +732,11 @@ func TestAllCases(t *testing.T) { makeChannelService(NewInMemoryChannel(imcName, testNS)), NewEventPolicy(readyEventPolicyName, testNS, WithReadyEventPolicyCondition, - WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: "InMemoryChannel", - Name: imcName, - }), + WithEventPolicyToRef(imcV1GVK, imcName), ), NewEventPolicy(unreadyEventPolicyName, testNS, WithUnreadyEventPolicyCondition, - WithEventPolicyToRef(v1alpha1.EventPolicyToReference{ - APIVersion: v1.SchemeGroupVersion.String(), - Kind: "InMemoryChannel", - Name: imcName, - }), + WithEventPolicyToRef(imcV1GVK, imcName), ), }, WantErr: false, diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index d74c1e5664d..b0e208ed936 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -1487,6 +1487,7 @@ func TestAllCases(t *testing.T) { WithBackingChannelReady, WithChannelAddress(sink), WithChannelDLSUnknown(), + WithChannelEventPoliciesReady(), ), NewInMemoryChannel(channelName, testNS, WithInitInMemoryChannelConditions, diff --git a/pkg/reconciler/testing/v1/channel.go b/pkg/reconciler/testing/v1/channel.go index c4f5942f9b2..2c2d87d4896 100644 --- a/pkg/reconciler/testing/v1/channel.go +++ b/pkg/reconciler/testing/v1/channel.go @@ -18,6 +18,9 @@ import ( "fmt" "time" + "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + "knative.dev/eventing/pkg/apis/feature" + "k8s.io/apimachinery/pkg/types" v1 "k8s.io/api/core/v1" @@ -159,6 +162,41 @@ func WithChannelDLSUnknown() ChannelOption { } } +func WithChannelEventPoliciesReady() ChannelOption { + return func(c *eventingv1.Channel) { + c.Status.MarkEventPoliciesTrue() + } +} + +func WithChannelEventPoliciesNotReady(reason, message string) ChannelOption { + return func(c *eventingv1.Channel) { + c.Status.MarkEventPoliciesFailed(reason, message) + } +} + +func WithChannelEventPoliciesListed(policyNames ...string) ChannelOption { + return func(c *eventingv1.Channel) { + for _, name := range policyNames { + c.Status.Policies = append(c.Status.Policies, eventingduckv1.AppliedEventPolicyRef{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Name: name, + }) + } + } +} + +func WithChannelEventPoliciesReadyBecauseOIDCDisabled() ChannelOption { + return func(c *eventingv1.Channel) { + c.Status.MarkEventPoliciesTrueWithReason("OIDCDisabled", "Feature %q must be enabled to support Authorization", feature.OIDCAuthentication) + } +} + +func WithChannelEventPoliciesReadyBecauseNoPolicyAndOIDCEnabled() ChannelOption { + return func(c *eventingv1.Channel) { + c.Status.MarkEventPoliciesTrueWithReason("DefaultAuthorizationMode", "Default authz mode is %q", feature.AuthorizationAllowSameNamespace) + } +} + func WithChannelDLSResolvedFailed() ChannelOption { return func(c *eventingv1.Channel) { c.Status.MarkDeadLetterSinkResolvedFailed( diff --git a/pkg/reconciler/testing/v1/eventpolicy.go b/pkg/reconciler/testing/v1/eventpolicy.go index a54b07a03e0..0d2934ac6ce 100644 --- a/pkg/reconciler/testing/v1/eventpolicy.go +++ b/pkg/reconciler/testing/v1/eventpolicy.go @@ -66,23 +66,28 @@ func WithUnreadyEventPolicyCondition(ep *v1alpha1.EventPolicy) { } } -func WithEventPolicyTo(tos ...v1alpha1.EventPolicySpecTo) EventPolicyOption { - return func(ep *v1alpha1.EventPolicy) { - ep.Spec.To = append(ep.Spec.To, tos...) - } -} - -func WithEventPolicyToRef(ref v1alpha1.EventPolicyToReference) EventPolicyOption { +func WithEventPolicyToRef(gvk metav1.GroupVersionKind, name string) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { ep.Spec.To = append(ep.Spec.To, v1alpha1.EventPolicySpecTo{ - Ref: &ref, + Ref: &v1alpha1.EventPolicyToReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + }, }) } } -func WithEventPolicyFrom(froms ...v1alpha1.EventPolicySpecFrom) EventPolicyOption { +func WithEventPolicyFrom(gvk metav1.GroupVersionKind, name, namespace string) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { - ep.Spec.From = append(ep.Spec.From, froms...) + ep.Spec.From = append(ep.Spec.From, v1alpha1.EventPolicySpecFrom{ + Ref: &v1alpha1.EventPolicyFromReference{ + APIVersion: apiVersion(gvk), + Kind: gvk.Kind, + Name: name, + Namespace: namespace, + }, + }) } } @@ -92,10 +97,8 @@ func WithEventPolicyLabels(labels map[string]string) EventPolicyOption { } } -func WithEventPolicyOwnerReference(ownerRef metav1.OwnerReference) EventPolicyOption { +func WithEventPolicyOwnerReferences(ownerRefs ...metav1.OwnerReference) EventPolicyOption { return func(ep *v1alpha1.EventPolicy) { - ep.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ - ownerRef, - } + ep.ObjectMeta.OwnerReferences = append(ep.ObjectMeta.OwnerReferences, ownerRefs...) } }